package kafka

import (
	"bufio"
	"context"
	"fmt"
	"net"
	"time"

	"github.com/segmentio/kafka-go/protocol/listoffsets"
)

// OffsetRequest represents a request to retrieve a single partition offset.
type OffsetRequest struct {
	Partition int
	Timestamp int64
}

// FirstOffsetOf constructs an OffsetRequest which asks for the first offset of
// the parition given as argument.
func FirstOffsetOf(partition int) OffsetRequest {
	return OffsetRequest{Partition: partition, Timestamp: FirstOffset}
}

// LastOffsetOf constructs an OffsetRequest which asks for the last offset of
// the partition given as argument.
func LastOffsetOf(partition int) OffsetRequest {
	return OffsetRequest{Partition: partition, Timestamp: LastOffset}
}

// TimeOffsetOf constructs an OffsetRequest which asks for a partition offset
// at a given time.
func TimeOffsetOf(partition int, at time.Time) OffsetRequest {
	return OffsetRequest{Partition: partition, Timestamp: timestamp(at)}
}

// PartitionOffsets carries information about offsets available in a topic
// partition.
type PartitionOffsets struct {
	Partition   int
	FirstOffset int64
	LastOffset  int64
	Offsets     map[int64]time.Time
	Error       error
}

// ListOffsetsRequest represents a request sent to a kafka broker to list of the
// offsets of topic partitions.
type ListOffsetsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// A mapping of topic names to list of partitions that the program wishes to
	// get the offsets for.
	Topics map[string][]OffsetRequest

	// The isolation level for the request.
	//
	// Defaults to ReadUncommitted.
	//
	// This field requires the kafka broker to support the ListOffsets API in
	// version 2 or above (otherwise the value is ignored).
	IsolationLevel IsolationLevel
}

// ListOffsetsResponse represents a response from a kafka broker to a offset
// listing request.
type ListOffsetsResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Mappings of topics names to partition offsets, there will be one entry
	// for each topic in the request.
	Topics map[string][]PartitionOffsets
}

// ListOffsets sends an offset request to a kafka broker and returns the
// response.
func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*ListOffsetsResponse, error) {
	type topicPartition struct {
		topic     string
		partition int
	}

	partitionOffsets := make(map[topicPartition]PartitionOffsets)

	for topicName, requests := range req.Topics {
		for _, r := range requests {
			key := topicPartition{
				topic:     topicName,
				partition: r.Partition,
			}

			partition, ok := partitionOffsets[key]
			if !ok {
				partition = PartitionOffsets{
					Partition:   r.Partition,
					FirstOffset: -1,
					LastOffset:  -1,
					Offsets:     make(map[int64]time.Time),
				}
			}

			switch r.Timestamp {
			case FirstOffset:
				partition.FirstOffset = 0
			case LastOffset:
				partition.LastOffset = 0
			}

			partitionOffsets[topicPartition{
				topic:     topicName,
				partition: r.Partition,
			}] = partition
		}
	}

	topics := make([]listoffsets.RequestTopic, 0, len(req.Topics))

	for topicName, requests := range req.Topics {
		partitions := make([]listoffsets.RequestPartition, len(requests))

		for i, r := range requests {
			partitions[i] = listoffsets.RequestPartition{
				Partition:          int32(r.Partition),
				CurrentLeaderEpoch: -1,
				Timestamp:          r.Timestamp,
			}
		}

		topics = append(topics, listoffsets.RequestTopic{
			Topic:      topicName,
			Partitions: partitions,
		})
	}

	m, err := c.roundTrip(ctx, req.Addr, &listoffsets.Request{
		ReplicaID:      -1,
		IsolationLevel: int8(req.IsolationLevel),
		Topics:         topics,
	})

	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).ListOffsets: %w", err)
	}

	res := m.(*listoffsets.Response)
	ret := &ListOffsetsResponse{
		Throttle: makeDuration(res.ThrottleTimeMs),
		Topics:   make(map[string][]PartitionOffsets, len(res.Topics)),
	}

	for _, t := range res.Topics {
		for _, p := range t.Partitions {
			key := topicPartition{
				topic:     t.Topic,
				partition: int(p.Partition),
			}

			partition := partitionOffsets[key]

			switch p.Timestamp {
			case FirstOffset:
				partition.FirstOffset = p.Offset
			case LastOffset:
				partition.LastOffset = p.Offset
			default:
				partition.Offsets[p.Offset] = makeTime(p.Timestamp)
			}

			if p.ErrorCode != 0 {
				partition.Error = Error(p.ErrorCode)
			}

			partitionOffsets[key] = partition
		}
	}

	for key, partition := range partitionOffsets {
		ret.Topics[key.topic] = append(ret.Topics[key.topic], partition)
	}

	return ret, nil
}

type listOffsetRequestV1 struct {
	ReplicaID int32
	Topics    []listOffsetRequestTopicV1
}

func (r listOffsetRequestV1) size() int32 {
	return 4 + sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
}

func (r listOffsetRequestV1) writeTo(wb *writeBuffer) {
	wb.writeInt32(r.ReplicaID)
	wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
}

type listOffsetRequestTopicV1 struct {
	TopicName  string
	Partitions []listOffsetRequestPartitionV1
}

func (t listOffsetRequestTopicV1) size() int32 {
	return sizeofString(t.TopicName) +
		sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
}

func (t listOffsetRequestTopicV1) writeTo(wb *writeBuffer) {
	wb.writeString(t.TopicName)
	wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
}

type listOffsetRequestPartitionV1 struct {
	Partition int32
	Time      int64
}

func (p listOffsetRequestPartitionV1) size() int32 {
	return 4 + 8
}

func (p listOffsetRequestPartitionV1) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt64(p.Time)
}

type listOffsetResponseV1 []listOffsetResponseTopicV1

func (r listOffsetResponseV1) size() int32 {
	return sizeofArray(len(r), func(i int) int32 { return r[i].size() })
}

func (r listOffsetResponseV1) writeTo(wb *writeBuffer) {
	wb.writeArray(len(r), func(i int) { r[i].writeTo(wb) })
}

type listOffsetResponseTopicV1 struct {
	TopicName        string
	PartitionOffsets []partitionOffsetV1
}

func (t listOffsetResponseTopicV1) size() int32 {
	return sizeofString(t.TopicName) +
		sizeofArray(len(t.PartitionOffsets), func(i int) int32 { return t.PartitionOffsets[i].size() })
}

func (t listOffsetResponseTopicV1) writeTo(wb *writeBuffer) {
	wb.writeString(t.TopicName)
	wb.writeArray(len(t.PartitionOffsets), func(i int) { t.PartitionOffsets[i].writeTo(wb) })
}

type partitionOffsetV1 struct {
	Partition int32
	ErrorCode int16
	Timestamp int64
	Offset    int64
}

func (p partitionOffsetV1) size() int32 {
	return 4 + 2 + 8 + 8
}

func (p partitionOffsetV1) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt16(p.ErrorCode)
	wb.writeInt64(p.Timestamp)
	wb.writeInt64(p.Offset)
}

func (p *partitionOffsetV1) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
	if remain, err = readInt32(r, sz, &p.Partition); err != nil {
		return
	}
	if remain, err = readInt16(r, remain, &p.ErrorCode); err != nil {
		return
	}
	if remain, err = readInt64(r, remain, &p.Timestamp); err != nil {
		return
	}
	if remain, err = readInt64(r, remain, &p.Offset); err != nil {
		return
	}
	return
}