52 lines
1.4 KiB
Go
52 lines
1.4 KiB
Go
|
package main
|
|||
|
|
|||
|
import (
|
|||
|
"context"
|
|||
|
"fmt"
|
|||
|
red "github.com/redis/go-redis/v9"
|
|||
|
"time"
|
|||
|
)
|
|||
|
|
|||
|
func main() {
|
|||
|
client := red.NewClient(&red.Options{
|
|||
|
Addr: "127.0.0.1:6379", // Redis地址
|
|||
|
Password: "", // 密码(如果有的话)
|
|||
|
DB: 0, // 使用默认DB
|
|||
|
})
|
|||
|
defer client.Close()
|
|||
|
|
|||
|
ctx := context.Background()
|
|||
|
for {
|
|||
|
streams, err := client.XReadGroup(ctx, &red.XReadGroupArgs{
|
|||
|
Group: "group1", // 消费者组的名称
|
|||
|
Consumer: "consumer1", // 消费者的名称
|
|||
|
Streams: []string{"test_stream", ">"}, // Stream的名称和ID
|
|||
|
Count: int64(1), // 要读取的消息数量
|
|||
|
Block: time.Second * 5, // 阻塞时间,0表示不阻塞
|
|||
|
}).Result()
|
|||
|
|
|||
|
if err != nil {
|
|||
|
fmt.Println("XReadGroup error:", err)
|
|||
|
continue
|
|||
|
}
|
|||
|
|
|||
|
for _, stream := range streams {
|
|||
|
streamName := stream.Stream
|
|||
|
fmt.Println("获取消息 长度", len(stream.Messages))
|
|||
|
for _, message := range stream.Messages {
|
|||
|
messageID := message.ID
|
|||
|
messageValues := message.Values
|
|||
|
fmt.Println("读取到Stream: streamName=", streamName, " messageID=", messageID, " messageValues=", messageValues)
|
|||
|
|
|||
|
// 标记消息已经被消费者读取
|
|||
|
err := client.XAck(ctx, "test_stream", "group1", messageID).Err()
|
|||
|
if err != nil {
|
|||
|
fmt.Println("XAck error:", err)
|
|||
|
return
|
|||
|
}
|
|||
|
fmt.Println("消息已经被标记为已读取")
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|