slaventius 2 years ago
parent fe737e4756
commit e6ef6b2186
  1. 4
      internal/transport/grpc/grpc.go
  2. 8
      internal/transport/kafka/kafka_writer.go

@ -113,7 +113,7 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe
} }
// //
log.Printf("send code %s to %s ...", user.msg.Code, user.msg.Email) log.Printf("publication code %s to %s ...", user.msg.Code, user.msg.Email)
// //
err := s.kafkaWriter.WriteMessage([]byte(user.Login), value) err := s.kafkaWriter.WriteMessage([]byte(user.Login), value)
@ -124,7 +124,7 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe
} }
// //
log.Printf("send code %s to %s completed", user.msg.Code, user.msg.Email) log.Printf("publication code %s to %s completed", user.msg.Code, user.msg.Email)
return &api.RegistrationResponse{ return &api.RegistrationResponse{
Code: user.msg.Code, Code: user.msg.Code,

@ -5,6 +5,7 @@ import (
"log" "log"
"net" "net"
"strconv" "strconv"
"time"
"github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go"
) )
@ -20,9 +21,10 @@ func NewWriter(ctx context.Context, topic string, address ...string) *KafkaWrite
s := &KafkaWriter{ s := &KafkaWriter{
ctx: ctx, ctx: ctx,
writer: &kafka.Writer{ writer: &kafka.Writer{
Topic: topic, Topic: topic,
Balancer: &kafka.LeastBytes{}, Balancer: &kafka.LeastBytes{},
Addr: kafka.TCP(address...), WriteBackoffMax: time.Millisecond * 100,
Addr: kafka.TCP(address...),
}, },
first: address[0], first: address[0],
topic: topic, topic: topic,

Loading…
Cancel
Save