package kafka

import (
	"context"
	"log"
	"net"
	"strconv"

	"github.com/segmentio/kafka-go"
)

// KafkaWriter wraps a kafka-go Writer together with the topic name and the
// broker address that the topic helper methods dial.
type KafkaWriter struct {
	// writer is the underlying kafka-go writer used by WriteMessage and Close.
	writer *kafka.Writer
	// first is the first broker address passed to NewWriter; fetchTopics and
	// createTopic dial it.
	first  string
	// topic is the topic name the writer is configured with.
	topic  string
}

// NewWriter builds a KafkaWriter for topic that connects through the broker
// addresses in address and balances with kafka.LeastBytes. Before returning
// it checks that the topic exists and creates it when it is missing.
//
// At least one broker address is required; the first one is the address the
// topic check dials. The process is terminated via log.Fatal when no address
// is supplied or when the topic check/creation returns an error.
func NewWriter(topic string, address ...string) *KafkaWriter {
	// Guard the address[0] access below: without it, a call with no
	// addresses would crash with an opaque index-out-of-range panic.
	if len(address) == 0 {
		log.Fatal("kafka: NewWriter requires at least one broker address")
	}

	s := &KafkaWriter{
		writer: &kafka.Writer{
			Topic:    topic,
			Balancer: &kafka.LeastBytes{},
			Addr:     kafka.TCP(address...),
		},
		first: address[0],
		topic: topic,
	}

	// Check the topic and create it if necessary.
	if err := s.checkTopic(); err != nil {
		log.Fatal(err)
	}

	return s
}

// Close closes the underlying kafka-go writer and returns whatever error
// that call reports.
func (s *KafkaWriter) Close() error {
	err := s.writer.Close()
	return err
}

// fetchTopics dials the first broker address, reads the partition list it
// returns, and collects the topic names found there into a set-like map
// (every present key maps to true). The connection is closed before
// returning. Any dial or read error is returned as-is with a nil map.
func (s *KafkaWriter) fetchTopics() (map[string]bool, error) {
	conn, err := kafka.Dial("tcp", s.first)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	// Ask the broker for its partitions; each entry carries its topic name.
	partitions, err := conn.ReadPartitions()
	if err != nil {
		return nil, err
	}

	// Several partitions may share a topic, so collapse them by name.
	seen := make(map[string]bool)
	for i := range partitions {
		seen[partitions[i].Topic] = true
	}

	return seen, nil
}

// createTopic creates s.topic with one partition and a replication factor
// of one. It dials the first broker address to look up the controller, then
// opens a second connection to the controller and issues the create request
// there. Both connections are closed before returning; the first error
// encountered is returned unchanged.
func (s *KafkaWriter) createTopic() error {
	bootstrap, err := kafka.Dial("tcp", s.first)
	if err != nil {
		return err
	}
	defer bootstrap.Close()

	// Find out which broker is currently the controller.
	broker, err := bootstrap.Controller()
	if err != nil {
		return err
	}

	// Connect to the controller itself; the create request is sent over
	// this connection.
	controllerAddr := net.JoinHostPort(broker.Host, strconv.Itoa(broker.Port))
	ctrl, err := kafka.Dial("tcp", controllerAddr)
	if err != nil {
		return err
	}
	defer ctrl.Close()

	// Single-partition, single-replica topic.
	return ctrl.CreateTopics(kafka.TopicConfig{
		Topic:             s.topic,
		NumPartitions:     1,
		ReplicationFactor: 1,
	})
}

// checkTopic makes sure s.topic exists: it fetches the current topic names
// and, when s.topic is not among them, creates it. Errors from the fetch or
// from the creation are returned unchanged.
func (s *KafkaWriter) checkTopic() error {
	existing, err := s.fetchTopics()
	if err != nil {
		return err
	}

	// Nothing to do when the topic is already present.
	_, found := existing[s.topic]
	if found {
		return nil
	}

	// The topic is missing, so create it.
	return s.createTopic()
}

// WriteMessage sends a single message with the given key and value through
// the underlying writer, using a background context, and returns the
// writer's error.
func (s *KafkaWriter) WriteMessage(key string, value string) error {
	msg := kafka.Message{
		Key:   []byte(key),
		Value: []byte(value),
	}

	return s.writer.WriteMessages(context.Background(), msg)
}