Good day.

I have configured Kafka to run on docker swarm, when I try to use Python to
send messages, I am being presented with the error messages such as the
ones below in rapid succession, non-stop.
What changes should I make so that I am able to send messages to the broker?



ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=1
host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]> returned error
111. Disconnecting.
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092
<connecting> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection.
KafkaConnectionError: 111 ECONNREFUSED
WARNING:kafka.client:Node 1 connection failed -- refreshing metadata
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092
<connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092
[('127.0.0.1', 9092) IPv4]
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=1
host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]> returned error
111. Disconnecting.
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092
<connecting> [IPv4 ('127.0.0.1', 9092)]>: Closing connection.
KafkaConnectionError: 111 ECONNREFUSED
WARNING:kafka.client:Node 1 connection failed -- refreshing metadata
exit();INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092
<connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092
[('::1', 9092, 0, 0) IPv6]
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=1
host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]> returned error
111. Disconnecting.
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092
<connecting> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection.
KafkaConnectionError: 111 ECONNREFUSED



Below is the python script I am using to connect.
from kafka import KafkaProducer;
import json;
import logging;
logging.basicConfig(level=logging.INFO);

producer = KafkaProducer(
    bootstrap_servers=["127.0.0.1:9089"]  # Replace with your Kafka broker
address
    ,request_timeout_ms=2000 #two seconds timeout
    ,retries=3
    ,value_serializer=lambda v: json.dumps(v).encode("utf-8")
);
data = {
    "event_type": "user_login",
    "user_id": 123,
    "timestamp": "2025-07-29T15:30:00Z"
}
topic_name = "topic1:1:1";

producer.send(topic_name, data);
producer.flush();



Below are the contents of the docker compose file I use to construct a
docker stack which is subsequently deployed on docker swarm.

---
    version: "3.9"

    services:

        zookeeper:
            image: confluentinc/cp-zookeeper:latest
            hostname: zookeeper
            ports:
                - "2181:2181"
            environment:
                ZOOKEEPER_CLIENT_PORT: 2181
                ZOOKEEPER_TICK_TIME: 2000

        kafka:
            image: apache/kafka:4.0.0
            environment:
                # Configure listeners for both docker and host communication
                -
                    "KAFKA_LISTENERS=CONTROLLER://localhost:9091,HOST://
0.0.0.0:9092,DOCKER://0.0.0.0:9093"
                -

"KAFKA_ADVERTISED_LISTENERS=HOST://localhost:9092,DOCKER://kafka:9093"
                -

"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT"
                -
                    "KAFKA_NODE_ID=1"
                -
                    "KAFKA_BROKER_ID=1"
                -
                    "KAFKA_PROCESS_ROLES=broker,controller"
                -
                    "KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER"
                -
                    "KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9091"
                -
                    "KAFKA_INTER_BROKER_LISTENER_NAME=DOCKER"
                -
                    "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1"
                -
                    "KAFKA_LOG_DIR=/local/data/kafka/data"
                -
                    "KAFKA_PORT=30000"
                -
                    "KAFKA_CREATE_TOPICS=topic1:1:1"
                -
                    "KAFKA_ZOOKEPER_CONNECT=zookeeper:2181"
            ports:
                -
                    "0.0.0.0:9089:9092/tcp"
            networks:
                -
                    "nf_genome_annot-network-kafka"
            volumes:
                -
                    type: bind
                    source:
/local/data/kafka_data/nf_genome_annot/nf_genome_annot_kafka-4-0-0/nf_genome_annot_kafka-4-0-0_kafka_9092.data
                    target: /local/data/kafka/data
                    read_only: false
            deploy:
                labels:
                    service.description: "This service runs the
nf_genome_annot_kafka-4-0-0 application for 'kafka' task"
                mode: replicated
                replicas: 1
                placement:
                    max_replicas_per_node: 1
                update_config:
                    parallelism: 1
                    delay: 10s
                resources:
                    limits:
                        cpus: "2"
                        memory: "2G"
                    reservations:
                        cpus: "1"
                        memory: "1G"
                restart_policy:
                    condition: on-failure
                    delay: 30s
                    max_attempts: 1
                    window: 30s

        kafka-ui:
            image: kafbat/kafka-ui:main
            ports:
                -
                    "0.0.0.0:8079:8080/tcp"
            environment:
                -
                    "DYNAMIC_CONFIG_ENABLED=true"
                -
                    "KAFKA_CLUSTERS_0_NAME=local"
                -
                    "KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9093"


    networks:
            nf_genome_annot-network-kafka:
                external: true

Reply via email to