service/app/firenze/consumer1/main.go

52 lines
1.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"fmt"
red "github.com/redis/go-redis/v9"
"time"
)
// main runs a Redis Streams consumer. It connects to a local Redis server,
// then loops forever reading one new entry at a time from "test_stream" as
// consumer "consumer1" of group "group1", printing each entry and
// acknowledging it with XACK.
//
// The loop only ends when an XACK call fails; read errors are reported and
// retried after a short pause.
func main() {
	const (
		streamKey    = "test_stream"
		groupName    = "group1"
		consumerName = "consumer1"
		retryDelay   = time.Second // pause after a failed read so an unreachable server is not hammered
	)

	client := red.NewClient(&red.Options{
		Addr:     "127.0.0.1:6379", // Redis address
		Password: "",               // password, if any
		DB:       0,                // use the default DB
	})
	defer client.Close()

	ctx := context.Background()
	for {
		streams, err := client.XReadGroup(ctx, &red.XReadGroupArgs{
			Group:    groupName,
			Consumer: consumerName,
			// ">" asks for entries never delivered to any consumer of the group.
			Streams: []string{streamKey, ">"},
			// Read at most one entry per call.
			Count: 1,
			// Wait up to 5s for a new entry. In go-redis a zero Block sends
			// BLOCK 0 (wait indefinitely); only a negative value disables blocking.
			Block: 5 * time.Second,
		}).Result()
		if err == red.Nil {
			// The block timeout expired with no new entries: not a failure,
			// just poll again.
			continue
		}
		if err != nil {
			fmt.Println("XReadGroup error:", err)
			// Back off before retrying; without this a connection failure
			// turns the loop into a busy spin.
			time.Sleep(retryDelay)
			continue
		}

		for _, stream := range streams {
			streamName := stream.Stream
			fmt.Println("获取消息 长度", len(stream.Messages))
			for _, message := range stream.Messages {
				fmt.Println("读取到Stream: streamName=", streamName, " messageID=", message.ID, " messageValues=", message.Values)

				// Acknowledge the entry on the stream it was read from so it
				// leaves the group's pending entries list.
				if err := client.XAck(ctx, streamName, groupName, message.ID).Err(); err != nil {
					fmt.Println("XAck error:", err)
					return
				}
				fmt.Println("消息已经被标记为已读取")
			}
		}
	}
}