[jira] [Created] (KAFKA-7382) We should guarantee at least one replica of a partition is alive when creating or updating a topic

2018-09-06 Thread zhaoshijie (JIRA)
zhaoshijie created KAFKA-7382:
-

 Summary: We should guarantee at least one replica of a partition 
is alive when creating or updating a topic
 Key: KAFKA-7382
 URL: https://issues.apache.org/jira/browse/KAFKA-7382
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.2.0
Reporter: zhaoshijie


For example: I have brokers 1, 2, 3, 4, 5, and I create a new topic with the command: 
{code:java}
sh kafka-topics.sh --create --topic replicaserror --zookeeper localhost:2181 
--replica-assignment 11:12:13,12:13:14,14:15:11,14:12:11,13:14:11
{code}
Then the KafkaController will process this request. After the partitionStateMachine 
and replicaStateMachine handle the state changes, the topic's metadata and state end 
up in a strange condition: the partitions stay in NewPartition while the replicas are 
in OnlineReplica. We then cannot delete the topic (because the required state 
transition is illegal), which causes a number of problems. So I think we should 
validate the replica assignment when creating or updating a topic: in this example, 
every assigned broker id (11 through 15) refers to a broker that does not exist.
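
For illustration, a minimal sketch of the check being proposed (a hypothetical 
helper, not existing Kafka code): given the set of live broker ids and the requested 
assignment, fail fast when some partition has no live replica at all.
{code:java}
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the proposed validation: reject a create/alter-topic
// request when a partition's replica list contains no live broker at all.
public class ReplicaAssignmentValidator {

    public static void validate(Map<Integer, List<Integer>> assignment,
                                Set<Integer> liveBrokerIds) {
        for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) {
            List<Integer> replicas = entry.getValue();
            boolean anyAlive = false;
            for (Integer brokerId : replicas) {
                if (liveBrokerIds.contains(brokerId)) {
                    anyAlive = true;
                    break;
                }
            }
            if (!anyAlive) {
                throw new IllegalArgumentException("Partition " + entry.getKey()
                        + " is assigned only to dead or unknown brokers " + replicas
                        + "; live brokers are " + liveBrokerIds);
            }
        }
    }
}
{code}
With brokers 1-5 alive, the assignment above (11:12:13, 12:13:14, ...) would be 
rejected up front instead of leaving the topic in an undeletable half-created state.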



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6449) KafkaConsumer hits a 40s timeout when polling data after the poll thread sleeps for more than request.timeout.ms

2018-01-17 Thread zhaoshijie (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16328478#comment-16328478
 ] 

zhaoshijie commented on KAFKA-6449:
---

 `max.poll.interval.ms` is a parameter in Kafka 0.10.1.0 and above; 0.10.0.1 does 
not have this parameter, and heartbeating is not done from a background thread 
there. I do not want to trigger a client rebalance, but processing the records 
takes more than 40s.
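
For reference, a rough sketch of the usual pre-KIP-62 workaround for long processing 
(assuming the Collection-based pause()/resume() API of the 0.10.0.x line; the class 
and the process() helper here are hypothetical): pause the assigned partitions and 
keep calling poll() from the polling thread while the batch is processed on another 
thread, so the client connection (and, with subscribe(), the group heartbeats) stays 
alive.
{code:java}
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PauseAndPollLoop {

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Hypothetical stand-in for the real per-batch work that takes 40s+.
    private void process(ConsumerRecords<String, String> records) throws Exception {
        Thread.sleep(41000);
    }

    public void run(KafkaConsumer<String, String> consumer) throws Exception {
        while (true) {
            final ConsumerRecords<String, String> records = consumer.poll(1000);
            Set<TopicPartition> assigned = consumer.assignment();
            consumer.pause(assigned);      // stop fetching while we work
            Future<?> done = executor.submit(() -> {
                try {
                    process(records);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            while (!done.isDone()) {
                consumer.poll(100);        // empty polls keep the client active
            }
            consumer.resume(assigned);     // start fetching again
        }
    }
}
{code}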

> KafkaConsumer hits a 40s timeout when polling data after the poll thread 
> sleeps for more than request.timeout.ms
> ---
>
> Key: KAFKA-6449
> URL: https://issues.apache.org/jira/browse/KAFKA-6449
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: zhaoshijie
>Priority: Major
>
> I use the following code to consume one partition of a Kafka topic, and I 
> see 40s of latency on every poll: 
> {code:java}
> @Test
> public void testTimeOut() throws Exception {
>     String test_topic = "timeOut_test";
>     int test_partition = 1;
>     Map<String, Object> kafkaParams = new HashMap<String, Object>();
>     kafkaParams.put("auto.offset.reset", "earliest");
>     kafkaParams.put("enable.auto.commit", false);
>     kafkaParams.put("bootstrap.servers", "*");
>     kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     kafkaParams.put("group.id", "test-consumer-" + System.currentTimeMillis());
>     //kafkaParams.put("reconnect.backoff.ms", "0");
>     //kafkaParams.put("max.poll.records", "500");
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaParams);
>     consumer.assign(Arrays.asList(new TopicPartition(test_topic, test_partition)));
>     Long offset = 0L;
>     while (true) {
>         Long startPollTime = System.currentTimeMillis();
>         consumer.seek(new TopicPartition(test_topic, test_partition), offset);
>         ConsumerRecords<String, String> records = consumer.poll(12);
>         logger.info("poll take " + (System.currentTimeMillis() - startPollTime) + "ms, MSGCount is " + records.count());
>         Thread.sleep(41000);
>         Iterator<ConsumerRecord<String, String>> consumerRecordIterable = records.records(test_topic).iterator();
>         while (consumerRecordIterable.hasNext()) {
>             offset = consumerRecordIterable.next().offset();
>         }
>     }
> }
> {code}
> The log is as follows:
> {code:java}
> 2018-01-16 18:53:33,033 |INFO | main | ConsumerConfig values: 
> metric.reporters = []
> metadata.max.age.ms = 300000
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092]
> ssl.keystore.type = JKS
> enable.auto.commit = false
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id = 
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 2147483647
> check.crcs = true
> request.timeout.ms = 40000
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> group.id = test-consumer-1516100013868
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> session.timeout.ms = 30000
> metrics.num.samples = 2
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> auto.offset.reset = earliest
> | 
> org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178)
> 2018-01-16 18:53:34,034 |INFO | main | ConsumerConfig values: 
> metric.reporters = []
> metadata.max.age.ms = 300000
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [10.0.52.24:9092,

[jira] [Commented] (KAFKA-6449) KafkaConsumer hits a 40s timeout when polling data after the poll thread sleeps for more than request.timeout.ms

2018-01-16 Thread zhaoshijie (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16328398#comment-16328398
 ] 

zhaoshijie commented on KAFKA-6449:
---

[~huxi_2b] If I remove `Thread.sleep(41000);`, the problem does not happen. In 
practical use, `Thread.sleep(41000);` stands in for the record processing; I just 
mean that the code spends more than 40s processing each polled batch of records.
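
In other words, the sleep is only a placeholder; any per-batch processing whose 
total time exceeds request.timeout.ms (40s here) reproduces it. A hypothetical 
stand-in:
{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class SlowBatchProcessor {

    // Hypothetical replacement for Thread.sleep(41000): per-record work whose
    // total time per polled batch exceeds request.timeout.ms (40s).
    public static void processBatch(ConsumerRecords<String, String> records)
            throws InterruptedException {
        for (ConsumerRecord<String, String> record : records) {
            Thread.sleep(100); // pretend each record takes ~100ms; 500+ records > 40s
        }
    }
}
{code}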

> KafkaConsumer hits a 40s timeout when polling data after the poll thread 
> sleeps for more than request.timeout.ms
> ---
>
> Key: KAFKA-6449
> URL: https://issues.apache.org/jira/browse/KAFKA-6449
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: zhaoshijie
>Priority: Major
>
> I use the following code to consume one partition of a Kafka topic, and I 
> see 40s of latency on every poll: 
> {code:java}
> @Test
> public void testTimeOut() throws Exception {
>     String test_topic = "timeOut_test";
>     int test_partition = 1;
>     Map<String, Object> kafkaParams = new HashMap<String, Object>();
>     kafkaParams.put("auto.offset.reset", "earliest");
>     kafkaParams.put("enable.auto.commit", false);
>     kafkaParams.put("bootstrap.servers", "*");
>     kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     kafkaParams.put("group.id", "test-consumer-" + System.currentTimeMillis());
>     //kafkaParams.put("reconnect.backoff.ms", "0");
>     //kafkaParams.put("max.poll.records", "500");
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaParams);
>     consumer.assign(Arrays.asList(new TopicPartition(test_topic, test_partition)));
>     Long offset = 0L;
>     while (true) {
>         Long startPollTime = System.currentTimeMillis();
>         consumer.seek(new TopicPartition(test_topic, test_partition), offset);
>         ConsumerRecords<String, String> records = consumer.poll(12);
>         logger.info("poll take " + (System.currentTimeMillis() - startPollTime) + "ms, MSGCount is " + records.count());
>         Thread.sleep(41000);
>         Iterator<ConsumerRecord<String, String>> consumerRecordIterable = records.records(test_topic).iterator();
>         while (consumerRecordIterable.hasNext()) {
>             offset = consumerRecordIterable.next().offset();
>         }
>     }
> }
> {code}
> The log is as follows:
> {code:java}
> 2018-01-16 18:53:33,033 |INFO | main | ConsumerConfig values: 
> metric.reporters = []
> metadata.max.age.ms = 300000
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092]
> ssl.keystore.type = JKS
> enable.auto.commit = false
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id = 
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 2147483647
> check.crcs = true
> request.timeout.ms = 40000
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> group.id = test-consumer-1516100013868
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> session.timeout.ms = 30000
> metrics.num.samples = 2
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> auto.offset.reset = earliest
> | 
> org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178)
> 2018-01-16 18:53:34,034 |INFO | main | ConsumerConfig values: 
> metric.reporters = []
> metadata.max.age.ms = 300000
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10

[jira] [Updated] (KAFKA-6449) KafkaConsumer hits a 40s timeout when polling data after the poll thread sleeps for more than request.timeout.ms

2018-01-16 Thread zhaoshijie (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhaoshijie updated KAFKA-6449:
--
Description: 
I use the following code to consume one partition of a Kafka topic, and I see 40s 
of latency on every poll: 
{code:java}
@Test
public void testTimeOut() throws Exception {
    String test_topic = "timeOut_test";
    int test_partition = 1;

    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", false);
    kafkaParams.put("bootstrap.servers", "*");
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("group.id", "test-consumer-" + System.currentTimeMillis());
    //kafkaParams.put("reconnect.backoff.ms", "0");
    //kafkaParams.put("max.poll.records", "500");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaParams);

    consumer.assign(Arrays.asList(new TopicPartition(test_topic, test_partition)));
    Long offset = 0L;
    while (true) {
        Long startPollTime = System.currentTimeMillis();
        consumer.seek(new TopicPartition(test_topic, test_partition), offset);
        ConsumerRecords<String, String> records = consumer.poll(12);
        logger.info("poll take " + (System.currentTimeMillis() - startPollTime) + "ms, MSGCount is " + records.count());
        Thread.sleep(41000);
        Iterator<ConsumerRecord<String, String>> consumerRecordIterable = records.records(test_topic).iterator();
        while (consumerRecordIterable.hasNext()) {
            offset = consumerRecordIterable.next().offset();
        }
    }
}

{code}
The log is as follows:
{code:java}
2018-01-16 18:53:33,033 |INFO | main | ConsumerConfig values: 
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = 
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
group.id = test-consumer-1516100013868
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest
| org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178)
2018-01-16 18:53:34,034 |INFO | main | ConsumerConfig values: 
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer-1
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
group.id = test-consumer-1516100013868
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000

[jira] [Created] (KAFKA-6449) KafkaConsumer hits a 40s timeout when polling data after the poll thread sleeps for more than request.timeout.ms

2018-01-16 Thread zhaoshijie (JIRA)
zhaoshijie created KAFKA-6449:
-

 Summary: KafkaConsumer hits a 40s timeout when polling data after 
the poll thread sleeps for more than request.timeout.ms
 Key: KAFKA-6449
 URL: https://issues.apache.org/jira/browse/KAFKA-6449
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.0.1
Reporter: zhaoshijie






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)