1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package main
import ( "fmt" "sync"
"github.com/IBM/sarama" )
var ( wg sync.WaitGroup )
func main() { consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil) if err != nil { panic(err) } defer consumer.Close() partitionlist, err := consumer.Partitions("test") if err != nil { panic(err) } for partition := range partitionlist { pt, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest) if err != nil { panic(err) } defer pt.AsyncClose() wg.Add(1) go func(sarama.PartitionConsumer) { defer wg.Done() for msg := range pt.Messages() { fmt.Printf("%s partition: %d, offset:%d,key:%s,value:%s", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) } }(pt) } wg.Wait() }
|