52 lines
1.4 KiB
Go
52 lines
1.4 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
red "github.com/redis/go-redis/v9"
|
||
"time"
|
||
)
|
||
|
||
func main() {
|
||
client := red.NewClient(&red.Options{
|
||
Addr: "127.0.0.1:6379", // Redis地址
|
||
Password: "", // 密码(如果有的话)
|
||
DB: 0, // 使用默认DB
|
||
})
|
||
defer client.Close()
|
||
|
||
ctx := context.Background()
|
||
for {
|
||
streams, err := client.XReadGroup(ctx, &red.XReadGroupArgs{
|
||
Group: "group2", // 消费者组的名称
|
||
Consumer: "consumer1", // 消费者的名称
|
||
Streams: []string{"test_stream", ">"}, // Stream的名称和ID
|
||
Count: int64(10), // 要读取的消息数量
|
||
Block: time.Second * 5, // 阻塞时间,0表示不阻塞
|
||
}).Result()
|
||
|
||
if err != nil {
|
||
fmt.Println("XReadGroup error:", err)
|
||
continue
|
||
}
|
||
|
||
for _, stream := range streams {
|
||
streamName := stream.Stream
|
||
fmt.Println("获取消息 长度", len(stream.Messages))
|
||
for _, message := range stream.Messages {
|
||
messageID := message.ID
|
||
messageValues := message.Values
|
||
fmt.Println("读取到Stream: streamName=", streamName, " messageID=", messageID, " messageValues=", messageValues)
|
||
|
||
// 标记消息已经被消费者读取
|
||
err := client.XAck(ctx, "test_stream", "group2", messageID).Err()
|
||
if err != nil {
|
||
fmt.Println("XAck error:", err)
|
||
return
|
||
}
|
||
fmt.Println("消息已经被标记为已读取")
|
||
}
|
||
}
|
||
}
|
||
}
|