**Describe the bug**
> I'm getting an "unexpected EOF" error when calling `Writer.WriteMessages()` while running Kafka and Zookeeper in Docker containers built and started with [dockertest](https://github.com/ory/dockertest). _However_, when I run containers with the exact same configuration via docker-compose, I do _not_ get any errors.
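> Condensed, the failing call is essentially the following (full reproduction under **To Reproduce**; `brokerAddr` is the `host:port` that dockertest reports for the container's `9092/tcp`):

```go
w := &kafkago.Writer{
	Addr:                   kafkago.TCP(brokerAddr),
	Topic:                  "default_queue",
	AllowAutoTopicCreation: true,
}
// Under dockertest this returns "unexpected EOF"; under docker-compose it returns nil.
err := w.WriteMessages(context.Background(), kafkago.Message{Value: []byte("ping")})
```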
**Kafka Version**
> * What version(s) of Kafka are you testing against?
3.1.0
> * What version of kafka-go are you using?
0.4.36
**To Reproduce**
> Resources to reproduce the behavior:
```yaml
---
# docker-compose.yaml
# no errors in tests when this file is used to run the containers
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.0.1
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
...
```
```go
// queue_test.go
package demo

import (
	"context"
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/suite"
)

type KafkaQTestSuite struct {
	suite.Suite
	broker *kafkaQueue
}

var kafkaTestCfg QueueConfig

func setTestKafkaConfig(config QueueConfig) {
	kafkaTestCfg = config
}

func TestKafkaAll(t *testing.T) {
	t.Parallel()
	assert.NoError(t, UseTempKafka(func(addr string) error {
		setTestKafkaConfig(QueueConfig{
			Address: addr,
			Queue:   "default_queue",
		})
		suite.Run(t, new(KafkaQTestSuite))
		return nil
	}))
}

func (s *KafkaQTestSuite) SetupTest() {
	var (
		err error
		cli *kafkaQueue
	)
	if cli, err = NewKafkaQueue(kafkaTestCfg); err != nil {
		s.T().Error("setup failed")
	}
	s.broker = cli
	// Waiting for cluster leader election to complete before writing messages
	time.Sleep(3 * time.Second)
}

func (s *KafkaQTestSuite) TearDownTest() {
	_ = s.broker.close()
}

func (s *KafkaQTestSuite) TestProduceMessage() {
	// Produce
	pubMsg := []byte(uuid.New().String())
	err := s.broker.Produce(context.TODO(), pubMsg)
	if err != nil {
		assert.NoError(s.T(), err, "Produce() error:\nwant nil\ngot %v", err)
	}
}
```
```go
// queue.go
package demo

import (
	"context"
	"time"

	"github.com/pkg/errors"
	kafkago "github.com/segmentio/kafka-go"
)

type QueueConfig struct {
	Address string `description:"Broker address" default:"localhost:5672"`
	Queue   string `description:"Queue name" default:"default_queue"`
}

type kafkaQueue struct {
	w kafkago.Writer
	r *kafkago.Reader
}

const (
	numWriteRetries    = 3
	waitLeaderElection = 250 * time.Millisecond
)

func NewKafkaQueue(cfg QueueConfig) (*kafkaQueue, error) {
	return &kafkaQueue{
		w: kafkago.Writer{
			Addr:                   kafkago.TCP(cfg.Address),
			Topic:                  cfg.Queue,
			AllowAutoTopicCreation: true,
		},
		r: kafkago.NewReader(kafkago.ReaderConfig{
			Brokers: []string{cfg.Address},
			Topic:   cfg.Queue,
			GroupID: cfg.Queue,
		}),
	}, nil
}

func (q *kafkaQueue) Produce(ctx context.Context, msg []byte) error {
	var err error
	for i := 0; i < numWriteRetries; i++ {
		err = q.w.WriteMessages(
			ctx, kafkago.Message{
				Key:   []byte{},
				Value: msg,
			},
		)
		// Retry on errors that usually resolve once leader election completes.
		if errors.Is(err, kafkago.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
			time.Sleep(waitLeaderElection)
			continue
		}
		if err != nil {
			return errors.Wrap(err, "failed to write kafka message")
		}
		// Success: stop retrying so the message is written exactly once.
		return nil
	}
	return errors.Wrap(err, "failed to write kafka message after retries")
}

func (q *kafkaQueue) Consume(ctx context.Context) ([]byte, error) {
	msg, err := q.r.ReadMessage(ctx)
	if err != nil {
		return []byte{}, errors.Wrap(err, "failed to read kafka message")
	}
	return msg.Value, nil
}

func (q *kafkaQueue) close() error {
	if err := q.w.Close(); err != nil {
		return errors.Wrap(err, "failed to close writer")
	}
	if err := q.r.Close(); err != nil {
		return errors.Wrap(err, "failed to close reader")
	}
	return nil
}
```
```go
// tmp_kafka.go
package demo

import (
	"fmt"
	"time"

	"github.com/go-zookeeper/zk"
	"github.com/pkg/errors"
	kafkago "github.com/segmentio/kafka-go"
)

const (
	DefaultRepository     = "confluentinc/cp-kafka"
	DefaultImage          = "7.0.1"
	HostNameKafka         = "kafka"
	ZookeeperRepository   = "confluentinc/cp-zookeeper"
	ZookeeperImage        = "7.0.1"
	HostNameZookeeper     = "zookeeper"
	ZookeeperKafkaNetwork = "zookeeper-kafka"
)

// UseTempKafka starts zookeeper and kafka containers, passes the broker
// address to use, and tears everything down afterwards. The named return
// value lets the deferred cleanup surface kill errors.
func UseTempKafka(use func(connStr string) error) (err error) {
	network, err := NewNetwork(ZookeeperKafkaNetwork)
	if err != nil {
		return fmt.Errorf("testbed new network error: %w", err)
	}
	zookeeper, err := zookeeperContainer(network)
	if err != nil {
		return fmt.Errorf("zookeeper container init failed: %w", err)
	}
	img := GetImage(DefaultRepository, DefaultImage, "KAFKA")
	kafka, err := NewContainerFromNetwork(
		network, HostNameKafka, img, kafkaEnv(), nil, kafkaExpPorts(), Bindings{},
	)
	if err != nil {
		return fmt.Errorf("new container error: %w", err)
	}
	defer func() {
		if errKill := kafka.Kill(); err == nil {
			err = errKill
		}
		if errKill := zookeeper.Kill(); err == nil {
			err = errKill
		}
		if errKill := network.Kill(); err == nil {
			err = errKill
		}
	}()
	srvAddr := zookeeper.GetHost("2181/tcp")
	if err = zookeeper.Retry(RetryServer(srvAddr)); err != nil {
		return fmt.Errorf("zookeeper retry error: %w", err)
	}
	brokerAddr := kafka.GetHost("9092/tcp")
	if err = kafka.Retry(RetryBroker(brokerAddr)); err != nil {
		return fmt.Errorf("kafka retry error: %w", err)
	}
	return use(brokerAddr)
}

func zookeeperContainer(network *Network) (*Container, error) {
	img := GetImage(ZookeeperRepository, ZookeeperImage, "ZOOKEEPER")
	env := []string{
		"ZOOKEEPER_CLIENT_PORT=2181",
		"ZOOKEEPER_TICK_TIME=2000",
	}
	exposed := []DockerPort{
		{Port: "2181", Protocol: "tcp"},
	}
	container, err := NewContainerFromNetwork(
		network, HostNameZookeeper, img, env, nil, exposed, nil,
	)
	if err != nil {
		return nil, fmt.Errorf("zookeeper container error: %w", err)
	}
	return container, nil
}

func kafkaEnv() []string {
	return []string{
		"KAFKA_BROKER_ID=1",
		"KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
		"KAFKA_AUTO_CREATE_TOPICS_ENABLE=true",
		"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT",
		"KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092",
		"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1",
		"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1",
		"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1",
	}
}

func kafkaExpPorts() ExposedPorts {
	return []DockerPort{
		{Port: "9092", Protocol: "tcp"},
	}
}

func RetryServer(addresses ...string) func() error {
	return func() error {
		fmt.Println("Pinging zookeeper...")
		_, _, err := zk.Connect(addresses, 10*time.Second)
		if err != nil {
			return errors.Wrap(err, "failed to connect to zookeeper")
		}
		fmt.Println("Ping success")
		return nil
	}
}

func RetryBroker(addrStr string) func() error {
	return func() error {
		fmt.Println("Pinging kafka...")
		con, err := kafkago.Dial("tcp", addrStr)
		if err != nil {
			return errors.Wrap(err, "failed to connect to kafka broker")
		}
		defer con.Close()
		fmt.Println("Ping success")
		return nil
	}
}
```
```go
// docker_api.go
package demo

import (
	"fmt"
	"log"
	"os"
	"strings"

	"github.com/ory/dockertest/v3"
	dc "github.com/ory/dockertest/v3/docker"
)

type Container struct {
	pool     *dockertest.Pool
	resource *dockertest.Resource
}

type Network struct {
	pool *dockertest.Pool
	net  *dc.Network
}

type Image struct {
	repository string
	tag        string
}

type DockerPort struct {
	Protocol string
	Port     string
}

type ExposedPorts []DockerPort

type Address struct {
	IP   string
	Port string
}

type Bindings []Binding

type Binding struct {
	Port  DockerPort
	Hosts []Address
}

func (p DockerPort) String() string {
	return fmt.Sprintf("%s/%s", p.Port, p.Protocol)
}

func (e ExposedPorts) toDC() (res []string) {
	for _, port := range e {
		res = append(res, port.String())
	}
	return res
}

func (b Bindings) toDC() map[dc.Port][]dc.PortBinding {
	res := make(map[dc.Port][]dc.PortBinding)
	for _, binding := range b {
		res[dc.Port(binding.Port.String())] = binding.toDC()
	}
	return res
}

func (b Binding) toDC() []dc.PortBinding {
	dcBinds := make([]dc.PortBinding, 0, len(b.Hosts))
	for _, host := range b.Hosts {
		dcBinds = append(dcBinds, dc.PortBinding{
			HostIP:   host.IP,
			HostPort: host.Port,
		})
	}
	return dcBinds
}

// GetImage returns the image for the given repository and tag, allowing
// overrides via TESTBED_REP_/TESTBED_TAG_ environment variables.
func GetImage(repository, tag string, envPostfix string) *Image {
	if envPostfix != "" {
		if envRep, ok := os.LookupEnv("TESTBED_REP_" + envPostfix); ok {
			repository = envRep
		}
		if envTag, ok := os.LookupEnv("TESTBED_TAG_" + envPostfix); ok {
			tag = envTag
		}
	}
	return &Image{
		repository: repository,
		tag:        tag,
	}
}

func NewNetwork(name string) (*Network, error) {
	pool, err := dockertest.NewPool("")
	if err != nil {
		return nil, fmt.Errorf("dockertest new pool error: %w", err)
	}
	network, err := pool.Client.CreateNetwork(dc.CreateNetworkOptions{Name: name})
	if err != nil {
		return nil, fmt.Errorf("could not create network %s: %w", name, err)
	}
	return &Network{
		pool: pool,
		net:  network,
	}, nil
}

func (n Network) ID() string {
	return n.net.ID
}

func (n Network) Kill() error {
	if err := n.pool.Client.RemoveNetwork(n.net.ID); err != nil {
		return fmt.Errorf("could not remove %s network: %w", n.net.Name, err)
	}
	return nil
}

func NewContainerFromNetwork(
	net *Network,
	hostName string,
	img *Image,
	env []string,
	cmd []string,
	exposedPorts ExposedPorts,
	portBindings Bindings,
) (*Container, error) {
	auth, err := authCfg(img.repository)
	if err != nil {
		return nil, fmt.Errorf("failed to get auth cfg: %w", err)
	}
	ops := &dockertest.RunOptions{
		Repository:   img.repository,
		Tag:          img.tag,
		Env:          env,
		Cmd:          cmd,
		Auth:         auth,
		NetworkID:    net.ID(),
		Hostname:     hostName,
		ExposedPorts: exposedPorts.toDC(),
		PortBindings: portBindings.toDC(),
	}
	resource, err := net.pool.RunWithOptions(ops)
	if err != nil {
		return nil, fmt.Errorf("pool run error: %w", err)
	}
	return &Container{
		pool:     net.pool,
		resource: resource,
	}, nil
}

func (c *Container) Kill() error {
	if err := c.pool.Purge(c.resource); err != nil {
		return fmt.Errorf("pool purge error: %w", err)
	}
	return nil
}

// GetHost returns "ip:port" for the given container port id, e.g. "9092/tcp".
func (c *Container) GetHost(id string) string {
	ip := c.resource.GetBoundIP(id)
	port := c.resource.GetPort(id)
	return fmt.Sprintf("%s:%s", ip, port)
}

func (c *Container) Retry(op func() error) error {
	if err := c.pool.Retry(op); err != nil {
		return fmt.Errorf("pool retry error: %w", err)
	}
	return nil
}

func authCfg(repo string) (dc.AuthConfiguration, error) {
	all, err := dc.NewAuthConfigurationsFromDockerCfg()
	if err != nil {
		return dc.AuthConfiguration{}, fmt.Errorf("failed to get docker cfg: %w", err)
	}
	reg := registryFromRepo(repo)
	cfg, ok := all.Configs[reg]
	if !ok {
		log.Printf("no docker config data for %s registry; not using auth creds", reg)
		return dc.AuthConfiguration{}, nil
	}
	return cfg, nil
}

func registryFromRepo(repo string) string {
	return strings.Split(repo, "/")[0]
}
```
**Expected Behavior**
> No errors.
**Observed Behavior**
> Error traceback:
```
=== RUN TestKafkaAll
=== PAUSE TestKafkaAll
=== CONT TestKafkaAll
Pinging zookeeper...
Ping success
Pinging kafka...
{"Timestamp":1667599781646695000,"SeverityText":"INFO","SeverityNumber":9,"Resource":{"host.name":"cab-wsm-0041348","process.executable.name":"___KafkaQTestSuite_in_gitlab_ocp_delta_sbrf_ru_godev_core_git_broker.test","service.instance.id":"260013d1-8de8-48a8-b5fe-e83bacbab4ff","service.name":"noNameGoService","service.version":""},"Body":"connected to [::1]:55158"}
Ping success
--- FAIL: TestKafkaAll (4.07s)
=== RUN TestKafkaAll/TestProduceConsumeMessage
{"Timestamp":1667599781651114000,"SeverityText":"INFO","SeverityNumber":9,"Resource":{"host.name":"cab-wsm-0041348","process.executable.name":"___KafkaQTestSuite_in_gitlab_ocp_delta_sbrf_ru_godev_core_git_broker.test","service.instance.id":"260013d1-8de8-48a8-b5fe-e83bacbab4ff","service.name":"noNameGoService","service.version":""},"Body":"authentication failed: EOF"}
{"Timestamp":1667599781652651000,"SeverityText":"INFO","SeverityNumber":9,"Resource":{"host.name":"cab-wsm-0041348","process.executable.name":"___KafkaQTestSuite_in_gitlab_ocp_delta_sbrf_ru_godev_core_git_broker.test","service.instance.id":"260013d1-8de8-48a8-b5fe-e83bacbab4ff","service.name":"noNameGoService","service.version":""},"Body":"connected to 127.0.0.1:55158"}
{"Timestamp":1667599781655140000,"SeverityText":"INFO","SeverityNumber":9,"Resource":{"host.name":"cab-wsm-0041348","process.executable.name":"___KafkaQTestSuite_in_gitlab_ocp_delta_sbrf_ru_godev_core_git_broker.test","service.instance.id":"260013d1-8de8-48a8-b5fe-e83bacbab4ff","service.name":"noNameGoService","service.version":""},"Body":"authentication failed: EOF"}
{"Timestamp":1667599782656391000,"SeverityText":"INFO","SeverityNumber":9,"Resource":{"host.name":"cab-wsm-0041348","process.executable.name":"___KafkaQTestSuite_in_gitlab_ocp_delta_sbrf_ru_godev_core_git_broker.test","service.instance.id":"260013d1-8de8-48a8-b5fe-e83bacbab4ff","service.name":"noNameGoService","service.version":""},"Body":"connected to [::1]:55158"}
{"Timestamp":1667599782657636000,"SeverityText":"INFO","SeverityNumber":9,"Resource":{"host.name":"cab-wsm-0041348","process.executable.name":"___KafkaQTestSuite_in_gitlab_ocp_delta_sbrf_ru_godev_core_git_broker.test","service.instance.id":"260013d1-8de8-48a8-b5fe-e83bacbab4ff","service.name":"noNameGoService","service.version":""},"Body":"authentication failed: EOF"}
{"Timestamp":1667599782658731000,"SeverityText":"INFO","SeverityNumber":9,"Resource":{"host.name":"cab-wsm-0041348","process.executable.name":"___KafkaQTestSuite_in_gitlab_ocp_delta_sbrf_ru_godev_core_git_broker.test","service.instance.id":"260013d1-8de8-48a8-b5fe-e83bacbab4ff","service.name":"noNameGoService","service.version":""},"Body":"connected to 127.0.0.1:55158"}
{"Timestamp":1667599782659739000,"SeverityText":"INFO","SeverityNumber":9,"Resource":{"host.name":"cab-wsm-0041348","process.executable.name":"___KafkaQTestSuite_in_gitlab_ocp_delta_sbrf_ru_godev_core_git_broker.test","service.instance.id":"260013d1-8de8-48a8-b5fe-e83bacbab4ff","service.name":"noNameGoService","service.version":""},"Body":"authentication failed: EOF"}
{"Timestamp":1667599783661340000,"SeverityText":"INFO","SeverityNumber":9,"Resource":{"host.name":"cab-wsm-0041348","process.executable.name":"___KafkaQTestSuite_in_gitlab_ocp_delta_sbrf_ru_godev_core_git_broker.test","service.instance.id":"260013d1-8de8-48a8-b5fe-e83bacbab4ff","service.name":"noNameGoService","service.version":""},"Body":"connected to [::1]:55158"}
{"Timestamp":1667599783663772000,"SeverityText":"INFO","SeverityNumber":9,"Resource":{"host.name":"cab-wsm-0041348","process.executable.name":"___KafkaQTestSuite_in_gitlab_ocp_delta_sbrf_ru_godev_core_git_broker.test","service.instance.id":"260013d1-8de8-48a8-b5fe-e83bacbab4ff","service.name":"noNameGoService","service.version":""},"Body":"authentication failed: EOF"}
{"Timestamp":1667599783665205000,"SeverityText":"INFO","SeverityNumber":9,"Resource":{"host.name":"cab-wsm-0041348","process.executable.name":"___KafkaQTestSuite_in_gitlab_ocp_delta_sbrf_ru_godev_core_git_broker.test","service.instance.id":"260013d1-8de8-48a8-b5fe-e83bacbab4ff","service.name":"noNameGoService","service.version":""},"Body":"connected to 127.0.0.1:55158"}
{"Timestamp":1667599783666802000,"SeverityText":"INFO","SeverityNumber":9,"Resource":{"host.name":"cab-wsm-0041348","process.executable.name":"___KafkaQTestSuite_in_gitlab_ocp_delta_sbrf_ru_godev_core_git_broker.test","service.instance.id":"260013d1-8de8-48a8-b5fe-e83bacbab4ff","service.name":"noNameGoService","service.version":""},"Body":"authentication failed: EOF"}
kafka_test.go:108:
Error Trace: /Users/20182483/go/src/gitlab.ocp.delta.sbrf.ru/godev/core/broker/kafka_test.go:108
Error: Received unexpected error:
unexpected EOF
failed to write kafka msg
gitlab.ocp.delta.sbrf.ru/godev/core.git/broker.(*kafkaQueue).Produce
/Users/20182483/go/src/gitlab.ocp.delta.sbrf.ru/godev/core/broker/kafka.go:90
gitlab.ocp.delta.sbrf.ru/godev/core.git/broker.(*KafkaQTestSuite).TestProduceConsumeMessage
/Users/20182483/go/src/gitlab.ocp.delta.sbrf.ru/godev/core/broker/kafka_test.go:106
reflect.Value.call
/usr/local/go/src/reflect/value.go:584
reflect.Value.Call
/usr/local/go/src/reflect/value.go:368
github.com/stretchr/testify/suite.Run.func1
/Users/20182483/go/pkg/mod/github.com/stretchr/testify@v1.8.1/suite/suite.go:175
testing.tRunner
/usr/local/go/src/testing/testing.go:1446
runtime.goexit
/usr/local/go/src/runtime/asm_amd64.s:1594
Test: TestKafkaAll/TestProduceConsumeMessage
Messages: Produce() error:
want nil
got failed to write kafka msg: unexpected EOF
```
Logs from the Kafka docker container:
```
[2022-11-04 21:33:25,660] INFO KafkaConfig values:
advertised.listeners = PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.heartbeat.interval.ms = 2000
broker.id = 1
broker.id.generation.enable = true
broker.rack = null
broker.session.timeout.ms = 9000
client.quota.callback.class = null
compression.type = producer
connection.failed.authentication.delay.ms = 100
connections.max.idle.ms = 600000
connections.max.reauth.ms = 0
control.plane.listener.name = null
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.listener.names = null
controller.quorum.append.linger.ms = 25
controller.quorum.election.backoff.max.ms = 1000
controller.quorum.election.timeout.ms = 1000
controller.quorum.fetch.timeout.ms = 2000
controller.quorum.request.timeout.ms = 2000
controller.quorum.retry.backoff.ms = 20
controller.quorum.voters = []
controller.quota.window.num = 11
controller.quota.window.size.seconds = 1
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 1
delegation.token.expiry.check.interval.ms = 3600000
delegation.token.expiry.time.ms = 86400000
delegation.token.master.key = null
delegation.token.max.lifetime.ms = 604800000
delegation.token.secret.key = null
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.max.bytes = 57671680
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 3000
group.max.session.timeout.ms = 1800000
group.max.size = 2147483647
group.min.session.timeout.ms = 6000
initial.broker.registration.timeout.ms = 60000
inter.broker.listener.name = null
inter.broker.protocol.version = 3.0-IV1
kafka.metrics.polling.interval.secs = 10
kafka.metrics.reporters = []
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
listeners = PLAINTEXT://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:29092
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.max.compaction.lag.ms = 9223372036854775807
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /var/lib/kafka/data
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 60000
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.downconversion.enable = true
log.message.format.version = 3.0-IV1
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connection.creation.rate = 2147483647
max.connections = 2147483647
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
max.incremental.fetch.session.cache.slots = 1000
message.max.bytes = 1048588
metadata.log.dir = null
metadata.log.max.record.bytes.between.snapshots = 20971520
metadata.log.segment.bytes = 1073741824
metadata.log.segment.min.bytes = 8388608
metadata.log.segment.ms = 604800000
metadata.max.retention.bytes = -1
metadata.max.retention.ms = 604800000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
min.insync.replicas = 1
node.id = -1
num.io.threads = 8
num.network.threads = 3
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.alter.log.dirs.threads = null
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 10080
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 1
offsets.topic.segment.bytes = 104857600
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
password.encoder.iterations = 4096
password.encoder.key.length = 128
password.encoder.keyfactory.algorithm = null
password.encoder.old.secret = null
password.encoder.secret = null
principal.builder.class = class org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
process.roles = []
producer.purgatory.purge.interval.requests = 1000
queued.max.request.bytes = -1
queued.max.requests = 500
quota.window.num = 11
quota.window.size.seconds = 1
remote.log.index.file.cache.total.size.bytes = 1073741824
remote.log.manager.task.interval.ms = 30000
remote.log.manager.task.retry.backoff.max.ms = 30000
remote.log.manager.task.retry.backoff.ms = 500
remote.log.manager.task.retry.jitter = 0.2
remote.log.manager.thread.pool.size = 10
remote.log.metadata.manager.class.name = null
remote.log.metadata.manager.class.path = null
remote.log.metadata.manager.impl.prefix = null
remote.log.metadata.manager.listener.name = null
remote.log.reader.max.pending.tasks = 100
remote.log.reader.threads = 10
remote.log.storage.manager.class.name = null
remote.log.storage.manager.class.path = null
remote.log.storage.manager.impl.prefix = null
remote.log.storage.system.enable = false
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 30000
replica.selector.class = null
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 30000
reserved.broker.max.id = 1000
sasl.client.callback.handler.class = null
sasl.enabled.mechanisms = [GSSAPI]
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism.controller.protocol = GSSAPI
sasl.mechanism.inter.broker.protocol = GSSAPI
sasl.server.callback.handler.class = null
security.inter.broker.protocol = PLAINTEXT
security.providers = null
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = []
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.principal.mapping.rules = DEFAULT
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
transaction.max.timeout.ms = 900000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 1
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 1
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
zookeeper.clientCnxnSocket = null
zookeeper.connect = zookeeper:2181
zookeeper.connection.timeout.ms = null
zookeeper.max.in.flight.requests = 10
zookeeper.session.timeout.ms = 18000
zookeeper.set.acl = false
zookeeper.ssl.cipher.suites = null
zookeeper.ssl.client.enable = false
zookeeper.ssl.crl.enable = false
zookeeper.ssl.enabled.protocols = null
zookeeper.ssl.endpoint.identification.algorithm = HTTPS
zookeeper.ssl.keystore.location = null
zookeeper.ssl.keystore.password = null
zookeeper.ssl.keystore.type = null
zookeeper.ssl.ocsp.enable = false
zookeeper.ssl.protocol = TLSv1.2
zookeeper.ssl.truststore.location = null
zookeeper.ssl.truststore.password = null
zookeeper.ssl.truststore.type = null
zookeeper.sync.time.ms = 2000
(kafka.server.KafkaConfig)
[2022-11-04 21:33:25,688] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-11-04 21:33:25,690] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-11-04 21:33:25,691] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-11-04 21:33:25,692] INFO [ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-11-04 21:33:25,720] INFO Loading logs from log dirs ArraySeq(/var/lib/kafka/data) (kafka.log.LogManager)
[2022-11-04 21:33:25,723] INFO Attempting recovery for all logs in /var/lib/kafka/data since no clean shutdown file was found (kafka.log.LogManager)
[2022-11-04 21:33:25,727] INFO Loaded 0 logs in 7ms. (kafka.log.LogManager)
[2022-11-04 21:33:25,728] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2022-11-04 21:33:25,730] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2022-11-04 21:33:25,739] INFO Starting the log cleaner (kafka.log.LogCleaner)
[2022-11-04 21:33:25,808] INFO [kafka-log-cleaner-thread-0]: Starting (kafka.log.LogCleaner)
[2022-11-04 21:33:26,005] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Starting (kafka.server.BrokerToControllerRequestThread)
[2022-11-04 21:33:26,141] INFO Updated connection-accept-rate max connection creation rate to 2147483647 (kafka.network.ConnectionQuotas)
[2022-11-04 21:33:26,144] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2022-11-04 21:33:26,168] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
[2022-11-04 21:33:26,168] INFO Updated connection-accept-rate max connection creation rate to 2147483647 (kafka.network.ConnectionQuotas)
[2022-11-04 21:33:26,168] INFO Awaiting socket connections on 0.0.0.0:29092. (kafka.network.Acceptor)
[2022-11-04 21:33:26,176] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT_INTERNAL) (kafka.network.SocketServer)
[2022-11-04 21:33:26,182] INFO [BrokerToControllerChannelManager broker=1 name=alterIsr]: Starting (kafka.server.BrokerToControllerRequestThread)
[2022-11-04 21:33:26,200] INFO [ExpirationReaper-1-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-11-04 21:33:26,201] INFO [ExpirationReaper-1-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-11-04 21:33:26,202] INFO [ExpirationReaper-1-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-11-04 21:33:26,203] INFO [ExpirationReaper-1-ElectLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-11-04 21:33:26,216] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2022-11-04 21:33:26,239] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.zk.KafkaZkClient)
[2022-11-04 21:33:26,256] INFO Stat of the created znode at /brokers/ids/1 is: 27,27,1667597606249,1667597606249,1,0,0,72060354174844929,270,0,27
(kafka.zk.KafkaZkClient)
[2022-11-04 21:33:26,256] INFO Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092, czxid (broker epoch): 27 (kafka.zk.KafkaZkClient)
[2022-11-04 21:33:26,297] INFO [ControllerEventThread controllerId=1] Starting (kafka.controller.ControllerEventManager$ControllerEventThread)
[2022-11-04 21:33:26,304] INFO [ExpirationReaper-1-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-11-04 21:33:26,309] INFO [ExpirationReaper-1-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-11-04 21:33:26,310] INFO [ExpirationReaper-1-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-11-04 21:33:26,313] INFO Successfully created /controller_epoch with initial epoch 0 (kafka.zk.KafkaZkClient)
[2022-11-04 21:33:26,320] INFO [Controller id=1] 1 successfully elected as the controller. Epoch incremented to 1 and epoch zk version is now 1 (kafka.controller.KafkaController)
[2022-11-04 21:33:26,323] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2022-11-04 21:33:26,325] INFO [Controller id=1] Creating FeatureZNode at path: /feature with contents: FeatureZNode(Enabled,Features{}) (kafka.controller.KafkaController)
[2022-11-04 21:33:26,328] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2022-11-04 21:33:26,328] INFO Feature ZK node created at path: /feature (kafka.server.FinalizedFeatureChangeListener)
[2022-11-04 21:33:26,349] INFO [TransactionCoordinator id=1] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2022-11-04 21:33:26,354] INFO [Transaction Marker Channel Manager 1]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2022-11-04 21:33:26,354] INFO [TransactionCoordinator id=1] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2022-11-04 21:33:26,355] INFO Updated cache from existing <empty> to latest FinalizedFeaturesAndEpoch(features=Features{}, epoch=0). (kafka.server.FinalizedFeatureCache)
[2022-11-04 21:33:26,355] INFO [Controller id=1] Registering handlers (kafka.controller.KafkaController)
[2022-11-04 21:33:26,359] INFO [Controller id=1] Deleting log dir event notifications (kafka.controller.KafkaController)
[2022-11-04 21:33:26,362] INFO [Controller id=1] Deleting isr change notifications (kafka.controller.KafkaController)
[2022-11-04 21:33:26,366] INFO [Controller id=1] Initializing controller context (kafka.controller.KafkaController)
[2022-11-04 21:33:26,378] INFO [Controller id=1] Initialized broker epochs cache: HashMap(1 -> 27) (kafka.controller.KafkaController)
[2022-11-04 21:33:26,380] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-11-04 21:33:26,384] DEBUG [Controller id=1] Register BrokerModifications handler for Set(1) (kafka.controller.KafkaController)
[2022-11-04 21:33:26,389] DEBUG [Channel manager on controller 1]: Controller 1 trying to connect to broker 1 (kafka.controller.ControllerChannelManager)
[2022-11-04 21:33:26,397] INFO [RequestSendThread controllerId=1] Starting (kafka.controller.RequestSendThread)
[2022-11-04 21:33:26,399] INFO [Controller id=1] Currently active brokers in the cluster: Set(1) (kafka.controller.KafkaController)
[2022-11-04 21:33:26,400] INFO [Controller id=1] Currently shutting brokers in the cluster: HashSet() (kafka.controller.KafkaController)
[2022-11-04 21:33:26,401] INFO [Controller id=1] Current list of topics in the cluster: HashSet() (kafka.controller.KafkaController)
[2022-11-04 21:33:26,401] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2022-11-04 21:33:26,401] INFO [Controller id=1] Fetching topic deletions in progress (kafka.controller.KafkaController)
[2022-11-04 21:33:26,406] INFO [Controller id=1] List of topics to be deleted: (kafka.controller.KafkaController)
[2022-11-04 21:33:26,406] INFO [Controller id=1] List of topics ineligible for deletion: (kafka.controller.KafkaController)
[2022-11-04 21:33:26,407] INFO [Controller id=1] Initializing topic deletion manager (kafka.controller.KafkaController)
[2022-11-04 21:33:26,407] INFO [Topic Deletion Manager 1] Initializing manager with initial deletions: Set(), initial ineligible deletions: HashSet() (kafka.controller.TopicDeletionManager)
[2022-11-04 21:33:26,408] INFO [Controller id=1] Sending update metadata request (kafka.controller.KafkaController)
[2022-11-04 21:33:26,412] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet(1) for 0 partitions (state.change.logger)
[2022-11-04 21:33:26,414] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Starting socket server acceptors and processors (kafka.network.SocketServer)
[2022-11-04 21:33:26,419] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
[2022-11-04 21:33:26,420] INFO [ReplicaStateMachine controllerId=1] Initializing replica state (kafka.controller.ZkReplicaStateMachine)
[2022-11-04 21:33:26,421] INFO [ReplicaStateMachine controllerId=1] Triggering online replica state changes (kafka.controller.ZkReplicaStateMachine)
[2022-11-04 21:33:26,424] INFO [ReplicaStateMachine controllerId=1] Triggering offline replica state changes (kafka.controller.ZkReplicaStateMachine)
[2022-11-04 21:33:26,424] DEBUG [ReplicaStateMachine controllerId=1] Started replica state machine with initial state -> HashMap() (kafka.controller.ZkReplicaStateMachine)
[2022-11-04 21:33:26,425] INFO [PartitionStateMachine controllerId=1] Initializing partition state (kafka.controller.ZkPartitionStateMachine)
[2022-11-04 21:33:26,426] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT_INTERNAL) (kafka.network.SocketServer)
[2022-11-04 21:33:26,427] INFO [PartitionStateMachine controllerId=1] Triggering online partition state changes (kafka.controller.ZkPartitionStateMachine)
[2022-11-04 21:33:26,427] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Started socket server acceptors and processors (kafka.network.SocketServer)
[2022-11-04 21:33:26,431] DEBUG [PartitionStateMachine controllerId=1] Started partition state machine with initial state -> HashMap() (kafka.controller.ZkPartitionStateMachine)
[2022-11-04 21:33:26,431] INFO [Controller id=1] Ready to serve as the new controller with epoch 1 (kafka.controller.KafkaController)
[2022-11-04 21:33:26,431] INFO Kafka version: 7.0.1-ccs (org.apache.kafka.common.utils.AppInfoParser)
[2022-11-04 21:33:26,432] INFO Kafka commitId: b7e52413e7cb3e8b (org.apache.kafka.common.utils.AppInfoParser)
[2022-11-04 21:33:26,432] INFO Kafka startTimeMs: 1667597606427 (org.apache.kafka.common.utils.AppInfoParser)
[2022-11-04 21:33:26,433] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
[2022-11-04 21:33:26,436] INFO [RequestSendThread controllerId=1] Controller 1 connected to localhost:9092 (id: 1 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
[2022-11-04 21:33:26,440] INFO [Controller id=1] Partitions undergoing preferred replica election: (kafka.controller.KafkaController)
[2022-11-04 21:33:26,442] INFO [Controller id=1] Partitions that completed preferred replica election: (kafka.controller.KafkaController)
[2022-11-04 21:33:26,442] INFO [Controller id=1] Skipping preferred replica election for partitions due to topic deletion: (kafka.controller.KafkaController)
[2022-11-04 21:33:26,442] INFO [Controller id=1] Resuming preferred replica election for partitions: (kafka.controller.KafkaController)
[2022-11-04 21:33:26,443] INFO [Controller id=1] Starting replica leader election (PREFERRED) for partitions triggered by ZkTriggered (kafka.controller.KafkaController)
[2022-11-04 21:33:26,455] INFO [Controller id=1] Starting the controller scheduler (kafka.controller.KafkaController)
[2022-11-04 21:33:26,502] TRACE [Controller id=1 epoch=1] Received response UpdateMetadataResponseData(errorCode=0) for request UPDATE_METADATA with correlation id 0 sent to broker localhost:9092 (id: 1 rack: null) (state.change.logger)
[2022-11-04 21:33:26,514] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Recorded new controller, from now on will use broker localhost:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2022-11-04 21:33:26,585] INFO [BrokerToControllerChannelManager broker=1 name=alterIsr]: Recorded new controller, from now on will use broker localhost:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2022-11-04 21:33:31,436] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2022-11-04 21:33:31,437] TRACE [Controller id=1] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
```
**Additional Context**
> I understand that the problem is most likely in the Kafka configuration I pass to the dockertest API rather than in the kafka-go library, since everything works as expected when I start the containers with docker-compose. Any help would be highly appreciated; I've been trying to figure this out for days without much progress.
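> My current (unverified) guess is that the advertised listener `PLAINTEXT://localhost:9092` does not match the random host port dockertest maps `9092/tcp` to, whereas the `"9092:9092"` binding in docker-compose keeps them aligned. Below is a minimal sketch of what I plan to try, pinning the host port with the `Bindings` type from `docker_api.go`; the `0.0.0.0` host IP and the exact placement inside `UseTempKafka` are assumptions, not a verified fix.

```go
// Hypothetical tweak inside UseTempKafka (untested): bind the container's
// 9092/tcp to host port 9092 so the advertised listener
// PLAINTEXT://localhost:9092 points at a port that is actually published.
bindings := Bindings{
	{
		Port:  DockerPort{Port: "9092", Protocol: "tcp"},
		Hosts: []Address{{IP: "0.0.0.0", Port: "9092"}},
	},
}
kafka, err := NewContainerFromNetwork(
	network, HostNameKafka, img, kafkaEnv(), nil, kafkaExpPorts(), bindings,
)
```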