本文实现go调用sarama库,实现创建Kafka topic,添加某账号的生产者权限以及消费者权限
1、创建topic:
import ( "github.com/Shopify/sarama" "log" ) func Create(topic *TopicInfo) error{ log.Println("start create topic...") broker := sarama.NewBroker("localhost:9092") err := broker.Open(nil) if err != nil { log.Println("error in open the broker, ", err) return err } var topicDetail sarama.TopicDetail config := sarama.NewConfig() config.Version = sarama.V2_1_0_0 //kafka版本号 config.Net.SASL.Enable = true config.Net.SASL.Mechanism = "PLAIN" config.Net.SASL.User = "admin" config.Net.SASL.Password = "admin" admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, config) if err != nil { log.Fatal("error in create new cluster ... ", err) return err } err = admin.CreateTopic(topic.TopicName, &topicDetail, false) if err != nil { log.Println("error in create topic, ", err) return err } err = admin.Close() if err != nil { log.Fatal("error in close admin, ", err) return err } return }2、添加生产者权限:
import ( "github.com/Shopify/sarama" "log" ) func Producer(topic *TopicInfo) error{ broker := sarama.NewBroker("localhost:9092") err := broker.Open(nil) if err != nil { log.Println("error in open the broker, ", err) return err } config := sarama.NewConfig() config.Version = sarama.V2_1_0_0 config.Net.SASL.Enable = true config.Net.SASL.Mechanism = "PLAIN" config.Net.SASL.User = "admin" config.Net.SASL.Password = "admin" admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, config) if err != nil { log.Fatal("error in create cluster admin ... ", err) return err } r := sarama.Resource{ResourceType: sarama.AclResourceTopic, ResourceName: topic.TopicName} a := sarama.Acl{Principal: "User:" + topic.Acc, Host: "*", Operation: sarama.AclOperationWrite, PermissionType: sarama.AclPermissionAllow} err = admin.CreateACL(r, a) log.Println("create acl finish") if err != nil { log.Println("error in create producer acl, ", err) return err } err = admin.Close() if err != nil { log.Fatal("error in close admin, ", err) return err } return nil }3、添加消费者权限:
import ( "github.com/Shopify/sarama" "log" ) func Consumer(topic *TopicInfo) error{ broker := sarama.NewBroker("localhost:9092") err := broker.Open(nil) if err != nil { log.Println("error in open the broker, ", err) return err } config := sarama.NewConfig() config.Version = sarama.V2_1_0_0 config.Net.SASL.Enable = true config.Net.SASL.Mechanism = "PLAIN" config.Net.SASL.User = "admin" config.Net.SASL.Password = "admin" admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, config) if err != nil { log.Fatal("error in create new cluster ... ", err) return err } r := sarama.Resource{ResourceType: sarama.AclResourceTopic, ResourceName: topic.TopicName} a := sarama.Acl{Principal: "User"+topic.Acc, Host: "*", Operation: sarama.AclOperationRead, PermissionType: sarama.AclPermissionAllow} err = admin.CreateACL(r, a) log.Println("create acl finish") if err != nil { log.Println("error in create consumer acl, ", err) return err } err = admin.Close() if err != nil { log.Fatal("error in close admin, ", err) return err } return nil }