golang操作rocketmq的示例代码
下载
go get github.com/apache/rocketmq-client-go/v2
代码
func main() { // 1. 创建主题 //CreateTopic("test-04", 10909) // 2. 生产者向主题中发送消息 //SendSyncMessage("hello world0002") // 3. 消费者订阅主题并消费 SubscribeMessage() } func CreateTopic(topicName string, port int) { // 创建主题 testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver([]string{"ip:server_port"}))) if err != nil { fmt.Println(err) } err = testAdmin.CreateTopic( context.Background(), admin.WithTopicCreate(topicName), admin.WithBrokerAddrCreate(fmt.Sprintf("ip:%d", port)), ) fmt.Println(err) func SendSyncMessage(message string) { endPoint := []string{"ip:server_port"} p, err := rocketmq.NewProducer( producer.WithNameServer(endPoint), //producer.WithNsResolver(primitive.NewPassthroughResolver(endPoint)), producer.WithRetry(2), err = p.Start() result, err := p.SendSync(context.Background(), &primitive.Message{ Topic: "test", Body: []byte(message), }) fmt.Println(result.Status, result) func SubscribeMessage() { // 订阅主题、并消费 c, err := rocketmq.NewPushConsumer( consumer.WithNameServer(endPoint), consumer.WithConsumerModel(consumer.Clustering), consumer.WithGroupName("GID_TEST01"), //fmt.Println(err) err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for i := range msgs { fmt.Printf("subscribe callback: %v \n", msgs[i]) } return consumer.ConsumeSuccess, nil //fmt.Println(err.Error()) // Note: start after subscribe err = c.Start() os.Exit(-1) c.Shutdown()
到此这篇关于golang操作rocketmq的示例代码的文章就介绍到这了,更多相关golang操作rocketmq内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!
赞 (0)