package kafka

import (
	"bufio"
	"context"
	"fmt"
	"net"
	"time"

	"github.com/segmentio/kafka-go/protocol/deletetopics"
)

// DeleteTopicsRequest represents a request sent to a kafka broker to delete
// topics.
type DeleteTopicsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// Names of topics to delete.
	Topics []string
}

// DeleteTopicsResponse represents a response from a kafka broker to a topic
// deletion request.
type DeleteTopicsResponse struct {
	// The amount of time that the broker throttled the request.
	//
	// This field will be zero if the kafka broker did no support the
	// DeleteTopics API in version 1 or above.
	Throttle time.Duration

	// Mapping of topic names to errors that occurred while attempting to delete
	// the topics.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Errors map[string]error
}

// DeleteTopics sends a topic deletion request to a kafka broker and returns the
// response.
func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
	m, err := c.roundTrip(ctx, req.Addr, &deletetopics.Request{
		TopicNames: req.Topics,
		TimeoutMs:  c.timeoutMs(ctx, defaultDeleteTopicsTimeout),
	})

	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).DeleteTopics: %w", err)
	}

	res := m.(*deletetopics.Response)
	ret := &DeleteTopicsResponse{
		Throttle: makeDuration(res.ThrottleTimeMs),
		Errors:   make(map[string]error, len(res.Responses)),
	}

	for _, t := range res.Responses {
		if t.ErrorCode == 0 {
			ret.Errors[t.Name] = nil
		} else {
			ret.Errors[t.Name] = Error(t.ErrorCode)
		}
	}

	return ret, nil
}

// See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
type deleteTopicsRequestV0 struct {
	// Topics holds the topic names
	Topics []string

	// Timeout holds the time in ms to wait for a topic to be completely deleted
	// on the controller node. Values <= 0 will trigger topic deletion and return
	// immediately.
	Timeout int32
}

func (t deleteTopicsRequestV0) size() int32 {
	return sizeofStringArray(t.Topics) +
		sizeofInt32(t.Timeout)
}

func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) {
	wb.writeStringArray(t.Topics)
	wb.writeInt32(t.Timeout)
}

type deleteTopicsResponseV0 struct {
	// TopicErrorCodes holds per topic error codes
	TopicErrorCodes []deleteTopicsResponseV0TopicErrorCode
}

func (t deleteTopicsResponseV0) size() int32 {
	return sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() })
}

func (t *deleteTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
	fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
		var item deleteTopicsResponseV0TopicErrorCode
		if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil {
			return
		}
		t.TopicErrorCodes = append(t.TopicErrorCodes, item)
		return
	}
	if remain, err = readArrayWith(r, size, fn); err != nil {
		return
	}
	return
}

func (t deleteTopicsResponseV0) writeTo(wb *writeBuffer) {
	wb.writeArray(len(t.TopicErrorCodes), func(i int) { t.TopicErrorCodes[i].writeTo(wb) })
}

type deleteTopicsResponseV0TopicErrorCode struct {
	// Topic holds the topic name
	Topic string

	// ErrorCode holds the error code
	ErrorCode int16
}

func (t deleteTopicsResponseV0TopicErrorCode) size() int32 {
	return sizeofString(t.Topic) +
		sizeofInt16(t.ErrorCode)
}

func (t *deleteTopicsResponseV0TopicErrorCode) readFrom(r *bufio.Reader, size int) (remain int, err error) {
	if remain, err = readString(r, size, &t.Topic); err != nil {
		return
	}
	if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
		return
	}
	return
}

func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) {
	wb.writeString(t.Topic)
	wb.writeInt16(t.ErrorCode)
}

// deleteTopics deletes the specified topics.
//
// See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponseV0, error) {
	var response deleteTopicsResponseV0
	err := c.writeOperation(
		func(deadline time.Time, id int32) error {
			if request.Timeout == 0 {
				now := time.Now()
				deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
				request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
			}
			return c.writeRequest(deleteTopics, v0, id, request)
		},
		func(deadline time.Time, size int) error {
			return expectZeroSize(func() (remain int, err error) {
				return (&response).readFrom(&c.rbuf, size)
			}())
		},
	)
	if err != nil {
		return deleteTopicsResponseV0{}, err
	}
	for _, c := range response.TopicErrorCodes {
		if c.ErrorCode != 0 {
			return response, Error(c.ErrorCode)
		}
	}
	return response, nil
}