[jira] [Created] (KAFKA-7382) We should guarantee that at least one replica of a partition is alive when creating or updating a topic
zhaoshijie created KAFKA-7382:
----------------------------------

             Summary: We should guarantee that at least one replica of a partition is alive when creating or updating a topic
                 Key: KAFKA-7382
                 URL: https://issues.apache.org/jira/browse/KAFKA-7382
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 0.10.2.0
            Reporter: zhaoshijie

For example, I have brokers 1, 2, 3, 4, and 5, and I create a new topic with the command:

{code:java}
sh kafka-topics.sh --create --topic replicaserror --zookeeper localhost:2181 --replica-assignment 11:12:13,12:13:14,14:15:11,14:12:11,13:14:11
{code}

The KafkaController then processes this request. After the partitionStateMachine and replicaStateMachine handle the state changes, the topic's metadata and state are left inconsistent: the partitions stay in NewPartition while the replicas are in OnlineReplica. We then cannot delete this topic (because the state transition is illegal), which causes a number of problems. So I think we should validate the replica assignment when a topic is created or updated.
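A minimal sketch of the proposed check (illustrative only; the class and method names below are assumptions, not the actual controller or AdminUtils code): reject any assignment in which some partition has no replica on a live broker.

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical validator, not real Kafka code: given the requested
// partition -> replica-list assignment and the ids of live brokers,
// fail fast if any partition would end up with no live replica at all.
public final class ReplicaAssignmentValidator {

    public static void validate(Map<Integer, List<Integer>> assignment,
                                Set<Integer> liveBrokerIds) {
        for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) {
            List<Integer> replicas = entry.getValue();
            // At least one assigned replica must be on a broker that is alive now.
            boolean anyAlive = false;
            for (Integer replica : replicas) {
                if (liveBrokerIds.contains(replica)) {
                    anyAlive = true;
                    break;
                }
            }
            if (!anyAlive) {
                throw new IllegalArgumentException("Partition " + entry.getKey()
                        + " is assigned only to dead or unknown brokers: " + replicas);
            }
        }
    }
}
{code}

With the assignment from the command above (brokers 11 to 15) on a cluster whose live brokers are 1 to 5, every partition fails this check, so the creation would be rejected before any state machine runs.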
[jira] [Commented] (KAFKA-6449) KafkaConsumer hits a 40s timeout on poll after the polling thread sleeps for more than request.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-6449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16328478#comment-16328478 ] zhaoshijie commented on KAFKA-6449: --- `max.poll.interval.ms` is a parameter for Kafka 0.10.1.0 and above; 0.10.0.1 does not have it, and heartbeating there is not done from a background thread. I do not want to trigger a client rebalance, but processing the records takes more than 40s. (A workaround sketch follows the quoted issue below.)

> KafkaConsumer hits a 40s timeout on poll after the polling thread sleeps for more than request.timeout.ms
> ---
>
>                 Key: KAFKA-6449
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6449
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.0.1
>            Reporter: zhaoshijie
>            Priority: Major
>
> [...]
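A sketch of a common workaround on 0.10.0.x (my suggestion, not something verified against this ticket): pause the assigned partition and keep calling poll during the long processing so the client keeps servicing its network requests, then resume. Here `consumer` is the consumer from the reproducer in the issue, and `processSlowly` is a hypothetical stand-in for the 40s of work; note that on 0.10.0.x pause/resume take varargs rather than a Collection.

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Sketch only: interleave poll(0) with the slow per-record work so that no
// fetch request sits in flight for longer than request.timeout.ms (40s here).
TopicPartition tp = new TopicPartition("timeOut_test", 1);
ConsumerRecords<String, String> records = consumer.poll(120000);

consumer.pause(tp);                   // varargs signature on 0.10.0.x
for (ConsumerRecord<String, String> record : records.records("timeOut_test")) {
    processSlowly(record);            // hypothetical stand-in for Thread.sleep(41000)
    consumer.poll(0);                 // returns no records while paused, but keeps I/O moving
}
consumer.resume(tp);                  // fetching continues on the next real poll
{code}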
[jira] [Commented] (KAFKA-6449) KafkaConsumer hits a 40s timeout on poll after the polling thread sleeps for more than request.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-6449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16328398#comment-16328398 ] zhaoshijie commented on KAFKA-6449: --- [~huxi_2b] if `Thread.sleep(41000);` is removed, the problem does not happen. In practical use, `Thread.sleep(41000);` stands in for the record processing; I just mean that the code spends more than 40s processing the records. (A config sketch follows the quoted issue below.)

> KafkaConsumer hits a 40s timeout on poll after the polling thread sleeps for more than request.timeout.ms
> ---
>
>                 Key: KAFKA-6449
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6449
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.0.1
>            Reporter: zhaoshijie
>            Priority: Major
>
> [...]
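If the work between polls genuinely cannot be shortened, another option (again a suggestion, not from the ticket) is to raise `request.timeout.ms` above the worst-case processing gap, so the in-flight fetch is not expired while the thread is busy. `kafkaParams` here is the config map from the reproducer in the issue.

{code:java}
// Sketch: make the client-side request timeout longer than the worst-case
// gap between polls (41s in the reproducer). KafkaConsumer requires
// request.timeout.ms to exceed session.timeout.ms and fetch.max.wait.ms,
// which the defaults (30000 and 500) already satisfy.
kafkaParams.put("request.timeout.ms", "120000");
{code}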
[jira] [Updated] (KAFKA-6449) KafkaConsumer hits a 40s timeout on poll after the polling thread sleeps for more than request.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-6449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhaoshijie updated KAFKA-6449: -- Description: I use the following code to consume one partition of a Kafka topic, and I see about 40s of latency on every poll. (Several numeric literals below were garbled in the original report, apparently by runs of zeros being dropped; they are reconstructed here from the 0.10.0.1 defaults and the 40s timeout named in the title.)

{code:java}
@Test
public void testTimeOut() throws Exception {
    String test_topic = "timeOut_test";
    int test_partition = 1;
    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", false);
    kafkaParams.put("bootstrap.servers", "*");
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("group.id", "test-consumer-" + System.currentTimeMillis());
    //kafkaParams.put("reconnect.backoff.ms", "0");
    //kafkaParams.put("max.poll.records", "500");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaParams);
    consumer.assign(Arrays.asList(new TopicPartition(test_topic, test_partition)));
    Long offset = 0L;
    while (true) {
        Long startPollTime = System.currentTimeMillis();
        consumer.seek(new TopicPartition(test_topic, test_partition), offset);
        ConsumerRecords<String, String> records = consumer.poll(120000);
        logger.info("poll take " + (System.currentTimeMillis() - startPollTime) + "ms, MSGCount is " + records.count());
        Thread.sleep(41000);
        Iterator<ConsumerRecord<String, String>> consumerRecordIterable = records.records(test_topic).iterator();
        while (consumerRecordIterable.hasNext()) {
            offset = consumerRecordIterable.next().offset();
        }
    }
}
{code}

The log is as follows (truncated at the end in the original message):

{code:java}
2018-01-16 18:53:33,033 |INFO | main | ConsumerConfig values:
	metric.reporters = []
	metadata.max.age.ms = 300000
	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
	reconnect.backoff.ms = 50
	sasl.kerberos.ticket.renew.window.factor = 0.8
	max.partition.fetch.bytes = 1048576
	bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092]
	ssl.keystore.type = JKS
	enable.auto.commit = false
	sasl.mechanism = GSSAPI
	interceptor.classes = null
	exclude.internal.topics = true
	ssl.truststore.password = null
	client.id =
	ssl.endpoint.identification.algorithm = null
	max.poll.records = 2147483647
	check.crcs = true
	request.timeout.ms = 40000
	heartbeat.interval.ms = 3000
	auto.commit.interval.ms = 5000
	receive.buffer.bytes = 65536
	ssl.truststore.type = JKS
	ssl.truststore.location = null
	ssl.keystore.password = null
	fetch.min.bytes = 1
	send.buffer.bytes = 131072
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	group.id = test-consumer-1516100013868
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	ssl.trustmanager.algorithm = PKIX
	ssl.key.password = null
	fetch.max.wait.ms = 500
	sasl.kerberos.min.time.before.relogin = 60000
	connections.max.idle.ms = 540000
	session.timeout.ms = 30000
	metrics.num.samples = 2
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	ssl.protocol = TLS
	ssl.provider = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.keystore.location = null
	ssl.cipher.suites = null
	security.protocol = PLAINTEXT
	ssl.keymanager.algorithm = SunX509
	metrics.sample.window.ms = 30000
	auto.offset.reset = earliest
 | org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178)
2018-01-16 18:53:34,034 |INFO | main | ConsumerConfig values:
	metric.reporters = []
	metadata.max.age.ms = 300000
	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
	reconnect.backoff.ms = 50
	sasl.kerberos.ticket.renew.window.factor = 0.8
	max.partition.fetch.bytes = 1048576
	bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092]
	ssl.keystore.type = JKS
	enable.auto.commit = false
	sasl.mechanism = GSSAPI
	interceptor.classes = null
	exclude.internal.topics = true
	ssl.truststore.password = null
	client.id = consumer-1
	ssl.endpoint.identification.algorithm = null
	max.poll.records = 2147483647
	check.crcs = true
	request.timeout.ms = 40000
	heartbeat.interval.ms = 3000
	auto.commit.interval.ms = 5000
	receive.buffer.bytes = 65536
	ssl.truststore.type = JKS
	ssl.truststore.location = null
	ssl.keystore.password = null
	fetch.min.bytes = 1
	send.buffer.bytes = 131072
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	group.id = test-consumer-1516100013868
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	ssl.trustmanager.algorithm = PKIX
	ssl.key.password = null
	fetch.max.wait.ms = 500
	sasl.kerberos.min.time.before.relogin = 600
[jira] [Created] (KAFKA-6449) KafkaConsumer hits a 40s timeout on poll after the polling thread sleeps for more than request.timeout.ms
zhaoshijie created KAFKA-6449:
----------------------------------

             Summary: KafkaConsumer hits a 40s timeout on poll after the polling thread sleeps for more than request.timeout.ms
                 Key: KAFKA-6449
                 URL: https://issues.apache.org/jira/browse/KAFKA-6449
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.10.0.1
            Reporter: zhaoshijie