package main import ( "context" "fmt" red "github.com/redis/go-redis/v9" "time" ) func main() { client := red.NewClient(&red.Options{ Addr: "127.0.0.1:6379", // Redis地址 Password: "", // 密码(如果有的话) DB: 0, // 使用默认DB }) defer client.Close() ctx := context.Background() for { streams, err := client.XReadGroup(ctx, &red.XReadGroupArgs{ Group: "group1", // 消费者组的名称 Consumer: "consumer1", // 消费者的名称 Streams: []string{"test_stream", ">"}, // Stream的名称和ID Count: int64(1), // 要读取的消息数量 Block: time.Second * 5, // 阻塞时间,0表示不阻塞 }).Result() if err != nil { fmt.Println("XReadGroup error:", err) continue } for _, stream := range streams { streamName := stream.Stream fmt.Println("获取消息 长度", len(stream.Messages)) for _, message := range stream.Messages { messageID := message.ID messageValues := message.Values fmt.Println("读取到Stream: streamName=", streamName, " messageID=", messageID, " messageValues=", messageValues) // 标记消息已经被消费者读取 err := client.XAck(ctx, "test_stream", "group1", messageID).Err() if err != nil { fmt.Println("XAck error:", err) return } fmt.Println("消息已经被标记为已读取") } } } }