Kafka Go Client
本次实验假定您已经安装好了 Kafka(单机或者集群),并且已在配置文件 config/server.properties 中配置好远程访问地址(listeners / advertised.listeners)
首先需要下载安装librdkafka
wget -O librdkafka-1.4.0.tar.gz https://github.com/edenhill/librdkafka/archive/v1.4.0.tar.gz
tar zxvf librdkafka-1.4.0.tar.gz
cd librdkafka-1.4.0
./configure
make
make install
安装完毕,可以开始写go client
在go项目下安装客户端
go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
consumer示例(作为独立程序单独保存为一个文件,文件开头需加上 package main)
import (
"fmt"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
// main runs a minimal Kafka consumer: it connects to the broker at
// 192.168.37.133:9092 as a member of consumer group "1", subscribes to the
// "test" topic plus every topic matching the regex ^aRegex.*[Tt]est, and
// prints each received message (or read error) in an endless loop.
//
// It panics if the consumer cannot be created or the subscription fails.
func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "192.168.37.133:9092",
		"group.id":          "1",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	// Deferred so the consumer is closed when a panic below unwinds main;
	// a Close call placed after the endless loop could never be reached.
	defer c.Close()

	// Subscribing is a precondition for the read loop, so a failure here is
	// treated the same way as a failed construction instead of being ignored.
	if err := c.SubscribeTopics([]string{"test", "^aRegex.*[Tt]est"}, nil); err != nil {
		panic(err)
	}

	for {
		// Per the client documentation, a timeout of -1 waits indefinitely.
		msg, err := c.ReadMessage(-1)
		if err != nil {
			// Read errors are reported and the loop keeps polling.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
			continue
		}
		fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
	}
}
producer示例(作为独立程序单独保存为一个文件,文件开头需加上 package main)
import (
"fmt"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
// main runs a minimal Kafka producer: it connects to the broker at
// 192.168.37.133:9092, sends seven short words to the "test" topic, prints
// the delivery result of each message, and waits up to 15 seconds for
// outstanding deliveries before exiting.
//
// It panics if the producer cannot be created.
func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "192.168.37.133:9092"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Delivery report handler: runs in the background and prints the outcome
	// of every produced message that arrives on the producer's event channel.
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	topic := "test"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "kafka", "Golang", "client"} {
		// A nil delivery channel is passed, so results are read from
		// p.Events() by the goroutine above.
		err := p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
		if err != nil {
			// Produce only enqueues the message; an error here means it was
			// never queued, so no delivery report will follow for it.
			fmt.Printf("Produce failed for %q: %v\n", word, err)
		}
	}

	// Wait for message deliveries (timeout in milliseconds) and report
	// anything that is still outstanding when the timeout expires.
	if remaining := p.Flush(15 * 1000); remaining > 0 {
		fmt.Printf("%d event(s) still outstanding after flush timeout\n", remaining)
	}
}