service/app/firenze/consumer1/main.go

52 lines
1.4 KiB
Go
Raw Permalink Normal View History

2024-10-12 16:28:53 +08:00
package main
import (
"context"
"fmt"
red "github.com/redis/go-redis/v9"
"time"
)
func main() {
client := red.NewClient(&red.Options{
Addr: "127.0.0.1:6379", // Redis地址
Password: "", // 密码(如果有的话)
DB: 0, // 使用默认DB
})
defer client.Close()
ctx := context.Background()
for {
streams, err := client.XReadGroup(ctx, &red.XReadGroupArgs{
Group: "group1", // 消费者组的名称
Consumer: "consumer1", // 消费者的名称
Streams: []string{"test_stream", ">"}, // Stream的名称和ID
Count: int64(1), // 要读取的消息数量
Block: time.Second * 5, // 阻塞时间0表示不阻塞
}).Result()
if err != nil {
fmt.Println("XReadGroup error:", err)
continue
}
for _, stream := range streams {
streamName := stream.Stream
fmt.Println("获取消息 长度", len(stream.Messages))
for _, message := range stream.Messages {
messageID := message.ID
messageValues := message.Values
fmt.Println("读取到Stream: streamName=", streamName, " messageID=", messageID, " messageValues=", messageValues)
// 标记消息已经被消费者读取
err := client.XAck(ctx, "test_stream", "group1", messageID).Err()
if err != nil {
fmt.Println("XAck error:", err)
return
}
fmt.Println("消息已经被标记为已读取")
}
}
}
}