diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9d3802d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +authPostmanService diff --git a/cmd/main.go b/cmd/main.go index c3dac61..6a68a23 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -10,14 +10,16 @@ import ( "syscall" server "test3k/authPostman/internal" "test3k/authPostman/internal/config" +) - "github.com/segmentio/kafka-go" +const ( + topicRegistrations string = "registrations" // Топик для регистраций ) func main() { config := config.NewConfig() ctx, _ := context.WithCancel(context.Background()) - srv := server.NewServer(ctx, config) + srv := server.NewServer(ctx, config, topicRegistrations) // signalChannel := make(chan os.Signal, 1) @@ -54,26 +56,8 @@ func start(config *config.Config, srv *server.AuthPostmanServer) { // log.Printf("authPostmanServer starting (listening to %s)\n", connStr) - // - r := kafka.NewReader(kafka.ReaderConfig{ - Topic: "registrations", - Brokers: []string{connStr}, - GroupID: "consumer-group-id", - Partition: 0, - MinBytes: 10e3, // 10KB - MaxBytes: 10e6, // 10MB - }) - defer r.Close() - - // ... - r.SetOffset(0) - - for { - m, err := r.ReadMessage(context.Background()) - if err != nil { - break - } - - log.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) + // Запускаем прослушивание + if err := srv.ReadMessage(0); err != nil { + log.Fatal(err) } } diff --git a/internal/postman.go b/internal/postman.go index f8f9f13..355fc48 100644 --- a/internal/postman.go +++ b/internal/postman.go @@ -2,17 +2,48 @@ package postman import ( "context" + "log" + "net" + "strconv" "test3k/authPostman/internal/config" + + "github.com/segmentio/kafka-go" ) type AuthPostmanServer struct { + kafkaReader *kafka.Reader + ctx context.Context } -func NewServer(ctx context.Context, config *config.Config) *AuthPostmanServer { - return &AuthPostmanServer{} +func NewServer(ctx context.Context, config *config.Config, topic string) *AuthPostmanServer { + return &AuthPostmanServer{ + kafkaReader: kafka.NewReader(kafka.ReaderConfig{ + Topic: topic, + Brokers: []string{net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port))}, + GroupID: "consumer-group-id", + Partition: 0, + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + }), + ctx: ctx, + } } func (s *AuthPostmanServer) GracefulStop() error { - // return s.db.Close() - return nil + return s.kafkaReader.Close() +} + +func (s *AuthPostmanServer) ReadMessage(offset int64) error { + // ... + s.kafkaReader.SetOffset(offset) + + // + for { + m, err := s.kafkaReader.ReadMessage(s.ctx) + if err != nil { + return err + } + + log.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) + } }