From 724462a43b27276a8288c25a074b620cef3ede16 Mon Sep 17 00:00:00 2001 From: slaventius Date: Tue, 14 Mar 2023 11:09:57 +0300 Subject: [PATCH] * --- internal/transport/kafka/balancer.go | 13 +++++++++++++ internal/transport/kafka/writer.go | 3 ++- 2 files changed, 15 insertions(+), 1 deletion(-) create mode 100644 internal/transport/kafka/balancer.go diff --git a/internal/transport/kafka/balancer.go b/internal/transport/kafka/balancer.go new file mode 100644 index 0000000..7fa2aef --- /dev/null +++ b/internal/transport/kafka/balancer.go @@ -0,0 +1,13 @@ +package kafka + +import "github.com/segmentio/kafka-go" + +// Собственный балансировщик для определения номера партиции +// в которую попадет очередное сообщение +type MyBalancer struct { + Cool bool +} + +func (s *MyBalancer) Balance(msg kafka.Message, partitions ...int) (partition int) { + return 0 +} diff --git a/internal/transport/kafka/writer.go b/internal/transport/kafka/writer.go index 6954f0c..a760c22 100644 --- a/internal/transport/kafka/writer.go +++ b/internal/transport/kafka/writer.go @@ -95,7 +95,8 @@ func NewWriter(ctx context.Context, logger *logger.Logger, topic string, address s := &KafkaWriter{ ctx: ctx, writer: &kafka.Writer{ - Topic: topic, + Topic: topic, + // Balancer: &MyBalancer{}, Balancer: &kafka.LeastBytes{}, // Balancer: &kafka.Murmur2Balancer{}, WriteBackoffMax: time.Millisecond * 100,