package kafka

import (
	"context"
	"fmt"
	"math"
	"net"
	"time"

	"github.com/segmentio/kafka-go/protocol"
	fetchAPI "github.com/segmentio/kafka-go/protocol/fetch"
)

// FetchRequest represents a request sent to a kafka broker to retrieve records
// from a topic partition.
type FetchRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// Topic, partition, and offset to retrieve records from. The offset may be
	// one of the special FirstOffset or LastOffset constants, in which case
	// Fetch first issues a ListOffsets request to discover the first or last
	// offset of the partition and submits the fetch for that offset.
	Topic     string
	Partition int
	Offset    int64

	// Size and time limits of the response returned by the broker.
	//
	// MinBytes and MaxBytes are sent to the broker as 32-bit integers, and
	// MaxBytes is applied both to the request as a whole and to the single
	// partition being fetched.
	//
	// When MaxWait is zero or negative, the package default (defaultMaxWait)
	// is used instead.
	MinBytes int64
	MaxBytes int64
	MaxWait  time.Duration

	// The isolation level for the request.
	//
	// Defaults to ReadUncommitted.
	//
	// This field requires the kafka broker to support the Fetch API in version
	// 4 or above (otherwise the value is ignored).
	IsolationLevel IsolationLevel
}

// FetchResponse represents a response from a kafka broker to a fetch request.
type FetchResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// The topic and partition that the response came for (will match the values
	// in the request).
	Topic     string
	Partition int

	// Information about the topic partition layout returned from the broker.
	//
	// LastStableOffset requires the kafka broker to support the Fetch API in
	// version 4 or above (otherwise the value is zero).
	//
	// LogStartOffset requires the kafka broker to support the Fetch API in
	// version 5 or above (otherwise the value is zero).
	HighWatermark    int64
	LastStableOffset int64
	LogStartOffset   int64

	// An error that may have occurred while attempting to fetch the records.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	//
	// Fetch derives this value from the response-level error code, and
	// replaces it with the partition-level error when that code is non-zero.
	Error error

	// The set of records returned in the response.
	//
	// The program is expected to call the RecordSet's Close method when it
	// has finished reading the records.
	//
	// Note that kafka may return record batches that start at an offset before
	// the one that was requested. It is the program's responsibility to skip
	// the offsets that it is not interested in.
	//
	// Fetch never leaves this field nil: when the response carries no record
	// reader, one created by calling NewRecordReader with no arguments is
	// used instead.
	Records RecordReader
}

// Fetch sends a fetch request to a kafka broker and returns the response.
//
// When req.Offset is FirstOffset or LastOffset, a ListOffsets request is sent
// to req.Addr first in order to resolve the special value to the first or last
// offset of the partition. If the ListOffsets response does not mention the
// requested partition, the special offset value is forwarded unchanged in the
// fetch request.
//
// req.MinBytes and req.MaxBytes are transmitted as 32-bit integers; values
// outside of the int32 range are saturated to the nearest representable value
// rather than being truncated.
//
// Errors reported by the round trips (ListOffsets or the fetch itself) are
// returned as the error result, wrapped with the "kafka.(*Client).Fetch: "
// prefix. Error codes carried by a well-formed fetch response are reported
// through the Error field of the returned FetchResponse instead.
//
// If the broker returned an invalid response with no topics, an error wrapping
// protocol.ErrNoTopic is returned.
//
// If the broker returned an invalid response with no partitions, an error
// wrapping protocol.ErrNoPartition is returned.
func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error) {
	// The wait time sent to the broker is the smaller of the value computed
	// by c.timeout and the wait configured on the request.
	timeout := c.timeout(ctx, math.MaxInt64)
	if maxWait := req.maxWait(); maxWait < timeout {
		timeout = maxWait
	}

	// FetchRequest exposes its byte limits as int64 while the protocol
	// request stores them as int32. Saturate so that out-of-range values do
	// not wrap around (possibly to a negative limit) during the conversion.
	clampInt32 := func(v int64) int32 {
		switch {
		case v > math.MaxInt32:
			return math.MaxInt32
		case v < math.MinInt32:
			return math.MinInt32
		default:
			return int32(v)
		}
	}
	minBytes, maxBytes := clampInt32(req.MinBytes), clampInt32(req.MaxBytes)

	offset := req.Offset
	switch offset {
	case FirstOffset, LastOffset:
		// Resolve the special offset to a concrete one; the special value
		// doubles as the timestamp of the ListOffsets query.
		topic, partition := req.Topic, req.Partition

		r, err := c.ListOffsets(ctx, &ListOffsetsRequest{
			Addr: req.Addr,
			Topics: map[string][]OffsetRequest{
				topic: {{
					Partition: partition,
					Timestamp: offset,
				}},
			},
		})
		if err != nil {
			return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", err)
		}

		for _, p := range r.Topics[topic] {
			if p.Partition == partition {
				if p.Error != nil {
					return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", p.Error)
				}
				switch offset {
				case FirstOffset:
					offset = p.FirstOffset
				case LastOffset:
					offset = p.LastOffset
				}
				// Only the first matching partition entry is considered.
				break
			}
		}
	}

	m, err := c.roundTrip(ctx, req.Addr, &fetchAPI.Request{
		ReplicaID:      -1,
		MaxWaitTime:    milliseconds(timeout),
		MinBytes:       minBytes,
		MaxBytes:       maxBytes,
		IsolationLevel: int8(req.IsolationLevel),
		SessionID:      -1,
		SessionEpoch:   -1,
		Topics: []fetchAPI.RequestTopic{{
			Topic: req.Topic,
			Partitions: []fetchAPI.RequestPartition{{
				Partition:          int32(req.Partition),
				CurrentLeaderEpoch: -1,
				FetchOffset:        offset,
				LogStartOffset:     -1,
				PartitionMaxBytes:  maxBytes,
			}},
		}},
	})

	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", err)
	}

	// A single topic partition was requested, so only the first topic and
	// the first partition of the response are inspected.
	res := m.(*fetchAPI.Response)
	if len(res.Topics) == 0 {
		return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", protocol.ErrNoTopic)
	}
	topic := &res.Topics[0]
	if len(topic.Partitions) == 0 {
		return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", protocol.ErrNoPartition)
	}
	partition := &topic.Partitions[0]

	ret := &FetchResponse{
		Throttle:         makeDuration(res.ThrottleTimeMs),
		Topic:            topic.Topic,
		Partition:        int(partition.Partition),
		Error:            makeError(res.ErrorCode, ""),
		HighWatermark:    partition.HighWatermark,
		LastStableOffset: partition.LastStableOffset,
		LogStartOffset:   partition.LogStartOffset,
		Records:          partition.RecordSet.Records,
	}

	// A partition-level error takes precedence over the response-level one.
	if partition.ErrorCode != 0 {
		ret.Error = makeError(partition.ErrorCode, "")
	}

	// Always hand back a usable reader so that callers do not have to guard
	// against a nil Records field.
	if ret.Records == nil {
		ret.Records = NewRecordReader()
	}

	return ret, nil
}

// maxWait returns the wait time configured on the request, falling back to
// defaultMaxWait when MaxWait is zero or negative.
func (req *FetchRequest) maxWait() time.Duration {
	if req.MaxWait <= 0 {
		return defaultMaxWait
	}
	return req.MaxWait
}

// fetchRequestV2 holds the fields of a fetch request as serialized by its
// writeTo method: three int32 values followed by the array of topics.
// NOTE(review): the name suggests version 2 of the Fetch API — confirm
// against the protocol definition.
type fetchRequestV2 struct {
	ReplicaID   int32
	MaxWaitTime int32
	MinBytes    int32
	Topics      []fetchRequestTopicV2
}

// size returns the number of bytes accounted for by the request: 4 bytes for
// each of ReplicaID, MaxWaitTime and MinBytes, plus whatever sizeofArray
// reports for the topics.
func (r fetchRequestV2) size() int32 {
	const fixedFields = 4 + 4 + 4

	topicSize := func(idx int) int32 { return r.Topics[idx].size() }
	return fixedFields + sizeofArray(len(r.Topics), topicSize)
}

// writeTo serializes the request into wb: ReplicaID, MaxWaitTime and MinBytes
// as int32 values, in that order, followed by the array of topics.
func (r fetchRequestV2) writeTo(wb *writeBuffer) {
	// Fixed-size header fields.
	wb.writeInt32(r.ReplicaID)
	wb.writeInt32(r.MaxWaitTime)
	wb.writeInt32(r.MinBytes)

	// Each topic serializes itself through its own writeTo method.
	writeTopic := func(idx int) { r.Topics[idx].writeTo(wb) }
	wb.writeArray(len(r.Topics), writeTopic)
}

// fetchRequestTopicV2 is the per-topic entry of a fetchRequestV2: the topic
// name followed by the partitions to fetch from.
type fetchRequestTopicV2 struct {
	TopicName  string
	Partitions []fetchRequestPartitionV2
}

// size returns the sum of what sizeofString reports for the topic name and
// what sizeofArray reports for the partitions.
func (t fetchRequestTopicV2) size() int32 {
	nameSize := sizeofString(t.TopicName)
	partitionsSize := sizeofArray(len(t.Partitions), func(idx int) int32 {
		return t.Partitions[idx].size()
	})
	return nameSize + partitionsSize
}

// writeTo serializes the topic entry into wb: the topic name first, then the
// array of partitions.
func (t fetchRequestTopicV2) writeTo(wb *writeBuffer) {
	wb.writeString(t.TopicName)

	// Each partition serializes itself through its own writeTo method.
	writePartition := func(idx int) { t.Partitions[idx].writeTo(wb) }
	wb.writeArray(len(t.Partitions), writePartition)
}

// fetchRequestPartitionV2 is the per-partition entry of a fetchRequestTopicV2:
// the partition number, the offset to fetch from, and a byte limit. All
// fields are fixed-size and serialized in declaration order by writeTo.
type fetchRequestPartitionV2 struct {
	Partition   int32
	FetchOffset int64
	MaxBytes    int32
}

// size returns the fixed number of bytes written by writeTo for a partition
// entry (16).
func (p fetchRequestPartitionV2) size() int32 {
	const (
		partitionSize   = 4 // Partition (int32)
		fetchOffsetSize = 8 // FetchOffset (int64)
		maxBytesSize    = 4 // MaxBytes (int32)
	)
	return partitionSize + fetchOffsetSize + maxBytesSize
}

// writeTo serializes the partition entry into wb. The order of the writes
// defines the layout: Partition (int32), FetchOffset (int64), MaxBytes
// (int32).
func (p fetchRequestPartitionV2) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt64(p.FetchOffset)
	wb.writeInt32(p.MaxBytes)
}

// fetchResponseV2 holds the fields of a fetch response as serialized by its
// writeTo method: the throttle time (int32) followed by the array of topics.
// NOTE(review): the name suggests version 2 of the Fetch API, and
// ThrottleTime is presumably expressed in milliseconds — confirm against the
// protocol definition.
type fetchResponseV2 struct {
	ThrottleTime int32
	Topics       []fetchResponseTopicV2
}

// size returns the number of bytes accounted for by the response: 4 bytes for
// ThrottleTime plus whatever sizeofArray reports for the topics.
func (r fetchResponseV2) size() int32 {
	const throttleTimeSize = 4

	topicSize := func(idx int) int32 { return r.Topics[idx].size() }
	return throttleTimeSize + sizeofArray(len(r.Topics), topicSize)
}

// writeTo serializes the response into wb: ThrottleTime as an int32, followed
// by the array of topics.
func (r fetchResponseV2) writeTo(wb *writeBuffer) {
	wb.writeInt32(r.ThrottleTime)

	// Each topic serializes itself through its own writeTo method.
	writeTopic := func(idx int) { r.Topics[idx].writeTo(wb) }
	wb.writeArray(len(r.Topics), writeTopic)
}

// fetchResponseTopicV2 is the per-topic entry of a fetchResponseV2: the topic
// name followed by the responses for its partitions.
type fetchResponseTopicV2 struct {
	TopicName  string
	Partitions []fetchResponsePartitionV2
}

// size returns the sum of what sizeofString reports for the topic name and
// what sizeofArray reports for the partitions.
func (t fetchResponseTopicV2) size() int32 {
	nameSize := sizeofString(t.TopicName)
	partitionsSize := sizeofArray(len(t.Partitions), func(idx int) int32 {
		return t.Partitions[idx].size()
	})
	return nameSize + partitionsSize
}

// writeTo serializes the topic entry into wb: the topic name first, then the
// array of partition responses.
func (t fetchResponseTopicV2) writeTo(wb *writeBuffer) {
	wb.writeString(t.TopicName)

	// Each partition serializes itself through its own writeTo method.
	writePartition := func(idx int) { t.Partitions[idx].writeTo(wb) }
	wb.writeArray(len(t.Partitions), writePartition)
}

// fetchResponsePartitionV2 is the per-partition entry of a
// fetchResponseTopicV2. Fields are serialized in declaration order by writeTo.
//
// MessageSetSize is written exactly as stored; nothing in this type derives
// it from MessageSet, so whoever builds the value is responsible for keeping
// the two consistent.
type fetchResponsePartitionV2 struct {
	Partition           int32
	ErrorCode           int16
	HighwaterMarkOffset int64
	MessageSetSize      int32
	MessageSet          messageSet
}

// size returns the number of bytes accounted for by the partition entry: the
// fixed-size header fields plus the size reported by the message set.
func (p fetchResponsePartitionV2) size() int32 {
	const (
		partitionSize      = 4 // Partition (int32)
		errorCodeSize      = 2 // ErrorCode (int16)
		highWatermarkSize  = 8 // HighwaterMarkOffset (int64)
		messageSetSizeSize = 4 // MessageSetSize (int32)
	)
	const headerSize = partitionSize + errorCodeSize + highWatermarkSize + messageSetSizeSize

	return headerSize + p.MessageSet.size()
}

// writeTo serializes the partition entry into wb. The order of the writes
// defines the layout: Partition (int32), ErrorCode (int16),
// HighwaterMarkOffset (int64), MessageSetSize (int32), and finally the
// message set, which serializes itself through its own writeTo method.
func (p fetchResponsePartitionV2) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt16(p.ErrorCode)
	wb.writeInt64(p.HighwaterMarkOffset)
	// Written as stored; not recomputed from p.MessageSet.
	wb.writeInt32(p.MessageSetSize)
	p.MessageSet.writeTo(wb)
}