diff --git a/internal/transport/kafka/writer.go b/internal/transport/kafka/writer.go index a760c22..27900cb 100644 --- a/internal/transport/kafka/writer.go +++ b/internal/transport/kafka/writer.go @@ -51,7 +51,8 @@ func (s *KafkaWriter) createTopic() error { // return era // } - // // + // + controllerConn := conn // controllerConn, eru := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) // if eru != nil { // return eru @@ -67,7 +68,7 @@ func (s *KafkaWriter) createTopic() error { }, } - return conn.CreateTopics(topicConfigs...) + return controllerConn.CreateTopics(topicConfigs...) } func (s *KafkaWriter) checkTopic() error {