From 30f36a44b09da264201199175962d28d4cf557aa Mon Sep 17 00:00:00 2001 From: slaventius Date: Thu, 2 Feb 2023 22:49:44 +0300 Subject: [PATCH] * --- internal/config/config.go | 5 ++--- internal/postman.go | 13 ++++++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index db2edb4..8d6c4d7 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -14,9 +14,8 @@ type smtpConfig struct { } type kafkaConfig struct { - Port int `envconfig:"KAFKA_PORT"` - Partition int `envconfig:"KAFKA_PARTITION"` - Host string `envconfig:"KAFKA_HOST"` + Port int `envconfig:"KAFKA_PORT"` + Host string `envconfig:"KAFKA_HOST"` } // ... diff --git a/internal/postman.go b/internal/postman.go index 26b89ab..ea4fb8e 100644 --- a/internal/postman.go +++ b/internal/postman.go @@ -3,10 +3,12 @@ package postman import ( "context" "encoding/json" + "fmt" "log" "net" "strconv" "test3k/authPostman/internal/config" + "time" // smtp "test3k/authPostman/internal/smtp" @@ -31,10 +33,11 @@ func NewServer(ctx context.Context, config *config.Config, topic string) *AuthPo kafkaReader: kafka.NewReader(kafka.ReaderConfig{ Topic: topic, Brokers: []string{net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port))}, - // GroupID: fmt.Sprintf("consumer-group-%d", config.Kafka.Partition), - Partition: config.Kafka.Partition, - MinBytes: 10e3, // 10KB - MaxBytes: 10e6, // 10MB + GroupID: fmt.Sprintf("consumer-group-%s", topic), + // Partition: 0, + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + ReadBackoffMax: time.Millisecond * 100, }), } } @@ -45,7 +48,7 @@ func (s *AuthPostmanServer) GracefulStop() error { func (s *AuthPostmanServer) ReadMessage(offset int64) error { // ... - s.kafkaReader.SetOffset(offset) + // s.kafkaReader.SetOffset(offset) // for {