[ https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860777#comment-17860777 ]
yazgoo commented on KAFKA-17049:
--------------------------------

I have a first working solution; I'll clean it up and open a PR:
https://github.com/apache/kafka/compare/trunk...yazgoo:kafka:trunk


Incremental rebalances assign too many tasks for the same connector together
-----------------------------------------------------------------------------

                 Key: KAFKA-17049
                 URL: https://issues.apache.org/jira/browse/KAFKA-17049
             Project: Kafka
          Issue Type: Bug
          Components: connect
            Reporter: yazgoo
            Priority: Major

This follows https://issues.apache.org/jira/browse/KAFKA-10413

The following script:
1. runs one worker
2. declares two connectors (12 tasks each)
3. adds two more workers

{code:bash}
#!/bin/bash
set -xe

dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}

launch_minio() {
  # Launch Minio (fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}

launch_kafka_connect() {
  # Start Kafka Connect worker $1 with the S3 connector
  docker run --network host -d --name "kafka-connect$1" \
    -e AWS_ACCESS_KEY_ID=minioadmin \
    -e AWS_SECRET_ACCESS_KEY=minioadmin \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e CONNECT_LISTENERS="http://localhost:808$1" \
    -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e CONNECT_REST_PORT="808$1" \
    -e CONNECT_GROUP_ID="connect-cluster" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}

cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i"; done) kafka minio
  do
    dkill "$container"
  done
}

launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 \
    -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic with 12 partitions
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 \
      --partitions 12 --topic "test_topic$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 \
      --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}

cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1

# Check if Kafka Connect is up
while true
do
  sleep 5
  curl http://localhost:8081/ || continue
  break
done
sleep 10

for i in {1..2}
do
  # Set up a connector
  curl -X POST -H "Content-Type: application/json" --data '{
    "name": "s3-connector'"$i"'",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "12",
      "topics": "test_topic'"$i"'",
      "s3.region": "us-east-1",
      "store.url": "http://0.0.0.0:9000",
      "s3.bucket.name": "my-minio-bucket",
      "s3.part.size": "5242880",
      "flush.size": "3",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "schema.compatibility": "NONE"
    }
  }' http://localhost:8081/connectors
done

launch_kafka_connect 2
launch_kafka_connect 3
{code}
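While workers 2 and 3 join and the incremental rebalances run, the assignment can be watched converging with a small polling loop (a convenience sketch; like the checks below, it assumes jq and worker 1's REST listener on localhost:8081):

{code:bash}
# Sketch: every 10s, print how many tasks each worker runs per connector.
while true
do
  for c in s3-connector1 s3-connector2
  do
    echo "== $c =="
    curl -s "http://localhost:8081/connectors/$c/status" |
      jq -r '.tasks[].worker_id' | sort | uniq -c
  done
  sleep 10
done
{code}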
> "name": "s3-connector'"$i"'", > "config": { > "connector.class": "io.confluent.connect.s3.S3SinkConnector", > "tasks.max": "12", > "topics": "test_topic'"$i"'", > "s3.region": "us-east-1", > "store.url": "http://0.0.0.0:9000", > "s3.bucket.name": "my-minio-bucket", > "s3.part.size": "5242880", > "flush.size": "3", > "storage.class": "io.confluent.connect.s3.storage.S3Storage", > "format.class": "io.confluent.connect.s3.format.json.JsonFormat", > "schema.generator.class": > "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", > "schema.compatibility": "NONE" > } > }' http://localhost:8081/connectors > done > launch_kafka_connect 2 > launch_kafka_connect 3 > {code} > > > When the script ends, I have the first worker taking all the connectors/tasks: > {code:java} > ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks > |grep worker_id | sort | uniq -c > 12 "worker_id": "k1:8081"{code} > {code:java} > ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks > |grep worker_id | sort | uniq -c > 12 "worker_id": "k1:8081" > {code} > > Then I wait a few minutes, > And I get the final state: > {code:java} > ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks > |grep worker_id | sort | uniq -c > 6 "worker_id": "k2:8082" > 6 "worker_id": "k3:8083"{code} > > {code:java} > ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks > |grep worker_id | sort | uniq -c > 8 "worker_id": "k1:8081" > 2 "worker_id": "k2:8082" > 2 "worker_id": "k3:8083" > {code} > > In the end, we indeed get 8 tasks on each workers, but for distribution > reasons , I think it should be (4, 4, 4) for each connector, because all > connectors don't do the same amount of work, which will lead to a > processing/network imbalance overall. > In my test I always get the same outcome. > This is consistent with what I see in production, which makes autoscaling > impossible to use as is. -- This message was sent by Atlassian Jira (v8.20.10#820010)