package kafka

import (
	"context"
	"fmt"
	"net"
	"time"

	metadataAPI "github.com/segmentio/kafka-go/protocol/metadata"
)

// MetadataRequest represents a request sent to a kafka broker to retrieve its
// cluster metadata.
//
// It is the input of (*Client).Metadata, which forwards Addr to the client's
// round trip and Topics as the topic names of the protocol request.
type MetadataRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// The list of topics to retrieve metadata for. The slice is passed to the
	// protocol request as-is.
	Topics []string
}

// MetadataResponse represents a response from a kafka broker to a metadata
// request.
type MetadataResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Name of the kafka cluster that the client retrieved metadata from.
	ClusterID string

	// The broker which is currently the controller for the cluster. This is
	// the zero Broker when the controller ID reported in the response does not
	// match any entry of Brokers.
	Controller Broker

	// The list of brokers registered to the cluster.
	Brokers []Broker

	// The list of topics available on the cluster.
	Topics []Topic
}

// Metadata sends a metadata request to the kafka broker at req.Addr and
// converts the protocol-level response into a MetadataResponse.
//
// An error from the round trip is returned wrapped with the method name, and
// the response is nil in that case. Brokers referenced by partitions (leader,
// replicas, in-sync replicas) are resolved by node ID against the broker list
// of the same response; an ID missing from that list resolves to the zero
// Broker.
func (c *Client) Metadata(ctx context.Context, req *MetadataRequest) (*MetadataResponse, error) {
	msg, err := c.roundTrip(ctx, req.Addr, &metadataAPI.Request{TopicNames: req.Topics})
	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).Metadata: %w", err)
	}

	resp := msg.(*metadataAPI.Response)
	out := &MetadataResponse{
		Throttle:  makeDuration(resp.ThrottleTimeMs),
		ClusterID: resp.ClusterID,
		Brokers:   make([]Broker, len(resp.Brokers)),
		Topics:    make([]Topic, len(resp.Topics)),
	}

	// Index the brokers by node ID so that the partition metadata below can
	// be expressed in terms of Broker values rather than raw IDs.
	byID := make(map[int32]Broker, len(resp.Brokers))

	for n, src := range resp.Brokers {
		b := Broker{
			Host: src.Host,
			Port: int(src.Port),
			ID:   int(src.NodeID),
			Rack: src.Rack,
		}

		out.Brokers[n] = b
		byID[src.NodeID] = b

		if src.NodeID == resp.ControllerID {
			out.Controller = b
		}
	}

	// resolve maps node IDs to the brokers indexed above. The result always
	// has the same length as ids; unknown IDs yield the zero Broker.
	resolve := func(ids []int32) []Broker {
		found := make([]Broker, len(ids))
		for k, id := range ids {
			found[k] = byID[id]
		}
		return found
	}

	for ti, t := range resp.Topics {
		topic := Topic{
			Name:       t.Name,
			Internal:   t.IsInternal,
			Partitions: make([]Partition, len(t.Partitions)),
			Error:      makeError(t.ErrorCode, ""),
		}

		for pi, p := range t.Partitions {
			topic.Partitions[pi] = Partition{
				Topic:    t.Name,
				ID:       int(p.PartitionIndex),
				Leader:   byID[p.LeaderID],
				Replicas: resolve(p.ReplicaNodes),
				Isr:      resolve(p.IsrNodes),
				Error:    makeError(p.ErrorCode, ""),
			}
		}

		out.Topics[ti] = topic
	}

	return out, nil
}

// topicMetadataRequestV1 is the list of topic names carried by a metadata
// request. Its writeTo method distinguishes a nil list (written as an array of
// length -1) from a non-nil one (written as a regular string array).
type topicMetadataRequestV1 []string

// size returns the size reported by sizeofStringArray for the topic names
// held by r.
func (r topicMetadataRequestV1) size() int32 {
	names := []string(r)
	return sizeofStringArray(names)
}

// writeTo writes the topic list to wb, preserving the difference between a
// nil list and an empty one.
func (r topicMetadataRequestV1) writeTo(wb *writeBuffer) {
	if r != nil {
		wb.writeStringArray([]string(r))
		return
	}

	// Nil-ness is communicated to the broker by passing -1 as the array
	// length: for this particular request, the broker interprets a zero
	// length array as a request for no topics whereas a nil array is a
	// request for all topics.
	wb.writeArrayLen(-1)
}

// metadataResponseV1 is a metadata response made of a broker list, the ID of
// the controller, and a topic list. Its writeTo method encodes the fields in
// declaration order.
type metadataResponseV1 struct {
	Brokers      []brokerMetadataV1
	ControllerID int32
	Topics       []topicMetadataV1
}

// size returns the sum of the sizes of the broker array, the topic array, and
// the 4 bytes accounted for the int32 ControllerID.
func (r metadataResponseV1) size() int32 {
	const controllerIDSize = 4

	brokersSize := sizeofArray(len(r.Brokers), func(i int) int32 {
		return r.Brokers[i].size()
	})
	topicsSize := sizeofArray(len(r.Topics), func(i int) int32 {
		return r.Topics[i].size()
	})

	return controllerIDSize + brokersSize + topicsSize
}

// writeTo writes the response to wb: the broker array first, then the
// controller ID, then the topic array.
func (r metadataResponseV1) writeTo(wb *writeBuffer) {
	writeBroker := func(i int) { r.Brokers[i].writeTo(wb) }
	writeTopic := func(i int) { r.Topics[i].writeTo(wb) }

	wb.writeArray(len(r.Brokers), writeBroker)
	wb.writeInt32(r.ControllerID)
	wb.writeArray(len(r.Topics), writeTopic)
}

// brokerMetadataV1 is one broker entry of a metadata response: a node ID, a
// host, a port and a rack. Its writeTo method encodes the fields in
// declaration order.
type brokerMetadataV1 struct {
	NodeID int32
	Host   string
	Port   int32
	Rack   string
}

// size returns the size of the broker entry: 4 bytes for each of the two
// int32 fields (NodeID and Port) plus the sizes of the Host and Rack strings.
func (b brokerMetadataV1) size() int32 {
	const fixedSize = 4 + 4

	hostSize := sizeofString(b.Host)
	rackSize := sizeofString(b.Rack)

	return fixedSize + hostSize + rackSize
}

// writeTo writes the broker entry to wb as NodeID (int32), Host (string),
// Port (int32) and Rack (string), in that order. The order of the writes is
// the layout of the entry and must not be changed.
func (b brokerMetadataV1) writeTo(wb *writeBuffer) {
	wb.writeInt32(b.NodeID)
	wb.writeString(b.Host)
	wb.writeInt32(b.Port)
	wb.writeString(b.Rack)
}

// topicMetadataV1 is one topic entry of a metadataResponseV1: an error code,
// the topic name, an internal flag, and the metadata of the topic's
// partitions. Its writeTo method encodes the fields in declaration order.
type topicMetadataV1 struct {
	TopicErrorCode int16
	TopicName      string
	Internal       bool
	Partitions     []partitionMetadataV1
}

// size returns the size of the topic entry: 2 bytes for the int16 error code,
// 1 byte for the internal flag, plus the sizes of the topic name and of the
// partition array.
func (t topicMetadataV1) size() int32 {
	const fixedSize = 2 + 1

	nameSize := sizeofString(t.TopicName)
	partitionsSize := sizeofArray(len(t.Partitions), func(i int) int32 {
		return t.Partitions[i].size()
	})

	return fixedSize + nameSize + partitionsSize
}

// writeTo writes the topic entry to wb: error code, topic name, internal
// flag, then the array of partition entries.
func (t topicMetadataV1) writeTo(wb *writeBuffer) {
	writePartition := func(i int) { t.Partitions[i].writeTo(wb) }

	wb.writeInt16(t.TopicErrorCode)
	wb.writeString(t.TopicName)
	wb.writeBool(t.Internal)
	wb.writeArray(len(t.Partitions), writePartition)
}

// partitionMetadataV1 is one partition entry of a topicMetadataV1: an error
// code, the partition ID, the leader, and two lists of int32 values named
// Replicas and Isr (presumably broker node IDs — confirm against the protocol
// definition). Its writeTo method encodes the fields in declaration order.
type partitionMetadataV1 struct {
	PartitionErrorCode int16
	PartitionID        int32
	Leader             int32
	Replicas           []int32
	Isr                []int32
}

// size returns the size of the partition entry: 2 bytes for the int16 error
// code, 4 bytes each for the partition ID and the leader, plus the sizes of
// the Replicas and Isr arrays.
func (p partitionMetadataV1) size() int32 {
	const fixedSize = 2 + 4 + 4

	replicasSize := sizeofInt32Array(p.Replicas)
	isrSize := sizeofInt32Array(p.Isr)

	return fixedSize + replicasSize + isrSize
}

// writeTo writes the partition entry to wb as error code (int16), partition
// ID (int32), leader (int32), then the Replicas and Isr int32 arrays. The
// order of the writes is the layout of the entry and must not be changed.
func (p partitionMetadataV1) writeTo(wb *writeBuffer) {
	wb.writeInt16(p.PartitionErrorCode)
	wb.writeInt32(p.PartitionID)
	wb.writeInt32(p.Leader)
	wb.writeInt32Array(p.Replicas)
	wb.writeInt32Array(p.Isr)
}

// topicMetadataRequestV6 is a metadata request made of a list of topic names
// followed by a boolean flag. Its writeTo method distinguishes a nil Topics
// list (written as an array of length -1) from a non-nil one.
type topicMetadataRequestV6 struct {
	Topics                 []string
	AllowAutoTopicCreation bool
}

// size returns the size of the topic name array plus 1 byte for the
// AllowAutoTopicCreation flag.
func (r topicMetadataRequestV6) size() int32 {
	const flagSize = 1

	topicsSize := sizeofStringArray(r.Topics)

	return topicsSize + flagSize
}

// writeTo writes the request to wb: the topic list, preserving the
// difference between a nil list and an empty one, followed by the
// AllowAutoTopicCreation flag.
func (r topicMetadataRequestV6) writeTo(wb *writeBuffer) {
	if r.Topics != nil {
		wb.writeStringArray(r.Topics)
	} else {
		// Nil-ness is communicated to the broker by passing -1 as the array
		// length: for this particular request, the broker interprets a zero
		// length array as a request for no topics whereas a nil array is a
		// request for all topics.
		wb.writeArrayLen(-1)
	}

	wb.writeBool(r.AllowAutoTopicCreation)
}

// metadataResponseV6 is a metadata response made of a throttle time in
// milliseconds, a broker list, a cluster ID, the ID of the controller, and a
// topic list. Its writeTo method encodes the fields in declaration order.
type metadataResponseV6 struct {
	ThrottleTimeMs int32
	Brokers        []brokerMetadataV1
	ClusterId      string
	ControllerID   int32
	Topics         []topicMetadataV6
}

// size returns the size of the response: 4 bytes each for the two int32
// fields (ThrottleTimeMs and ControllerID) plus the sizes of the broker
// array, the cluster ID, and the topic array.
func (r metadataResponseV6) size() int32 {
	const fixedSize = 4 + 4

	brokersSize := sizeofArray(len(r.Brokers), func(i int) int32 {
		return r.Brokers[i].size()
	})

	// NOTE(review): the cluster ID is sized with the nullable-string helper
	// while writeTo emits it with writeString; the pointer passed here is
	// never nil — confirm both helpers agree for non-nil strings.
	clusterIDSize := sizeofNullableString(&r.ClusterId)

	topicsSize := sizeofArray(len(r.Topics), func(i int) int32 {
		return r.Topics[i].size()
	})

	return fixedSize + brokersSize + clusterIDSize + topicsSize
}

// writeTo writes the response to wb: throttle time, broker array, cluster
// ID, controller ID, then the topic array.
func (r metadataResponseV6) writeTo(wb *writeBuffer) {
	writeBroker := func(i int) { r.Brokers[i].writeTo(wb) }
	writeTopic := func(i int) { r.Topics[i].writeTo(wb) }

	wb.writeInt32(r.ThrottleTimeMs)
	wb.writeArray(len(r.Brokers), writeBroker)
	wb.writeString(r.ClusterId)
	wb.writeInt32(r.ControllerID)
	wb.writeArray(len(r.Topics), writeTopic)
}

// topicMetadataV6 is one topic entry of a metadataResponseV6: an error code,
// the topic name, an internal flag, and the metadata of the topic's
// partitions. Its writeTo method encodes the fields in declaration order.
type topicMetadataV6 struct {
	TopicErrorCode int16
	TopicName      string
	Internal       bool
	Partitions     []partitionMetadataV6
}

// size returns the size of the topic entry: 2 bytes for the int16 error code,
// 1 byte for the internal flag, plus the sizes of the topic name and of the
// partition array.
func (t topicMetadataV6) size() int32 {
	const fixedSize = 2 + 1

	nameSize := sizeofString(t.TopicName)
	partitionsSize := sizeofArray(len(t.Partitions), func(i int) int32 {
		return t.Partitions[i].size()
	})

	return fixedSize + nameSize + partitionsSize
}

// writeTo writes the topic entry to wb: error code, topic name, internal
// flag, then the array of partition entries.
func (t topicMetadataV6) writeTo(wb *writeBuffer) {
	writePartition := func(i int) { t.Partitions[i].writeTo(wb) }

	wb.writeInt16(t.TopicErrorCode)
	wb.writeString(t.TopicName)
	wb.writeBool(t.Internal)
	wb.writeArray(len(t.Partitions), writePartition)
}

// partitionMetadataV6 is one partition entry of a topicMetadataV6: an error
// code, the partition ID, the leader, and three lists of int32 values named
// Replicas, Isr and OfflineReplicas (presumably broker node IDs — confirm
// against the protocol definition). Its writeTo method encodes the fields in
// declaration order.
type partitionMetadataV6 struct {
	PartitionErrorCode int16
	PartitionID        int32
	Leader             int32
	Replicas           []int32
	Isr                []int32
	OfflineReplicas    []int32
}

// size returns the size of the partition entry: 2 bytes for the int16 error
// code, 4 bytes each for the partition ID and the leader, plus the sizes of
// the Replicas, Isr and OfflineReplicas arrays.
func (p partitionMetadataV6) size() int32 {
	const fixedSize = 2 + 4 + 4

	replicasSize := sizeofInt32Array(p.Replicas)
	isrSize := sizeofInt32Array(p.Isr)
	offlineSize := sizeofInt32Array(p.OfflineReplicas)

	return fixedSize + replicasSize + isrSize + offlineSize
}

// writeTo writes the partition entry to wb as error code (int16), partition
// ID (int32), leader (int32), then the Replicas, Isr and OfflineReplicas
// int32 arrays. The order of the writes is the layout of the entry and must
// not be changed.
func (p partitionMetadataV6) writeTo(wb *writeBuffer) {
	wb.writeInt16(p.PartitionErrorCode)
	wb.writeInt32(p.PartitionID)
	wb.writeInt32(p.Leader)
	wb.writeInt32Array(p.Replicas)
	wb.writeInt32Array(p.Isr)
	wb.writeInt32Array(p.OfflineReplicas)
}