package protocol

import (
	"fmt"
)

// Error is the string-backed error type used for protocol errors that
// originate on the client side.
type Error string

// Error implements the built-in error interface by returning the receiver's
// text unchanged.
func (e Error) Error() string {
	return string(e)
}

// Errorf formats msg and args following the rules of fmt.Sprintf and returns
// the resulting text as an Error.
func Errorf(msg string, args ...interface{}) Error {
	text := fmt.Sprintf(msg, args...)
	return Error(text)
}

// Sentinel errors reported by the client. Each one is a constant of type
// Error, so it can be compared directly or matched with errors.Is.
const (
	// ErrNoTopic is returned when a request needs to be sent to a specific
	// topic, but the client could not find that topic.
	ErrNoTopic = Error("topic not found")

	// ErrNoPartition is returned when a request needs to be sent to a specific
	// partition, but the client did not find it in the cluster metadata.
	ErrNoPartition = Error("topic partition not found")

	// ErrNoLeader is returned when a request needs to be sent to a partition
	// leader, but the client could not determine what the leader was at this
	// time.
	ErrNoLeader = Error("topic partition has no leader")

	// ErrNoRecord is returned when attempting to write a message containing an
	// empty record set (which kafka forbids).
	//
	// The check is done client-side because kafka closes the connection on
	// which it received an empty produce request, which would abort every
	// concurrent request sharing that connection.
	ErrNoRecord = Error("record set contains no records")

	// ErrNoReset is returned by ResetRecordReader when the record reader does
	// not support being reset.
	ErrNoReset = Error("record sequence does not support reset")
)

// TopicError pairs an error with the name of the topic it relates to.
//
// Topic holds the topic name and Err holds the underlying error, which is
// also what Unwrap returns.
type TopicError struct {
	Topic string
	Err   error
}

// NewTopicError allocates a TopicError that associates err with the given
// topic name and returns a pointer to it.
func NewTopicError(topic string, err error) *TopicError {
	e := new(TopicError)
	e.Topic = topic
	e.Err = err
	return e
}

// NewErrNoTopic returns a TopicError for the given topic whose underlying
// error is ErrNoTopic.
func NewErrNoTopic(topic string) *TopicError {
	var cause error = ErrNoTopic
	return NewTopicError(topic, cause)
}

// Error implements the error interface. The message is the underlying error
// followed by the quoted topic name in parentheses.
func (e *TopicError) Error() string {
	const layout = "%v (topic=%q)"
	return fmt.Sprintf(layout, e.Err, e.Topic)
}

// Unwrap returns the underlying error stored in Err, which lets errors.Is
// and errors.As look through a TopicError.
func (e *TopicError) Unwrap() error {
	cause := e.Err
	return cause
}

// TopicPartitionError pairs an error with the topic name and partition number
// it relates to.
//
// Topic and Partition identify the partition, and Err holds the underlying
// error, which is also what Unwrap returns.
type TopicPartitionError struct {
	Topic     string
	Partition int32
	Err       error
}

// NewTopicPartitionError allocates a TopicPartitionError that associates err
// with the given topic name and partition number, and returns a pointer to it.
func NewTopicPartitionError(topic string, partition int32, err error) *TopicPartitionError {
	e := new(TopicPartitionError)
	e.Topic, e.Partition, e.Err = topic, partition, err
	return e
}

// NewErrNoPartition returns a TopicPartitionError for the given topic and
// partition whose underlying error is ErrNoPartition.
func NewErrNoPartition(topic string, partition int32) *TopicPartitionError {
	var cause error = ErrNoPartition
	return NewTopicPartitionError(topic, partition, cause)
}

// NewErrNoLeader returns a TopicPartitionError for the given topic and
// partition whose underlying error is ErrNoLeader.
func NewErrNoLeader(topic string, partition int32) *TopicPartitionError {
	var cause error = ErrNoLeader
	return NewTopicPartitionError(topic, partition, cause)
}

// Error implements the error interface. The message is the underlying error
// followed, in parentheses, by the quoted topic name and the partition number.
func (e *TopicPartitionError) Error() string {
	const layout = "%v (topic=%q partition=%d)"
	return fmt.Sprintf(layout, e.Err, e.Topic, e.Partition)
}

// Unwrap returns the underlying error stored in Err, which lets errors.Is
// and errors.As look through a TopicPartitionError.
func (e *TopicPartitionError) Unwrap() error {
	cause := e.Err
	return cause
}