[jira] [Resolved] (KAFKA-10584) IndexSearchType should use sealed trait instead of Enumeration

2020-10-09 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-10584.

Fix Version/s: 2.7.0
   Resolution: Fixed

> IndexSearchType should use sealed trait instead of Enumeration
> --
>
> Key: KAFKA-10584
> URL: https://issues.apache.org/jira/browse/KAFKA-10584
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jun Rao
>Assignee: huxihx
>Priority: Major
>  Labels: newbie
> Fix For: 2.7.0
>
>
> In Scala, we prefer sealed traits over Enumeration since the former gives you 
> exhaustiveness checking. With Scala Enumeration, you don't get a warning if 
> you add a new value that is not handled in a given pattern match.
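A minimal sketch of the difference, for illustration only (the trait and case names below are made up, not the ones in the Kafka code):

{code:scala}
// With a sealed trait, the compiler warns when a match does not cover every case.
sealed trait IndexSearchType
object IndexSearchType {
  case object KeyIndex extends IndexSearchType
  case object TimeIndex extends IndexSearchType
}

object ExhaustivenessDemo {
  // If a new case object is added later, this match produces a
  // "match may not be exhaustive" warning until the new case is handled.
  def describe(t: IndexSearchType): String = t match {
    case IndexSearchType.KeyIndex  => "searching the offset index"
    case IndexSearchType.TimeIndex => "searching the time index"
  }

  // With Enumeration, adding a new value silently leaves existing matches incomplete:
  // no warning is emitted at compile time.
  object IndexSearchEnum extends Enumeration {
    val KeyIndex, TimeIndex = Value
  }

  def main(args: Array[String]): Unit = {
    println(describe(IndexSearchType.KeyIndex))
    println(IndexSearchEnum.values.mkString(", "))
  }
}
{code}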



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10456) wrong description in kafka-console-producer.sh help

2020-09-02 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-10456.

Fix Version/s: 2.7.0
   Resolution: Fixed

> wrong description in kafka-console-producer.sh help
> ---
>
> Key: KAFKA-10456
> URL: https://issues.apache.org/jira/browse/KAFKA-10456
> Project: Kafka
>  Issue Type: Task
>  Components: producer 
>Affects Versions: 2.6.0
> Environment: linux
>Reporter: danilo batista queiroz
>Assignee: huxihx
>Priority: Trivial
>  Labels: documentation
> Fix For: 2.7.0
>
>   Original Estimate: 1m
>  Remaining Estimate: 1m
>
> file: core/src/main/scala/kafka/tools/ConsoleProducer.scala
> In line 151, the description of "message-send-max-retries" contains the text 
> 'retires'; the correct spelling is 'retries'.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9344) Logged consumer config does not always match actual config values

2020-08-25 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-9344.
---
Resolution: Fixed

> Logged consumer config does not always match actual config values
> -
>
> Key: KAFKA-9344
> URL: https://issues.apache.org/jira/browse/KAFKA-9344
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.4.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Major
>
> Similar to KAFKA-8928, during consumer construction, some configs might be 
> overridden (client.id for instance), but the actual values will not be 
> reflected in the info log. It would be better to display the overridden values 
> for those configs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10407) add linger.ms parameter support to KafkaLog4jAppender

2020-08-19 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-10407.

Fix Version/s: 2.7.0
   Resolution: Fixed

> add linger.ms parameter support to KafkaLog4jAppender
> -
>
> Key: KAFKA-10407
> URL: https://issues.apache.org/jira/browse/KAFKA-10407
> Project: Kafka
>  Issue Type: Improvement
>  Components: logging
>Reporter: Yu Yang
>Assignee: huxihx
>Priority: Minor
> Fix For: 2.7.0
>
>
> Currently KafkaLog4jAppender does not accept a `linger.ms` setting. When a 
> service has an outage that causes excessive error logging, the service can 
> issue too many producer requests to the Kafka brokers and overload them. 
> Setting a non-zero `linger.ms` allows the Kafka producer to batch records and 
> reduce the number of producer requests.
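A sketch of how the appender might be configured once the setting is available; the LingerMs property name is an assumption (it is not spelled out in this issue), and the broker address and topic are placeholders:

{code}
# Hypothetical log4j.properties snippet; the LingerMs property name is assumed.
log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.KAFKA.BrokerList=localhost:9092
log4j.appender.KAFKA.Topic=service-logs
# Batch records for up to 500 ms instead of sending each log line immediately.
log4j.appender.KAFKA.LingerMs=500
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%d %p %c - %m%n
{code}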



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10305) Print usage when parsing fails for ConsumerPerformance

2020-07-25 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-10305.

Fix Version/s: 2.7.0
   Resolution: Fixed

> Print usage when parsing fails for ConsumerPerformance
> --
>
> Key: KAFKA-10305
> URL: https://issues.apache.org/jira/browse/KAFKA-10305
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 2.6.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
> Fix For: 2.7.0
>
>
> When `kafka-consumer-perf-test.sh` is executed without required options, or with 
> no options at all, only the error message is displayed. It would be better to 
> show the usage as well, as kafka-console-producer.sh already does.
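A minimal sketch of the idea (not the actual Kafka patch), assuming joptsimple is on the classpath; the option name is illustrative:

{code:scala}
import joptsimple.{OptionException, OptionParser}

// Illustrative only: print the parse error *and* the full usage, which is the
// behaviour kafka-console-producer.sh already has.
object UsageOnFailureDemo {
  def main(args: Array[String]): Unit = {
    val parser = new OptionParser()
    val topicOpt = parser.accepts("topic", "The topic to consume from (required).")
      .withRequiredArg().ofType(classOf[String]).required()
    try {
      val options = parser.parse(args: _*)
      println(s"topic = ${options.valueOf(topicOpt)}")
    } catch {
      case e: OptionException =>
        System.err.println(e.getMessage) // the error alone is all the tool printed before
        parser.printHelpOn(System.err)   // the usage that this issue asks to add
        sys.exit(1)
    }
  }
}
{code}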



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10305) Print usage when parsing fails for ConsumerPerformance

2020-07-23 Thread huxihx (Jira)
huxihx created KAFKA-10305:
--

 Summary: Print usage when parsing fails for ConsumerPerformance
 Key: KAFKA-10305
 URL: https://issues.apache.org/jira/browse/KAFKA-10305
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Affects Versions: 2.6.0
Reporter: huxihx
Assignee: huxihx


When `kafka-consumer-perf-test.sh` is executed without required options, or with 
no options at all, only the error message is displayed. It would be better to 
show the usage as well, as kafka-console-producer.sh already does.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10268) dynamic config like "--delete-config log.retention.ms" doesn't work

2020-07-23 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-10268.

Fix Version/s: 2.7.0
   Resolution: Fixed

> dynamic config like "--delete-config log.retention.ms" doesn't work
> ---
>
> Key: KAFKA-10268
> URL: https://issues.apache.org/jira/browse/KAFKA-10268
> Project: Kafka
>  Issue Type: Bug
>  Components: log, log cleaner
>Affects Versions: 2.1.1
>Reporter: zhifeng.peng
>Assignee: huxihx
>Priority: Major
> Fix For: 2.7.0
>
> Attachments: server.log.2020-07-13-14
>
>
> After I set "log.retention.ms=301000" to clean the data,i use the cmd
> "bin/kafka-configs.sh --bootstrap-server 10.129.104.15:9092 --entity-type 
> brokers --entity-default --alter --delete-config log.retention.ms" to reset 
> to default.
> Static broker configuration like log.retention.hours is 168h and no topic 
> level configuration like retention.ms.
> it did not take effect actually although server.log print the broker 
> configuration like that.
> log.retention.check.interval.ms = 30
>  log.retention.hours = 168
>  log.retention.minutes = null
>  log.retention.ms = null
>  log.roll.hours = 168
>  log.roll.jitter.hours = 0
>  log.roll.jitter.ms = null
>  log.roll.ms = null
>  log.segment.bytes = 1073741824
>  log.segment.delete.delay.ms = 6
>  
> Then we can see from server.log that the retention time is still 301000ms and 
> segments have been deleted.
> [2020-07-13 14:30:00,958] INFO [Log partition=test_retention-2, 
> dir=/data/kafka_logs-test] Found deletable segments with base offsets 
> [5005329,6040360] due to retention time 301000ms breach (kafka.log.Log)
>  [2020-07-13 14:30:00,959] INFO [Log partition=test_retention-2, 
> dir=/data/kafka_logs-test] Scheduling log segment [baseOffset 5005329, size 
> 1073741222] for deletion. (kafka.log.Log)
>  [2020-07-13 14:30:00,959] INFO [Log partition=test_retention-2, 
> dir=/data/kafka_logs-test] Scheduling log segment [baseOffset 6040360, size 
> 1073728116] for deletion. (kafka.log.Log)
>  [2020-07-13 14:30:00,959] INFO [Log partition=test_retention-2, 
> dir=/data/kafka_logs-test] Incrementing log start offset to 7075648 
> (kafka.log.Log)
>  [2020-07-13 14:30:00,960] INFO [Log partition=test_retention-0, 
> dir=/data/kafka_logs-test] Found deletable segments with base offsets 
> [5005330,6040410] due to retention time 301000ms breach 
> (kafka.log.Log)
>  [2020-07-13 14:30:00,960] INFO [Log partition=test_retention-0, 
> dir=/data/kafka_logs-test] Scheduling log segment [baseOffset 5005330, size 
> 1073732368] for deletion. (kafka.log.Log)
>  [2020-07-13 14:30:00,961] INFO [Log partition=test_retention-0, 
> dir=/data/kafka_logs-test] Scheduling log segment [baseOffset 6040410, size 
> 1073735366] for deletion. (kafka.log.Log)
>  [2020-07-13 14:30:00,961] INFO [Log partition=test_retention-0, 
> dir=/data/kafka_logs-test] Incrementing log start offset to 7075685 
> (kafka.log.Log)
>  [2020-07-13 14:31:00,959] INFO [Log partition=test_retention-2, 
> dir=/data/kafka_logs-test] Deleting segment 5005329 (kafka.log.Log)
>  [2020-07-13 14:31:00,959] INFO [Log partition=test_retention-2, 
> dir=/data/kafka_logs-test] Deleting segment 6040360 (kafka.log.Log)
>  [2020-07-13 14:31:00,961] INFO [Log partition=test_retention-0, 
> dir=/data/kafka_logs-test] Deleting segment 5005330 (kafka.log.Log)
>  [2020-07-13 14:31:00,961] INFO [Log partition=test_retention-0, 
> dir=/data/kafka_logs-test] Deleting segment 6040410 (kafka.log.Log)
>  [2020-07-13 14:31:01,144] INFO Deleted log 
> /data/kafka_logs-test/test_retention-2/06040360.log.deleted. 
> (kafka.log.LogSegment)
>  [2020-07-13 14:31:01,144] INFO Deleted offset index 
> /data/kafka_logs-test/test_retention-2/06040360.index.deleted. 
> (kafka.log.LogSegment)
>  [2020-07-13 14:31:01,144] INFO Deleted time index 
> /data/kafka_logs-test/test_retention-2/06040360.timeindex.deleted.
>  (kafka.log.LogSegment)
>  
> Here are a few steps to reproduce it.
> 1、set log.retention.ms=301000:
> bin/kafka-configs.sh --bootstrap-server 10.129.104.15:9092 --entity-type 
> brokers --entity-default --alter --add-config log.retention.ms=301000
> 2、produce messages to the topic:
> bin/kafka-producer-perf-test.sh --topic test_retention --num-records 1000 
> --throughput -1 --producer-props bootstrap.servers=10.129.104.15:9092 
> --record-size 1024
> 3、reset log.retention.ms to the default:
> bin/kafka-configs.sh --bootstrap-server 10.129.104.15:9092 --entity-type 
> brokers --entity-default --alter --delete-config log.retention.ms
>  
> I have attached server.log. You can see the relevant log from row 238 to row 731. 



--
This message was sent by Atlassian Jira

[jira] [Created] (KAFKA-10222) Incorrect methods show up in 0.10 Kafka Streams docs

2020-07-01 Thread huxihx (Jira)
huxihx created KAFKA-10222:
--

 Summary: Incorrect methods show up in 0.10 Kafka Streams docs
 Key: KAFKA-10222
 URL: https://issues.apache.org/jira/browse/KAFKA-10222
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Affects Versions: 0.10.0.0
Reporter: huxihx
Assignee: huxihx


In the 0.10 Kafka Streams 
[doc|http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html],
 two incorrect methods show up, as shown below:

 _builder.from("my-input-topic").mapValue(value -> 
value.length().toString()).to("my-output-topic");_

There is no method named `from` or `mapValue`. They should be `stream` and 
`mapValues` respectively.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9541) Flaky Test DescribeConsumerGroupTest#testDescribeGroupWithShortInitializationTimeout

2020-02-11 Thread huxihx (Jira)
huxihx created KAFKA-9541:
-

 Summary: Flaky Test 
DescribeConsumerGroupTest#testDescribeGroupWithShortInitializationTimeout
 Key: KAFKA-9541
 URL: https://issues.apache.org/jira/browse/KAFKA-9541
 Project: Kafka
  Issue Type: Bug
  Components: core, unit tests
Affects Versions: 2.4.0
Reporter: huxihx
Assignee: huxihx


h3. Error Message

java.lang.AssertionError: assertion failed
h3. Stacktrace

java.lang.AssertionError: assertion failed at 
scala.Predef$.assert(Predef.scala:267) at 
kafka.admin.DescribeConsumerGroupTest.testDescribeGroupMembersWithShortInitializationTimeout(DescribeConsumerGroupTest.scala:630)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
 at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
 at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
 at jdk.internal.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
 at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
 at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
 at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
 at jdk.internal.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
 at 
org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
 at 
org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
 at 
org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:412)
 at 
org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
 at 

[jira] [Created] (KAFKA-9344) Logged consumer config does not always match actual config values

2019-12-29 Thread huxihx (Jira)
huxihx created KAFKA-9344:
-

 Summary: Logged consumer config does not always match actual 
config values
 Key: KAFKA-9344
 URL: https://issues.apache.org/jira/browse/KAFKA-9344
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.4.0
Reporter: huxihx
Assignee: huxihx


Similar to [KAFKA-8928|https://issues.apache.org/jira/browse/KAFKA-8928], during 
consumer construction, some configs might be overridden (client.id for 
instance), but the actual values will not be reflected in the info log. It would 
be better to display the overridden values for those configs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9322) Add `tail -n` feature for ConsoleConsumer

2019-12-19 Thread huxihx (Jira)
huxihx created KAFKA-9322:
-

 Summary: Add `tail -n` feature for ConsoleConsumer
 Key: KAFKA-9322
 URL: https://issues.apache.org/jira/browse/KAFKA-9322
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Affects Versions: 2.4.0
Reporter: huxihx
Assignee: huxihx


When debugging, it would be convenient to quickly check the last N messages of 
a partition using ConsoleConsumer. Currently `offset` cannot be negative. 
However, we could simply relax this rule to support a `tail -n`-like feature.
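A sketch of the idea using the plain consumer API (not the proposed ConsoleConsumer change); the broker address, topic, and partition below are placeholders:

{code:scala}
import java.time.Duration
import java.util.{Collections, Properties}
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Read the last N records of one partition by seeking to end-offset minus N.
object TailNDemo {
  def main(args: Array[String]): Unit = {
    val n = 10
    val tp = new TopicPartition("my-topic", 0)
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](props)
    try {
      consumer.assign(Collections.singletonList(tp))
      val end: Long = consumer.endOffsets(Collections.singletonList(tp)).get(tp)
      consumer.seek(tp, math.max(0L, end - n))
      consumer.poll(Duration.ofSeconds(5)).asScala
        .foreach(r => println(s"${r.offset}: ${r.value}"))
    } finally consumer.close()
  }
}
{code}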



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9316) ConsoleProducer help info not expose default properties

2019-12-18 Thread huxihx (Jira)
huxihx created KAFKA-9316:
-

 Summary: ConsoleProducer help info not expose default properties
 Key: KAFKA-9316
 URL: https://issues.apache.org/jira/browse/KAFKA-9316
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Affects Versions: 2.4.0
Reporter: huxihx
Assignee: huxihx


Unlike ConsoleConsumer, ConsoleProducer help info does not expose default 
properties. Users cannot know what default properties are supported by checking 
the help info.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9208) Flaky Test SslAdminClientIntegrationTest.testCreatePartitions

2019-11-19 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-9208.
---
Resolution: Duplicate

> Flaky Test SslAdminClientIntegrationTest.testCreatePartitions
> -
>
> Key: KAFKA-9208
> URL: https://issues.apache.org/jira/browse/KAFKA-9208
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.4.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
> Java 8 build failed on 2.4-targeted PR
> h3. Stacktrace
> java.lang.AssertionError: validateOnly expected:<3> but was:<1> at 
> org.junit.Assert.fail(Assert.java:89) at 
> org.junit.Assert.failNotEquals(Assert.java:835) at 
> org.junit.Assert.assertEquals(Assert.java:647) at 
> kafka.api.AdminClientIntegrationTest$$anonfun$testCreatePartitions$6.apply(AdminClientIntegrationTest.scala:625)
>  at 
> kafka.api.AdminClientIntegrationTest$$anonfun$testCreatePartitions$6.apply(AdminClientIntegrationTest.scala:599)
>  at scala.collection.immutable.List.foreach(List.scala:392) at 
> kafka.api.AdminClientIntegrationTest.testCreatePartitions(AdminClientIntegrationTest.scala:599)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9093) NullPointerException in KafkaConsumer with group.instance.id

2019-10-31 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-9093.
---
Resolution: Fixed

> NullPointerException in KafkaConsumer with group.instance.id
> 
>
> Key: KAFKA-9093
> URL: https://issues.apache.org/jira/browse/KAFKA-9093
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
>Reporter: Rolef Heinrich
>Assignee: huxihx
>Priority: Minor
> Fix For: 2.3.2
>
>
> When using *group.instance.id=myUniqId[0]*, the KafkaConsumer's constructor 
> throws a NullPointerException in close():
>  
> {code:java}
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
> {code}
> It turns out that the exception is thrown because the *log* member is not yet 
> initialized (still null) in the constructor when the original exception is 
> handled. The original exception is thrown before *log* is initialized.
> The side effect of this error is that close() does not clean up resources as 
> it is supposed to do.
> *The consumer properties used, for reference:*
>  
> {code:java}
> key.deserializer=com.ibm.streamsx.kafka.serialization
> request.timeout.ms=25000
> value.deserializer=com.ibm.streamsx.kafka.serialization
> client.dns.lookup=use_all_dns_ips
> metadata.max.age.ms=2000
> enable.auto.commit=false
> group.instance.id=myUniqId[0]
> max.poll.interval.ms=30
> group.id=consumer-0
> metric.reporters=com.ibm.streamsx.kafka.clients.consum...
> reconnect.backoff.max.ms=1
> bootstrap.servers=localhost:9092
> max.poll.records=50
> session.timeout.ms=2
> client.id=C-J37-ReceivedMessages[0]
> allow.auto.create.topics=false
> metrics.sample.window.ms=1
> retry.backoff.ms=500
> reconnect.backoff.ms=250{code}
> *Expected behaviour:* throw an exception indicating that something is wrong 
> with the chosen group.instance.id.
> The documentation does not say anything about valid values for 
> group.instance.id.
> *Reproduce:*
>  
>  
> {code:java}
>  
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> public class Main {
> public static void main(String[] args) {
> Properties props = new Properties();
> props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put (ConsumerConfig.GROUP_ID_CONFIG, "group-Id1");
> props.put (ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "myUniqId[0]");
> props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.StringDeserializer");
> props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.StringDeserializer");
> KafkaConsumer<String, String> c = new KafkaConsumer<>(props);
> }
> }
> Exception in thread "main" java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2204)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:825)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:664)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:644)
>   at Main.main(Main.java:15)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8915) Unable to modify partition

2019-09-19 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-8915.
---
Resolution: Not A Problem

> Unable to modify partition
> --
>
> Key: KAFKA-8915
> URL: https://issues.apache.org/jira/browse/KAFKA-8915
> Project: Kafka
>  Issue Type: Bug
>Reporter: lingyi.zhong
>Priority: Major
>
> [root@work1 kafka]# bin/kafka-topics.sh --create --zookeeper 10.20.30.77:2181 
> --replication-factor 1 --partitions 1 --topic test_topic3
> WARNING: Due to limitations in metric names, topics with a period ('.') or 
> underscore ('_') could collide. To avoid issues it is best to use either, but 
> not both.
> Created topic "test_topic3".
> [root@work1 kafka]# bin/kafka-topics.sh --alter --zookeeper 
> 10.20.30.78:2181/chroot --partition 2 --topic test_topic3
> Exception in thread "main" joptsimple.UnrecognizedOptionException: partition 
> is not a recognized option
>  at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
>  at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
>  at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
>  at joptsimple.OptionParser.parse(OptionParser.java:396)
>  at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:358)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:44)
>  at kafka.admin.TopicCommand.main(TopicCommand.scala)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8881) Measure thread running time precisely

2019-09-06 Thread huxihx (Jira)
huxihx created KAFKA-8881:
-

 Summary: Measure thread running time precisely
 Key: KAFKA-8881
 URL: https://issues.apache.org/jira/browse/KAFKA-8881
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.4.0
Reporter: huxihx


Currently, the code uses `System.currentTimeMillis()` to measure timeouts. 
However, many situations, such as GC pauses and context switches, can suspend 
the thread. In such cases, the timeout value we specify is not strictly honored. 
Maybe we could use ThreadMXBean#getCurrentThreadUserTime to measure the 
thread's running time precisely.
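A minimal sketch of the idea (illustrative only, not Kafka code): compare wall-clock time with the per-thread user time reported by ThreadMXBean, which does not advance while the thread is suspended.

{code:scala}
import java.lang.management.ManagementFactory

object ThreadTimeDemo {
  def main(args: Array[String]): Unit = {
    val bean = ManagementFactory.getThreadMXBean
    val wallStart = System.currentTimeMillis()
    val userStart = bean.getCurrentThreadUserTime // user-mode CPU time, in nanoseconds

    // Some CPU-bound work standing in for the code whose timeout we measure.
    var acc = 0L
    var i = 0
    while (i < 10000000) { acc += i; i += 1 }

    val wallElapsedMs = System.currentTimeMillis() - wallStart
    val userElapsedMs = (bean.getCurrentThreadUserTime - userStart) / 1000000
    // Under GC pauses or context switches, wallElapsedMs grows while userElapsedMs does not.
    println(s"wall-clock: ${wallElapsedMs} ms, thread user time: ${userElapsedMs} ms (acc=$acc)")
  }
}
{code}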



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Resolved] (KAFKA-8719) kafka-console-consumer bypassing sentry evaluations while specifying --partition option

2019-09-02 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-8719.
---
Resolution: Cannot Reproduce

> kafka-console-consumer bypassing sentry evaluations while specifying 
> --partition option
> ---
>
> Key: KAFKA-8719
> URL: https://issues.apache.org/jira/browse/KAFKA-8719
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, tools
>Reporter: Sathish
>Priority: Major
>  Labels: kafka-console-cons
>
> While specifying the --partition option on kafka-console-consumer, it bypasses 
> the Sentry evaluations and lets users consume messages freely. Even though a 
> consumer group does not have access to consume messages from a topic, the 
> --partition option bypasses the evaluation.
> Example command used:
> #kafka-console-consumer  --topic booktopic1 --consumer.config 
> consumer.properties --bootstrap-server :9092 --from-beginning 
> --consumer-property group.id=spark-kafka-111 --partition 0
> This succeeds even though spark-kafka-111 does not have any access on 
> topic booktopic1,
> whereas 
> #kafka-console-consumer  --topic booktopic1 --consumer.config 
> consumer.properties --bootstrap-server :9092 --from-beginning 
> --consumer-property group.id=spark-kafka-111
> fails with topic authorisation issues.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8350) Splitting batches should consider topic-level message size

2019-05-10 Thread huxihx (JIRA)
huxihx created KAFKA-8350:
-

 Summary: Splitting batches should consider topic-level message size
 Key: KAFKA-8350
 URL: https://issues.apache.org/jira/browse/KAFKA-8350
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 2.3.0
Reporter: huxihx


Currently, producers do batch splitting based on the batch size. However, 
the split will never succeed when the batch size is much larger than the 
topic-level max message size.

For instance, if the batch size is set to 8MB but we keep the default value 
for broker-side `message.max.bytes` (about 1MB), the producer will 
endlessly try to split a large batch but never succeed, as shown below:
{code:java}
[2019-05-10 16:25:09,233] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 61 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:10,021] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 62 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:10,758] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 63 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:12,071] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 64 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617){code}
A better solution is to have the producer do the splitting based on the minimum 
of these two configs. However, it is tricky for the client to get the 
topic-level or broker-level config values. It seems there could be three ways to do this:
 # When the broker throws `RecordTooLargeException`, do not swallow its real 
message since it already contains the max message size. If the message is not 
swallowed, the client easily gets it from the response.
 # Add code to issue a `DescribeConfigsRequest` to retrieve the value.
 # If splitting failed, lower the batch size gradually until the split 
succeeds. For example:

{code:java}
// In RecordAccumulator.java
private int steps = 1;
..
public int splitAndReenqueue(ProducerBatch bigBatch) {
    ..
    Deque<ProducerBatch> dq = bigBatch.split(this.batchSize / steps);
    if (dq.size() == 1) // split failed
        steps++;
    ..
}{code}
Do all of these make sense?

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8161) Comma conflict when run script bin/kafka-configs.sh with config 'follower.replication.throttled.replicas'

2019-04-10 Thread huxihx (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-8161.
---
Resolution: Not A Problem

> Comma conflict when run script  bin/kafka-configs.sh with config 
> 'follower.replication.throttled.replicas'
> --
>
> Key: KAFKA-8161
> URL: https://issues.apache.org/jira/browse/KAFKA-8161
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.10.2.1
>Reporter: Haiping
>Priority: Minor
>
> When executing the config command, it suggests that 
> follower.replication.throttled.replicas must match the format 
> [partitionId],[brokerId]:[partitionId],[brokerId]:[partitionId],[brokerId] 
> etc. But when configured like that, it fails with the following error:
> bin/kafka-configs.sh --entity-type topics --entity-name topic-test1  
> --zookeeper  127.0.0.1:2181/kafka --add-config 
> 'follower.replication.throttled.replicas=0,1:1,2' --alter
> Error while executing config command requirement failed: Invalid entity 
> config: all configs to be added must be in the format "key=val".
>  java.lang.IllegalArgumentException: requirement failed: Invalid entity 
> config: all configs to be added must be in the format "key=val".
>      at scala.Predef$.require(Predef.scala:224)
>      at 
> kafka.admin.ConfigCommand$.parseConfigsToBeAdded(ConfigCommand.scala:162)
>      at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:81)
>      at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:68)
>      at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
> It seems that the comma has become the separator of both replicas, 
> such as ([partitionId],[brokerId]), and keys, such as 
> (key=val,key=val).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7801) TopicCommand should not be able to alter transaction topic partition count

2019-01-08 Thread huxihx (JIRA)
huxihx created KAFKA-7801:
-

 Summary: TopicCommand should not be able to alter transaction 
topic partition count
 Key: KAFKA-7801
 URL: https://issues.apache.org/jira/browse/KAFKA-7801
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 2.1.0
Reporter: huxihx
Assignee: huxihx


To keep aligned with the way it handles the offsets topic, TopicCommand should 
not be able to alter the transaction topic partition count.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7779) Avoid unnecessary loop iteration in leastLoadedNode

2019-01-02 Thread huxihx (JIRA)
huxihx created KAFKA-7779:
-

 Summary: Avoid unnecessary loop iteration in leastLoadedNode
 Key: KAFKA-7779
 URL: https://issues.apache.org/jira/browse/KAFKA-7779
 Project: Kafka
  Issue Type: Improvement
  Components: network
Affects Versions: 2.1.0
Reporter: huxihx


In NetworkClient.leastLoadedNode, it invokes `isReady` to check if an 
established connection exists for the given node. `isReady` also checks whether 
metadata needs to be updated, in order to give metadata requests first 
priority. However, if the request to be sent is itself a metadata request, we do 
not have to check this; otherwise the loop in `leastLoadedNode` will do a 
complete iteration until the final node is selected. That is not efficient for 
a large cluster.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7765) IdleExpiryManager should not passively close socket used by controller

2018-12-21 Thread huxihx (JIRA)
huxihx created KAFKA-7765:
-

 Summary: IdleExpiryManager should not passively close socket used 
by controller
 Key: KAFKA-7765
 URL: https://issues.apache.org/jira/browse/KAFKA-7765
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 2.1.0
Reporter: huxihx


Currently, the controller creates sockets to every live broker without an idle 
timeout. However, other brokers' processor threads can still close these 
sockets if no requests flow through them within `connections.max.idle.ms`.

Lots of CLOSE_WAITs are left behind when those sockets are closed by the remote 
peer, since the controller's RequestSendThread does not check whether they were 
closed by the peer.

I think we need to figure out a way to record which channels should be 
maintained and have them excluded by IdleExpiryManager. A naive method is to 
augment KafkaChannel with a field indicating whether the channel should be kept 
alive.

Does it make any sense?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7705) Update javadoc for default value of delivery.timeout.ms

2018-12-03 Thread huxihx (JIRA)
huxihx created KAFKA-7705:
-

 Summary: Update javadoc for default value of delivery.timeout.ms
 Key: KAFKA-7705
 URL: https://issues.apache.org/jira/browse/KAFKA-7705
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Affects Versions: 2.1.0
Reporter: huxihx


In 
[https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html],

the sample producer code fails to run due to a ConfigException being thrown: 
delivery.timeout.ms should be equal to or larger than linger.ms + 
request.timeout.ms.

The default value for delivery.timeout.ms or linger.ms should be updated 
accordingly.
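A tiny illustration of the constraint the exception enforces (example values only, not the javadoc's):

{code:scala}
object DeliveryTimeoutCheck {
  def main(args: Array[String]): Unit = {
    // Example values only; the point is the relationship the ConfigException enforces.
    val lingerMs = 5000L
    val requestTimeoutMs = 30000L
    val deliveryTimeoutMs = 40000L
    // delivery.timeout.ms must be >= linger.ms + request.timeout.ms (40000 >= 35000 here).
    require(deliveryTimeoutMs >= lingerMs + requestTimeoutMs,
      "delivery.timeout.ms must be >= linger.ms + request.timeout.ms")
    println("producer timeout configuration is consistent")
  }
}
{code}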



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7665) Replace BaseConsumerRecord with ConsumerRecord in MM

2018-11-20 Thread huxihx (JIRA)
huxihx created KAFKA-7665:
-

 Summary: Replace BaseConsumerRecord with ConsumerRecord in MM
 Key: KAFKA-7665
 URL: https://issues.apache.org/jira/browse/KAFKA-7665
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Affects Versions: 2.1.0
Reporter: huxihx


Replace deprecated `BaseConsumerRecord` with ConsumerRecord in MirrorMaker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7354) Fix IdlePercent and NetworkProcessorAvgIdlePercent metric calculation

2018-08-28 Thread huxihx (JIRA)
huxihx created KAFKA-7354:
-

 Summary: Fix IdlePercent and NetworkProcessorAvgIdlePercent metric 
calculation
 Key: KAFKA-7354
 URL: https://issues.apache.org/jira/browse/KAFKA-7354
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.0.0
Reporter: huxihx
Assignee: huxihx


Currently, the MBeans 
`kafka.network:type=Processor,name=IdlePercent,networkProcessor=*` and 
`kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent` could be 
greater than 1. However, these two values represent a percentage, which should 
not exceed 1.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7279) partitionsFor implicitly creates topic for the non-existent topic

2018-08-12 Thread huxihx (JIRA)
huxihx created KAFKA-7279:
-

 Summary: partitionsFor implicitly creates topic for the non-existent 
topic
 Key: KAFKA-7279
 URL: https://issues.apache.org/jira/browse/KAFKA-7279
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.0.0
Reporter: huxihx


With `auto.create.topics.enable` set to true, a non-existent topic gets 
created when invoking `Consumer#partitionsFor`. Is this deliberate, as designed?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-7141) kafka-consumer-group doesn't describe existing group

2018-07-11 Thread huxihx (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reopened KAFKA-7141:
---
  Assignee: huxihx

> kafka-consumer-group doesn't describe existing group
> 
>
> Key: KAFKA-7141
> URL: https://issues.apache.org/jira/browse/KAFKA-7141
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0, 1.0.1
>Reporter: Bohdana Panchenko
>Assignee: huxihx
>Priority: Major
>
> I am running two consumers: an akka-stream-kafka consumer with the standard 
> config section as described in 
> [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html], and 
> kafka-console-consumer.
> The akka-stream-kafka consumer configuration looks like this:
> akka.kafka.consumer {
>   kafka-clients {
>     group.id = "myakkastreamkafka-1"
>     enable.auto.commit = false
>   }
> }
>  
> I am able to see both groups with the command
>  *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  _myakkastreamkafka-1_
>  _console-consumer-57171_
> I am able to view details about the console consumer group:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> console-consumer-57171*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
>  _STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 /172.19.0.4 consumer-1_
> But the command to describe my akka stream consumer gives me empty output:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> myakkastreamkafka-1*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
> That is strange. Can you please check the issue?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6814) Bad exception message for GroupIdNotFoundException/GroupNotEmptyException

2018-04-22 Thread huxihx (JIRA)
huxihx created KAFKA-6814:
-

 Summary: Bad exception message for 
GroupIdNotFoundException/GroupNotEmptyException
 Key: KAFKA-6814
 URL: https://issues.apache.org/jira/browse/KAFKA-6814
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.1.0
Reporter: huxihx


Both `GroupNotEmptyException` and `GroupIdNotFoundException` constructors 
accept the group id to build the exception message. However, they can only be 
created with a fixed string ("The group id does not exist" or "The group is not 
empty"), which leads to a very unfriendly message when a user fails to delete 
consumer groups, as below:
{code:java}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.GroupIdNotFoundException: The group id The group 
id does not exist was not found{code}
The group id fails to be shown properly in the thrown exception. 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6731) waitOnState waits for the wrong state instead of the target one

2018-04-01 Thread huxihx (JIRA)
huxihx created KAFKA-6731:
-

 Summary: waitOnState waits for the wrong state instead of the 
target one
 Key: KAFKA-6731
 URL: https://issues.apache.org/jira/browse/KAFKA-6731
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: huxihx


In KafkaStreams.waitOnState, the code waits for the state to be set to 
NOT_RUNNING instead of the given target state. Is this deliberate, as designed?
{code:java}
// ..
while (state != State.NOT_RUNNING) {
    if (waitMs == 0) {
        try {
            stateLock.wait();
        } catch (final InterruptedException e) {
            // it is ok: just move on to the next iteration
        }
    } else if (waitMs > elapsedMs) {
        long remainingMs = waitMs - elapsedMs;
        try {
            stateLock.wait(remainingMs);
        } catch (final InterruptedException e) {
            // it is ok: just move on to the next iteration
        }
    } else {
        log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
        return false;
    }
    elapsedMs = time.milliseconds() - begin;
}
return true;
{code}
IMO, it should check the state against the given target state.

[~guozhang] Does it make sense?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6663) Expression for GlobalKTable is not correct

2018-03-15 Thread huxihx (JIRA)
huxihx created KAFKA-6663:
-

 Summary: Expression for GlobalKTable is not correct
 Key: KAFKA-6663
 URL: https://issues.apache.org/jira/browse/KAFKA-6663
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Reporter: huxihx


In [this stream doc 
section|https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api#creating-source-streams-from-kafka],
  when reading records from Kafka to a global KTable, the doc says:
`In the case of a GlobalKTable, the local GlobalKTable instance of every 
application instance will be populated with data from only a *subset* of the 
partitions of the input topic. Collectively, across all application instances, 
all input topic partitions are read and processed.`

Is that correct? Does each GlobalKTable instance only get assigned a subset of 
the partitions of the input topic? As far as I remember, it should be able to 
consume all the partitions of the input topic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6592) NullPointerException thrown when executing ConsoleCosumer with deserializer set to `WindowedDeserializer`

2018-03-01 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-6592.
---
Resolution: Duplicate

Seems it's a duplicate of 
[KAFKA-4831|https://issues.apache.org/jira/browse/KAFKA-4831]

> NullPointerException thrown when executing ConsoleCosumer with deserializer 
> set to `WindowedDeserializer`
> -
>
> Key: KAFKA-6592
> URL: https://issues.apache.org/jira/browse/KAFKA-6592
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>
> When reading a streams app's output topic with the WindowedDeserializer 
> deserializer using kafka-console-consumer.sh, a NullPointerException was 
> thrown because the inner deserializer was not initialized, since there is no 
> place in ConsoleConsumer to set this class.
> Complete stack trace is shown below:
> {code:java}
> [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer:  
> (kafka.tools.ConsoleConsumer$)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89)
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35)
> at 
> kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544)
> at scala.Option.map(Option.scala:146)
> at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545)
> at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6592) NullPointerException thrown when executing ConsoleCosumer with deserializer set to `WindowedDeserializer`

2018-02-25 Thread huxihx (JIRA)
huxihx created KAFKA-6592:
-

 Summary: NullPointerException thrown when executing ConsoleCosumer 
with deserializer set to `WindowedDeserializer`
 Key: KAFKA-6592
 URL: https://issues.apache.org/jira/browse/KAFKA-6592
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 1.0.0
Reporter: huxihx


When reading a streams app's output topic with the WindowedDeserializer 
deserializer using kafka-console-consumer.sh, a NullPointerException was thrown 
because the inner deserializer was not initialized, since there is no place in 
ConsoleConsumer to set this class.

Complete stack trace is shown below:
{code:java}
[2018-02-26 14:56:04,736] ERROR Unknown error when running consumer:  
(kafka.tools.ConsoleConsumer$)
java.lang.NullPointerException
at 
org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89)
at 
org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35)
at 
kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544)
at scala.Option.map(Option.scala:146)
at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545)
at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6550) UpdateMetadataRequest should be lazily created

2018-02-09 Thread huxihx (JIRA)
huxihx created KAFKA-6550:
-

 Summary: UpdateMetadataRequest should be lazily created
 Key: KAFKA-6550
 URL: https://issues.apache.org/jira/browse/KAFKA-6550
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 1.0.0
Reporter: huxihx
Assignee: huxihx


In ControllerBrokerRequestBatch.sendRequestsToBrokers, there is no need to 
eagerly construct the UpdateMetadataRequest.Builder since sometimes 
updateMetadataRequestBrokerSet is actually empty. In those cases, we should 
defer the construction until we really need it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6370) MirrorMakerIntegrationTest#testCommaSeparatedRegex may fail due to NullPointerException

2018-02-01 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-6370.
---
Resolution: Cannot Reproduce

Although it's harmless to add some defensive checks, this issue should not have 
happened based on the code review. Since it is not easy to reproduce, I am 
closing this Jira; feel free to reopen it if it is encountered again.

> MirrorMakerIntegrationTest#testCommaSeparatedRegex may fail due to 
> NullPointerException
> ---
>
> Key: KAFKA-6370
> URL: https://issues.apache.org/jira/browse/KAFKA-6370
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: huxihx
>Priority: Minor
>  Labels: mirror-maker
>
> From 
> https://builds.apache.org/job/kafka-trunk-jdk8/2277/testReport/junit/kafka.tools/MirrorMakerIntegrationTest/testCommaSeparatedRegex/
>  :
> {code}
> java.lang.NullPointerException
>   at 
> scala.collection.immutable.StringLike.$anonfun$format$1(StringLike.scala:351)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
>   at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
>   at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:234)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at scala.collection.immutable.StringLike.format(StringLike.scala:351)
>   at scala.collection.immutable.StringLike.format$(StringLike.scala:350)
>   at scala.collection.immutable.StringOps.format(StringOps.scala:29)
>   at 
> kafka.metrics.KafkaMetricsGroup$.$anonfun$toScope$3(KafkaMetricsGroup.scala:170)
>   at scala.collection.immutable.List.map(List.scala:283)
>   at 
> kafka.metrics.KafkaMetricsGroup$.kafka$metrics$KafkaMetricsGroup$$toScope(KafkaMetricsGroup.scala:170)
>   at 
> kafka.metrics.KafkaMetricsGroup.explicitMetricName(KafkaMetricsGroup.scala:67)
>   at 
> kafka.metrics.KafkaMetricsGroup.explicitMetricName$(KafkaMetricsGroup.scala:51)
>   at 
> kafka.network.RequestMetrics.explicitMetricName(RequestChannel.scala:352)
>   at 
> kafka.metrics.KafkaMetricsGroup.metricName(KafkaMetricsGroup.scala:47)
>   at 
> kafka.metrics.KafkaMetricsGroup.metricName$(KafkaMetricsGroup.scala:42)
>   at kafka.network.RequestMetrics.metricName(RequestChannel.scala:352)
>   at 
> kafka.metrics.KafkaMetricsGroup.newHistogram(KafkaMetricsGroup.scala:81)
>   at 
> kafka.metrics.KafkaMetricsGroup.newHistogram$(KafkaMetricsGroup.scala:80)
>   at kafka.network.RequestMetrics.newHistogram(RequestChannel.scala:352)
>   at kafka.network.RequestMetrics.<init>(RequestChannel.scala:364)
>   at 
> kafka.network.RequestChannel$Metrics.$anonfun$new$2(RequestChannel.scala:57)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at kafka.network.RequestChannel$Metrics.<init>(RequestChannel.scala:56)
>   at kafka.network.RequestChannel.<init>(RequestChannel.scala:243)
>   at kafka.network.SocketServer.<init>(SocketServer.scala:71)
>   at kafka.server.KafkaServer.startup(KafkaServer.scala:238)
>   at kafka.utils.TestUtils$.createServer(TestUtils.scala:135)
>   at 
> kafka.integration.KafkaServerTestHarness.$anonfun$setUp$1(KafkaServerTestHarness.scala:93)
> {code}
> Here is the code from KafkaMetricsGroup.scala :
> {code}
> .map { case (key, value) => "%s.%s".format(key, 
> value.replaceAll("\\.", "_"))}
> {code}
> It seems (some) value was null.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6429) dirtyNonActiveSegments in `cleanableOffsets` should only be created when log.cleaner.min.compaction.lag.ms > 0

2018-01-06 Thread huxihx (JIRA)
huxihx created KAFKA-6429:
-

 Summary: dirtyNonActiveSegments in `cleanableOffsets` should only 
be created when log.cleaner.min.compaction.lag.ms > 0
 Key: KAFKA-6429
 URL: https://issues.apache.org/jira/browse/KAFKA-6429
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 1.0.0
Reporter: huxihx
Assignee: huxihx


LogCleanerManager.cleanableOffsets always creates objects to hold all dirty 
non-active segments, as shown below:

{code:java}
// dirty log segments
val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, 
log.activeSegment.baseOffset)
{code}

Actually, these objects will not be used when 
`log.cleaner.min.compaction.lag.ms` is 0, which is the default value. We could 
defer the creation. In doing so, we can not only reduce the heap footprint but 
also avoid the blocking access to the segments incurred by Log.segments.
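A self-contained sketch of the deferral idea (stand-in types only; the real change would live in LogCleanerManager):

{code:scala}
object LazySegmentsDemo {
  // Stand-in for kafka.log.LogSegment; illustrative only.
  final case class Segment(baseOffset: Long, sizeInBytes: Int)

  def main(args: Array[String]): Unit = {
    val minCompactionLagMs = 0L // the default for log.cleaner.min.compaction.lag.ms

    // `lazy val` defers building the collection until first use, so when the
    // compaction lag is 0 the dirty non-active segments are never materialized.
    lazy val dirtyNonActiveSegments: Seq[Segment] = {
      println("materializing dirty non-active segments")
      Seq(Segment(0L, 1000), Segment(1234L, 1000))
    }

    if (minCompactionLagMs > 0)
      dirtyNonActiveSegments.foreach(s => println(s"checking segment at ${s.baseOffset}"))
    else
      println("compaction lag is 0; the segment collection was never built")
  }
}
{code}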




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6425) Calculating cleanBytes in LogToClean might not be correct

2018-01-05 Thread huxihx (JIRA)
huxihx created KAFKA-6425:
-

 Summary: Calculating cleanBytes in LogToClean might not be correct
 Key: KAFKA-6425
 URL: https://issues.apache.org/jira/browse/KAFKA-6425
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.0.0
Reporter: huxihx


In class `LogToClean`, the calculation for `cleanBytes` is as below:
{code:java}
val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum
{code}

Most of the time, the `firstDirtyOffset` is the base offset of the active 
segment, which works pretty well with log.logSegments, so we can calculate 
cleanBytes by safely summing up the sizes of all log segments whose base offset 
is less than `firstDirtyOffset`.

However, things changed after `firstUnstableOffset` was introduced. Users could 
indirectly change this offset to a non-base offset (by changing the log start 
offset, for instance). In this case, it's not correct to sum up the total size 
of a log segment. Instead, we should only sum up the bytes between the base 
offset and `firstUnstableOffset`.

Let me show an example.
Say I have three log segments, as shown below:
0L   -->  log segment1, size: 1000 Bytes
1234L -->  log segment2, size: 1000 Bytes
4567L --> active log segment, current size: 500 Bytes

Based on the current code, if `firstUnstableOffset` is deliberately set to 
2000L (this could be possible, since it's lower bounded by the log start offset 
and the user could explicitly change the LSO), then `cleanBytes` is calculated 
as 2000 Bytes, which is wrong. The expected value should be 1000 + (bytes 
between offset 1234L and 2000L).

[~junrao] [~ijuma] Do all of these make sense?






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-4767) KafkaProducer is not joining its IO thread properly

2017-12-17 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-4767.
---
Resolution: Fixed

Already fixed in another Jira.

> KafkaProducer is not joining its IO thread properly
> ---
>
> Key: KAFKA-4767
> URL: https://issues.apache.org/jira/browse/KAFKA-4767
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.0
>Reporter: Buğra Gedik
>Assignee: huxihx
>Priority: Minor
> Fix For: 1.1.0, 1.0.1, 0.11.0.3
>
>
> The {{KafkaProducer}} is not properly joining the thread it creates. The code 
> is like this:
> {code}
> try {
> this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
> firstException.compareAndSet(null, t);
> log.error("Interrupted while joining ioThread", t);
> }
> {code}
> If the code is interrupted while performing the join, it will end up leaving 
> the io thread running. The correct way of handling this is as follows:
> {code}
> try {
> this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
> // propagate the interrupt
> this.ioThread.interrupt();
> try {
>  this.ioThread.join();
> } catch (InterruptedException e) {
> firstException.compareAndSet(null, e);
> log.error("Interrupted while joining ioThread", e);
> } finally {
> // make sure we maintain the interrupted status
> Thread.currentThread().interrupt();
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6355) transient failure in org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies

2017-12-12 Thread huxihx (JIRA)
huxihx created KAFKA-6355:
-

 Summary: transient failure in 
org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies
 Key: KAFKA-6355
 URL: https://issues.apache.org/jira/browse/KAFKA-6355
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: huxihx


Got a transient failure while running 
'org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies'

Error Message
java.lang.AssertionError: Condition not met within timeout 3. Did not 
receive all 20 records from topic singlePartitionOutputTopic
Stacktrace
java.lang.AssertionError: Condition not met within timeout 3. Did not 
receive all 20 records from topic singlePartitionOutputTopic
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:195)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:165)
at 
org.apache.kafka.streams.integration.EosIntegrationTest.runSimpleCopyTest(EosIntegrationTest.java:183)
at 
org.apache.kafka.streams.integration.EosIntegrationTest.shouldBeAbleToRunWithTwoSubtopologies(EosIntegrationTest.java:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy1.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:108)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 

[jira] [Resolved] (KAFKA-6341) 'networkThreadTimeNanos' in KafkaChannel is not thread safe

2017-12-11 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-6341.
---
Resolution: Not A Bug

> 'networkThreadTimeNanos' in KafkaChannel is not thread safe
> ---
>
> Key: KAFKA-6341
> URL: https://issues.apache.org/jira/browse/KAFKA-6341
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 1.0.0
>Reporter: huxihx
>
> `networkThreadTimeNanos` in KafkaChannel is of primitive long type, which is 
> not thread safe. Multiple Processor threads could access (read and write) this 
> variable at the same time. Since the JVM spec does not guarantee atomicity of 
> 64-bit operations on long/double types, it's safer to employ AtomicLong 
> instead of a plain long.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6341) 'networkThreadTimeNanos' in KafkaChannel is not thread safe

2017-12-11 Thread huxihx (JIRA)
huxihx created KAFKA-6341:
-

 Summary: 'networkThreadTimeNanos' in KafkaChannel is not thread 
safe
 Key: KAFKA-6341
 URL: https://issues.apache.org/jira/browse/KAFKA-6341
 Project: Kafka
  Issue Type: Bug
  Components: metrics
Affects Versions: 1.0.0
Reporter: huxihx


`networkThreadTimeNanos` in KafkaChannel is of primitive long type, which is not 
thread safe. Multiple Processor threads could access (read and write) this 
variable at the same time. Since the JVM spec does not guarantee atomicity of 
64-bit operations on long/double types, it's safer to employ AtomicLong 
instead of a plain long.
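
For illustration, a minimal sketch of the proposed change (illustrative class and method names, not the actual KafkaChannel code):

{code}
import java.util.concurrent.atomic.AtomicLong

class ChannelTimeTracker {
  // safe for concurrent access from multiple Processor threads
  private val networkThreadTimeNanos = new AtomicLong(0L)

  def addNetworkThreadTimeNanos(nanos: Long): Unit =
    networkThreadTimeNanos.addAndGet(nanos) // atomic read-modify-write

  def getAndResetNetworkThreadTimeNanos(): Long =
    networkThreadTimeNanos.getAndSet(0L) // readers always see a consistent 64-bit value
}
{code}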



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6219) Inconsistent behavior for kafka-consumer-groups with ACL enabled

2017-11-15 Thread huxihx (JIRA)
huxihx created KAFKA-6219:
-

 Summary: Inconsistent behavior for kafka-consumer-groups with ACL 
enabled
 Key: KAFKA-6219
 URL: https://issues.apache.org/jira/browse/KAFKA-6219
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 1.0.0
Reporter: huxihx
Assignee: huxihx


When ACL is enabled, running kafka-consumer-groups.sh --describe to describe a 
group complains:

`Error: Executing consumer group command failed due to Not authorized to access 
group: Group authorization failed.`

However, running kafka-consumer-groups.sh --list simply returns nothing, leaving 
the user unsure whether there are no groups at all or something went wrong.

In `AdminClient.listAllGroups`, all possible exceptions are caught and an empty 
List is returned.

It's better to keep those two code paths consistent. Does that make sense?
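
For illustration, a hedged sketch of the direction (the method shape and helper are hypothetical, not the actual kafka.admin.AdminClient code): rethrow authorization failures from the list path so it fails loudly, as the describe path already does.

{code}
import org.apache.kafka.common.errors.GroupAuthorizationException

def listAllGroups(fetchFromAllBrokers: () => List[String]): List[String] =
  try fetchFromAllBrokers()
  catch {
    case e: GroupAuthorizationException => throw e // surface the ACL failure, like --describe does
    case _: Exception => List.empty                // stay lenient for other transient failures
  }
{code}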



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5976) RequestChannel.sendResponse records incorrect size for NetworkSend with TRACE logging

2017-09-26 Thread huxihx (JIRA)
huxihx created KAFKA-5976:
-

 Summary: RequestChannel.sendResponse records incorrect size for 
NetworkSend with TRACE logging
 Key: KAFKA-5976
 URL: https://issues.apache.org/jira/browse/KAFKA-5976
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.11.0.1
Reporter: huxihx
Assignee: huxihx


In RequestChannel.scala, RequestChannel.sendResponse records incorrect size for 
`NetworkSend` when trace logging is enabled, as shown below:

{code:title=RequestChannel.scala|borderStyle=solid}
def sendResponse(response: RequestChannel.Response) {
if (isTraceEnabled) {
  val requestHeader = response.request.header
  trace(s"Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of " +
    s"${response.responseSend.size} bytes.")
}
{code}

`responseSend` is of `scala.Option` type, so it should be 
`response.responseSend.get.size`.

There is no need to check whether it is None at this point.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5753) ShellTest.testRunProgramWithErrorReturn fails locally

2017-08-20 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-5753.
---
Resolution: Duplicate

> ShellTest.testRunProgramWithErrorReturn fails locally
> -
>
> Key: KAFKA-5753
> URL: https://issues.apache.org/jira/browse/KAFKA-5753
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>
> Seeing this locally (on Mac OS):
> {code}
> java.lang.AssertionError
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertTrue(Assert.java:52)
>   at 
> org.apache.kafka.common.utils.ShellTest.testRunProgramWithErrorReturn(ShellTest.java:69)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The failing assertion is this:
> {code}
> assertTrue(e.getMessage().contains("No such file"));
> {code}
> The actual exception is this:
> {code}
> org.apache.kafka.common.utils.Shell$ExitCodeException: head: illegal byte 
> count -- 0
>   at org.apache.kafka.common.utils.Shell.runCommand(Shell.java:130)
>   at org.apache.kafka.common.utils.Shell.run(Shell.java:76)
>   at 
> org.apache.kafka.common.utils.Shell$ShellCommandExecutor.execute(Shell.java:204)
>   at org.apache.kafka.common.utils.Shell.execCommand(Shell.java:268)
>   at org.apache.kafka.common.utils.Shell.execCommand(Shell.java:255)
>   at 
> org.apache.kafka.common.utils.ShellTest.testRunProgramWithErrorReturn(ShellTest.java:66)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-5744) ShellTest: add tests for attempting to run nonexistent program, error return

2017-08-20 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reopened KAFKA-5744:
---

[~cmccabe] On Mac OS, `head -c 0 ` fails with the error "illegal byte count 
-- 0" instead of "No such file". We could use `head -c 1` instead.

> ShellTest: add tests for attempting to run nonexistent program, error return
> 
>
> Key: KAFKA-5744
> URL: https://issues.apache.org/jira/browse/KAFKA-5744
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
> Fix For: 1.0.0
>
>
> ShellTest should have tests for attempting to run nonexistent program, and 
> running a program which returns an error.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5715) ConsumerGroupCommand failed to show in ascending order for partitions without consumers

2017-08-08 Thread huxihx (JIRA)
huxihx created KAFKA-5715:
-

 Summary: ConsumerGroupCommand failed to show in ascending order 
for partitions without consumers 
 Key: KAFKA-5715
 URL: https://issues.apache.org/jira/browse/KAFKA-5715
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.11.0.0
Reporter: huxihx
Assignee: huxihx
Priority: Minor


For active consumer groups, ConsumerGroupCommand shows partitions in ascending 
order, which is the behavior users usually expect. But for inactive groups, or 
partitions without a consumer assigned, the tool prints them in a random order. 
The behavior should be the same for both inactive and active groups.
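
For illustration, the expected ordering boils down to something like this (toy row type, not the actual ConsumerGroupCommand code):

{code}
case class Row(topic: String, partition: Int, consumerId: Option[String])

// sort by (topic, partition) before printing, whether or not a consumer is currently assigned
def ordered(rows: Seq[Row]): Seq[Row] = rows.sortBy(r => (r.topic, r.partition))
{code}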



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5707) Remove useless `--force` option for both TopicCommand and ConfigCommand

2017-08-07 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-5707.
---
Resolution: Not A Bug

For the sake of compatibility, just keep `--force` in both classes. Closing this 
jira then.

> Remove useless `--force` option for both TopicCommand and ConfigCommand
> ---
>
> Key: KAFKA-5707
> URL: https://issues.apache.org/jira/browse/KAFKA-5707
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>
> `TopicCommand` and `ConfigCommand` expose an option named `--force` which 
> suppresses console prompts, but neither class actually uses it. It should be 
> removed from the usage description.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5700) producer missed header information when splitting batches

2017-08-06 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-5700.
---
Resolution: Fixed

> producer missed header information when splitting batches
> -
>
> Key: KAFKA-5700
> URL: https://issues.apache.org/jira/browse/KAFKA-5700
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> In `ProducerBatch.tryAppendForSplit`, invoking 
> `this.recordsBuilder.append(timestamp, key, value);` drops the header 
> information from the ProducerRecord. It should be invoked as:
> `this.recordsBuilder.append(timestamp, key, value, headers);`



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5707) Remove useless `--force` option for both TopicCommand and ConfigCommand

2017-08-06 Thread huxihx (JIRA)
huxihx created KAFKA-5707:
-

 Summary: Remove useless `--force` option for both TopicCommand and 
ConfigCommand
 Key: KAFKA-5707
 URL: https://issues.apache.org/jira/browse/KAFKA-5707
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 0.11.0.0
Reporter: huxihx
Assignee: huxihx
Priority: Minor


`TopicCommand` and `ConfigCommand` expose an option named `--force` which 
suppresses console prompts, but neither class actually uses it. It should be 
removed from the usage description.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5665) Incorrect interruption invoking method used for Heartbeat thread

2017-08-03 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-5665.
---
Resolution: Not A Bug

> Incorrect interruption invoking method used for Heartbeat thread 
> -
>
> Key: KAFKA-5665
> URL: https://issues.apache.org/jira/browse/KAFKA-5665
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>
> When interrupting the background heartbeat thread, `Thread.interrupted();` is 
> used. Actually, `Thread.currentThread().interrupt();` should be used to 
> restore the interruption status. An alternative way to solve this is to remove 
> `Thread.interrupted();`, since HeartbeatThread extends Thread and all code 
> higher up on the call stack is under our control, so we could safely swallow 
> the exception. Either way, `Thread.interrupted();` should not be used here: it 
> is a test method, not an action.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5700) producer missed header information when splitting batches

2017-08-03 Thread huxihx (JIRA)
huxihx created KAFKA-5700:
-

 Summary: producer missed header information when splitting batches
 Key: KAFKA-5700
 URL: https://issues.apache.org/jira/browse/KAFKA-5700
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.11.0.0
Reporter: huxihx
Assignee: huxihx


In `ProducerBatch.tryAppendForSplit`, invoking 
`this.recordsBuilder.append(timestamp, key, value);` drops the header 
information from the ProducerRecord. It should be invoked as:

`this.recordsBuilder.append(timestamp, key, value, headers);`



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5641) Metadata request should always be allowed to be sent regardless of the value of max.in.flight.requests.per.connection

2017-08-02 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-5641.
---
Resolution: Not A Problem

> Metadata request should always be allowed to be sent regardless of the value of 
> max.in.flight.requests.per.connection
> --
>
> Key: KAFKA-5641
> URL: https://issues.apache.org/jira/browse/KAFKA-5641
> Project: Kafka
>  Issue Type: Improvement
>  Components: network, producer 
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>
> Metadata request might not be able to be sent when 
> `max.in.flight.requests.per.connection` is set to 1 and there is already an 
> inflight request in the same node's queue, as shown below:
> {code:title=NetworkClient.java|borderStyle=solid}
> private long maybeUpdate(long now, Node node) {
> String nodeConnectionId = node.idString();
> if (canSendRequest(nodeConnectionId)) {
> ..
> }
> {code}
> However, setting `max.in.flight.requests.per.connection` to 1 only means the 
> produced records must not be reordered; Metadata requests should be unrelated 
> to this config. We don't have to check the in-flight request queue size when 
> sending a Metadata request.
> [~ijuma] Does it make any sense? If yes, I will work on it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5665) Incorrect interruption invoking method used for Heartbeat thread

2017-07-26 Thread huxihx (JIRA)
huxihx created KAFKA-5665:
-

 Summary: Incorrect interruption invoking method used for Heartbeat 
thread 
 Key: KAFKA-5665
 URL: https://issues.apache.org/jira/browse/KAFKA-5665
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.11.0.0
Reporter: huxihx
Assignee: huxihx
Priority: Minor


When interrupting the background heartbeat thread, `Thread.interrupted();` is 
used. Actually, `Thread.currentThread().interrupt();` should be used to restore 
the interruption status. An alternative way to solve this is to remove 
`Thread.interrupted();`, since HeartbeatThread extends Thread and all code 
higher up on the call stack is under our control, so we could safely swallow 
the exception. Either way, `Thread.interrupted();` should not be used here: it 
is a test method, not an action.
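
For illustration, a minimal sketch of the contrast (not the actual HeartbeatThread code): Thread.interrupted() tests and clears the flag, whereas re-asserting the interrupt preserves it for code higher up the stack.

{code}
// illustrative only
def runUntilInterrupted(body: () => Unit): Unit =
  try {
    while (true) body()
  } catch {
    case _: InterruptedException =>
      Thread.currentThread().interrupt() // restore the interruption status before returning
  }
{code}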



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5641) Metadata request should be allowed to be sent regardless of the value of max.in.flight.requests.per.connection

2017-07-25 Thread huxihx (JIRA)
huxihx created KAFKA-5641:
-

 Summary: Metadata request should be allowed to be sent regardless 
of the value of max.in.flight.requests.per.connection
 Key: KAFKA-5641
 URL: https://issues.apache.org/jira/browse/KAFKA-5641
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.11.0.0
Reporter: huxihx


Metadata request might not be able to be sent when 
`max.in.flight.requests.per.connection` is set to 1 and there is already an 
inflight request in the same node's queue, as shown below:
{code:title=NetworkClient.java|borderStyle=solid}
private long maybeUpdate(long now, Node node) {
String nodeConnectionId = node.idString();
if (canSendRequest(nodeConnectionId)) {
..
}
{code}

However, setting `max.in.flight.requests.per.connection` to 1 only means the 
produced records must not be reordered; Metadata requests should be unrelated to 
this config. We don't have to check the in-flight request queue size when 
sending a Metadata request.

[~ijuma] Does it make any sense? If yes, I will work on it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5582) Log compaction with preallocation enabled does not trim segments

2017-07-20 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-5582.
---
Resolution: Duplicate

> Log compaction with preallocation enabled does not trim segments
> 
>
> Key: KAFKA-5582
> URL: https://issues.apache.org/jira/browse/KAFKA-5582
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
> Environment: Linux, Windows
>Reporter: Jason Aliyetti
>
> Unexpected behavior occurs when a topic is configured to preallocate files 
> and has a retention policy of compact.
> When log compaction runs, the cleaner attempts to gather groups of segments 
> to consolidate based on the max segment size.  
> When preallocation is enabled all segments are that size and thus each 
> individual segment is considered for compaction.
> When compaction does occur, the resulting cleaned file is sized based on that 
> same configuration.  This means that you can have very large files on disk 
> that contain little or no data, which partly defeats the point of compacting. 
> The log cleaner should trim these segments so that they actually free up disk 
> space and can be further compacted on subsequent runs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5560) LogManager should be able to create new logs based on free disk space

2017-07-06 Thread huxihx (JIRA)
huxihx created KAFKA-5560:
-

 Summary: LogManager should be able to create new logs based on 
free disk space
 Key: KAFKA-5560
 URL: https://issues.apache.org/jira/browse/KAFKA-5560
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.11.0.0
Reporter: huxihx


Currently, the log manager chooses a directory configured in `log.dirs` by 
counting the number of partitions in each directory and then choosing the one 
with the fewest partitions. But in some real production scenarios where the data 
volumes of partitions are uneven, some disks become nearly full whereas others 
have plenty of space, which leads to a poor data distribution.

We should offer users a new strategy that has the log manager honor the actual 
free disk space and choose the directory with the most space available. Maybe a 
new broker configuration parameter is needed, `log.directory.strategy` for 
instance. Perhaps this also needs a new KIP.

Does it make sense?
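
For illustration, a minimal sketch of such a strategy (illustrative only, not the actual LogManager code; the config name above is just the proposal in this ticket):

{code}
import java.io.File

// pick the data directory with the most usable bytes right now
def nextLogDirByFreeSpace(logDirs: Seq[File]): File =
  logDirs.maxBy(_.getUsableSpace)
{code}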



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5444) Producer.send() will hang 8+ hours

2017-06-14 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16049787#comment-16049787
 ] 

huxihx commented on KAFKA-5444:
---

Is it possible to try using the new producer? And why do you create a producer 
and then close it so frequently?
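
For reference, a minimal sketch of the reuse pattern with the new producer (illustrative only; the topic name and broker address are placeholders):

{code}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092") // placeholder address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// create the producer once and reuse it for the lifetime of the application
val producer = new KafkaProducer[String, String](props)

// ... many sends over time ...
producer.send(new ProducerRecord[String, String]("some-topic", "key", "value"))

// close only on application shutdown
producer.close()
{code}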

> Producer.send() will hang 8+ hours
> --
>
> Key: KAFKA-5444
> URL: https://issues.apache.org/jira/browse/KAFKA-5444
> Project: Kafka
>  Issue Type: Bug
>Reporter: Hongyuan Li
>
> Frequently opening and closing the old Kafka Producer will cause the server to 
> hang, with lots of error messages logged in server.log. In our case, we may 
> frequently open and close a Kafka producer; the procedure is just like the code below:
> {code}
> Producer producer = ……
> producer.send(List lists);
> producer.close();
> {code}
> the error is below:
> {code}
> 2017-06-13 00:00:00,084] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.OutOfMemoryError: Direct buffer memory
>   at java.nio.Bits.reserveMemory(Bits.java:658)
>   at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
>   at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>   at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
>   at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>   at 
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
>   at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
>   at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>   at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
>   at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
>   at kafka.network.Processor.poll(SocketServer.scala:476)
>   at kafka.network.Processor.run(SocketServer.scala:416)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Seen from all existing logs, every error is a repeat of the error above.
> Any good ideas on how to solve this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5444) Producer.send() will hang 8+ hours

2017-06-13 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048674#comment-16048674
 ] 

huxihx commented on KAFKA-5444:
---

Is it a duplicate of 
[KAFKA-3552|https://issues.apache.org/jira/browse/KAFKA-3552]? What are your 
Kafka version and JVM version? And since `reserveMemory` triggers a System.gc 
when no space is left for allocating a direct byte buffer, check your environment 
to see whether System.gc is explicitly disabled.

> Producer.send() will hang 8+ hours
> --
>
> Key: KAFKA-5444
> URL: https://issues.apache.org/jira/browse/KAFKA-5444
> Project: Kafka
>  Issue Type: Bug
>Reporter: Hongyuan Li
>
> Frequently opening and closing the old Kafka Producer will cause the server to 
> hang, with lots of error messages logged in server.log. In our case, we may 
> frequently open and close a Kafka producer; the procedure is just like the code below:
> {code}
> Producer producer = ……
> producer.send(List lists);
> producer.close();
> {code}
> the error is below:
> {code}
> 2017-06-13 00:00:00,084] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.OutOfMemoryError: Direct buffer memory
>   at java.nio.Bits.reserveMemory(Bits.java:658)
>   at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
>   at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>   at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
>   at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>   at 
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
>   at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
>   at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>   at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
>   at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
>   at kafka.network.Processor.poll(SocketServer.scala:476)
>   at kafka.network.Processor.run(SocketServer.scala:416)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The detailed error logs will be added in the attachment.
> Any good ideas on how to solve this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5432) producer and consumer SocketTimeoutException

2017-06-13 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048615#comment-16048615
 ] 

huxihx commented on KAFKA-5432:
---

I would not see rolling a new log segment as an indicator of the cause. Is it 
possible that those CLOSE_WAITs lead to the SocketTimeoutException thrown by 
both consumers and producers?

> producer and consumer  SocketTimeoutException
> -
>
> Key: KAFKA-5432
> URL: https://issues.apache.org/jira/browse/KAFKA-5432
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.10.2.0
> Environment: os:Red Hat 4.4.7-17
> java:
> java version "1.8.0_131"
> Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
> Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
>Reporter: Jian Lin
> Attachments: server.properties
>
>
> Hey all, I met a strange problem, hope someone can help me.
> The program ran normally for a week and I did not make any changes, but today 
> it suddenly reported an error
> Producer error log:
> {code:java}
> 2017-06-12 10:46:01[qtp958382397-80:591423838]-[WARN] Failed to send producer 
> request with correlation id 234645 to broker 176 with data for partitions 
> [sms,3]
> java.net.SocketTimeoutException
>   at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
>   at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>   at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>   at kafka.utils.Utils$.read(Utils.scala:380)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
>   at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>   at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>   at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
>   at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>   at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>   at kafka.producer.Producer.send(Producer.scala:77)
>   at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> {code}
> Consumer error log:
> {code:java}
> 2017-06-12 
> 10:46:52[ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176:602874523]-[INFO]
>  Reconnect due to socket error: java.net.SocketTimeoutException
> 2017-06-12 
> 10:47:22[ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176:602904544]-[WARN]
>  
> [ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176],
>  Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 6060231; 
> ClientId: sms-consumer-group1; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 
> bytes; RequestInfo: [sms,0] -> 

[jira] [Commented] (KAFKA-5007) Kafka Replica Fetcher Thread- Resource Leak

2017-06-13 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048605#comment-16048605
 ] 

huxihx commented on KAFKA-5007:
---

[~joseph.alias...@gmail.com] org.apache.kafka.common.network.Selector

> Kafka Replica Fetcher Thread- Resource Leak
> ---
>
> Key: KAFKA-5007
> URL: https://issues.apache.org/jira/browse/KAFKA-5007
> Project: Kafka
>  Issue Type: Bug
>  Components: core, network
>Affects Versions: 0.10.0.0, 0.10.1.1, 0.10.2.0
> Environment: Centos 7
> Jave 8
>Reporter: Joseph Aliase
>Priority: Critical
>  Labels: reliability
> Attachments: jstack-kafka.out, jstack-zoo.out, lsofkafka.txt, 
> lsofzookeeper.txt
>
>
> Kafka is running out of open file descriptors when the system network interface 
> is down.
> Issue description:
> We have a Kafka cluster of 5 nodes running on version 0.10.1.1. The open file 
> descriptor limit for the account running Kafka is set to 10.
> During an upgrade, the network interface went down. The outage continued for 12 
> hours; eventually all the brokers crashed with a java.io.IOException: Too many 
> open files error.
> We repeated the test in a lower environment and observed that the open socket 
> count keeps on increasing while the NIC is down.
> We have around 13 topics with a maximum partition count of 120, and the number 
> of replica fetcher threads is set to 8.
> Using an internal monitoring tool we observed that the open socket descriptor 
> count for the broker pid continued to increase although the NIC was down, 
> leading to the open file descriptor error.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5431) LogCleaner stopped due to org.apache.kafka.common.errors.CorruptRecordException

2017-06-13 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16047637#comment-16047637
 ] 

huxihx edited comment on KAFKA-5431 at 6/13/17 9:25 AM:


Could you run commands below to see whether there exists a corrupt record for 
`__consumer_offsets` topic?


{noformat}
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 18 
--broker-list *** --formatter 
"kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 24 
--broker-list *** --formatter 
"kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
{noformat}



was (Author: huxi_2b):
Could you run command below to see whether there exists a corrupt record for 
`__consumer_offsets` topic?


{noformat}
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 18 
--broker-list *** --formatter 
"kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 24 
--broker-list *** --formatter 
"kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
{noformat}


> LogCleaner stopped due to 
> org.apache.kafka.common.errors.CorruptRecordException
> ---
>
> Key: KAFKA-5431
> URL: https://issues.apache.org/jira/browse/KAFKA-5431
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Carsten Rietz
>  Labels: reliability
>
> Hey all,
> I have a strange problem with our UAT cluster of 3 Kafka brokers.
> The __consumer_offsets topic was replicated to two instances and our disks 
> ran full due to a wrong configuration of the log cleaner. We fixed the 
> configuration and updated from 0.10.1.1 to 0.10.2.1.
> Today I increased the replication of the __consumer_offsets topic to 3 and 
> triggered replication to the third cluster via kafka-reassign-partitions.sh. 
> That went well but I get many errors like
> {code}
> [2017-06-12 09:59:50,342] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,18] offset 0 error Record size is less than the 
> minimum record overhead (14) (kafka.server.ReplicaFetcherThread)
> [2017-06-12 09:59:50,342] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,24] offset 0 error Record size is less than the 
> minimum record overhead (14) (kafka.server.ReplicaFetcherThread)
> {code}
> Which I think are due to the full disk event.
> The log cleaner threads died on these wrong messages:
> {code}
> [2017-06-12 09:59:50,722] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> org.apache.kafka.common.errors.CorruptRecordException: Record size is less 
> than the minimum record overhead (14)
> [2017-06-12 09:59:50,722] INFO [kafka-log-cleaner-thread-0], Stopped  
> (kafka.log.LogCleaner)
> {code}
> Looking at the files I see that some are truncated and some are just empty:
> $ ls -lsh 00594653.log
> 0 -rw-r--r-- 1 user user 100M Jun 12 11:00 00594653.log
> Sadly I do not have the logs any more from the disk full event itself.
> I have three questions:
> * What is the best way to clean this up? Deleting the old log files and 
> restarting the brokers?
> * Why did Kafka not handle the disk full event well? Is this only affecting 
> the cleanup or may we also lose data?
> * Is this maybe caused by the combination of upgrade and disk full?
> And last but not least: Keep up the good work. Kafka is really performing 
> well while being easy to administer and has good documentation!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5431) LogCleaner stopped due to org.apache.kafka.common.errors.CorruptRecordException

2017-06-13 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16047637#comment-16047637
 ] 

huxihx commented on KAFKA-5431:
---

Could you run command below to see whether there exists a corrupt record for 
`__consumer_offsets` topic?


{noformat}
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 18 
--broker-list *** --formatter 
"kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 24 
--broker-list *** --formatter 
"kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
{noformat}


> LogCleaner stopped due to 
> org.apache.kafka.common.errors.CorruptRecordException
> ---
>
> Key: KAFKA-5431
> URL: https://issues.apache.org/jira/browse/KAFKA-5431
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Carsten Rietz
>  Labels: reliability
>
> Hey all,
> I have a strange problem with our UAT cluster of 3 Kafka brokers.
> The __consumer_offsets topic was replicated to two instances and our disks 
> ran full due to a wrong configuration of the log cleaner. We fixed the 
> configuration and updated from 0.10.1.1 to 0.10.2.1.
> Today I increased the replication of the __consumer_offsets topic to 3 and 
> triggered replication to the third cluster via kafka-reassign-partitions.sh. 
> That went well but I get many errors like
> {code}
> [2017-06-12 09:59:50,342] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,18] offset 0 error Record size is less than the 
> minimum record overhead (14) (kafka.server.ReplicaFetcherThread)
> [2017-06-12 09:59:50,342] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,24] offset 0 error Record size is less than the 
> minimum record overhead (14) (kafka.server.ReplicaFetcherThread)
> {code}
> Which I think are due to the full disk event.
> The log cleaner threads died on these wrong messages:
> {code}
> [2017-06-12 09:59:50,722] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> org.apache.kafka.common.errors.CorruptRecordException: Record size is less 
> than the minimum record overhead (14)
> [2017-06-12 09:59:50,722] INFO [kafka-log-cleaner-thread-0], Stopped  
> (kafka.log.LogCleaner)
> {code}
> Looking at the files I see that some are truncated and some are just empty:
> $ ls -lsh 00594653.log
> 0 -rw-r--r-- 1 user user 100M Jun 12 11:00 00594653.log
> Sadly I do not have the logs any more from the disk full event itself.
> I have three questions:
> * What is the best way to clean this up? Deleting the old log files and 
> restarting the brokers?
> * Why did Kafka not handle the disk full event well? Is this only affecting 
> the cleanup or may we also lose data?
> * Is this maybe caused by the combination of upgrade and disk full?
> And last but not least: Keep up the good work. Kafka is really performing 
> well while being easy to administer and has good documentation!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5432) producer and consumer SocketTimeoutException

2017-06-13 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16047569#comment-16047569
 ] 

huxihx commented on KAFKA-5432:
---

Seems it is a duplicate of 
[KAFKA-5007|https://issues.apache.org/jira/browse/KAFKA-5007].
 [~junrao] What do you say?

> producer and consumer  SocketTimeoutException
> -
>
> Key: KAFKA-5432
> URL: https://issues.apache.org/jira/browse/KAFKA-5432
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.10.2.0
> Environment: os:Red Hat 4.4.7-17
>Reporter: Jian Lin
> Attachments: server.properties
>
>
> Hey all, I met a strange problem, hope someone can help me.
> The program ran normally for a week and I did not make any changes, but today 
> it suddenly reported an error
> Producer error log:
> {code:java}
> 2017-06-12 10:46:01[qtp958382397-80:591423838]-[WARN] Failed to send producer 
> request with correlation id 234645 to broker 176 with data for partitions 
> [sms,3]
> java.net.SocketTimeoutException
>   at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
>   at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>   at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>   at kafka.utils.Utils$.read(Utils.scala:380)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
>   at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>   at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>   at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
>   at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>   at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>   at kafka.producer.Producer.send(Producer.scala:77)
>   at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> {code}
> Consumer error log:
> {code:java}
> 2017-06-12 
> 10:46:52[ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176:602874523]-[INFO]
>  Reconnect due to socket error: java.net.SocketTimeoutException
> 2017-06-12 
> 10:47:22[ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176:602904544]-[WARN]
>  
> [ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176],
>  Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 6060231; 
> ClientId: sms-consumer-group1; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 
> bytes; RequestInfo: [sms,0] -> PartitionFetchInfo(81132,1048576),[sms,3] -> 
> PartitionFetchInfo(94040,1048576). Possible cause: 
> java.net.SocketTimeoutException
> 2017-06-12 
> 

[jira] [Updated] (KAFKA-5432) producer and consumer SocketTimeoutException

2017-06-13 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx updated KAFKA-5432:
--
Component/s: (was: KafkaConnect)

> producer and consumer  SocketTimeoutException
> -
>
> Key: KAFKA-5432
> URL: https://issues.apache.org/jira/browse/KAFKA-5432
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.10.2.0
> Environment: os:Red Hat 4.4.7-17
>Reporter: Jian Lin
> Attachments: server.properties
>
>
> Hey all, I met a strange problem, hope someone can help me.
> The program ran normally for a week and I did not make any changes, but today 
> it suddenly reported an error
> Producer error log:
> {code:java}
> 2017-06-12 10:46:01[qtp958382397-80:591423838]-[WARN] Failed to send producer 
> request with correlation id 234645 to broker 176 with data for partitions 
> [sms,3]
> java.net.SocketTimeoutException
>   at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
>   at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>   at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>   at kafka.utils.Utils$.read(Utils.scala:380)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
>   at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>   at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>   at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
>   at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>   at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>   at kafka.producer.Producer.send(Producer.scala:77)
>   at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> {code}
> Consumer error log:
> {code:java}
> 2017-06-12 
> 10:46:52[ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176:602874523]-[INFO]
>  Reconnect due to socket error: java.net.SocketTimeoutException
> 2017-06-12 
> 10:47:22[ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176:602904544]-[WARN]
>  
> [ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176],
>  Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 6060231; 
> ClientId: sms-consumer-group1; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 
> bytes; RequestInfo: [sms,0] -> PartitionFetchInfo(81132,1048576),[sms,3] -> 
> PartitionFetchInfo(94040,1048576). Possible cause: 
> java.net.SocketTimeoutException
> 2017-06-12 
> 10:47:22[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602904576]-[INFO]
>  Verifying properties
> 2017-06-12 
> 

[jira] [Commented] (KAFKA-5007) Kafka Replica Fetcher Thread- Resource Leak

2017-06-13 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16047561#comment-16047561
 ] 

huxihx commented on KAFKA-5007:
---

[~junrao] Is it possible that it's caused by the code snippet below:


{code:java}
...
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
Socket socket = socketChannel.socket();
socket.setKeepAlive(true);
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setSendBufferSize(sendBufferSize);
if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setReceiveBufferSize(receiveBufferSize);
socket.setTcpNoDelay(true);
boolean connected;
try {
connected = socketChannel.connect(address);
} catch (UnresolvedAddressException e) {
socketChannel.close();
throw new IOException("Can't resolve address: " + address, e);
} catch (IOException e) {
socketChannel.close();
throw e;
}
{code}

The code does not catch all possible exceptions, so `socketChannel` may fail 
to be closed.
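
For illustration, the hardening being suggested has roughly this shape (a Scala sketch over the Java snippet above, illustrative only):

{code}
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

def openAndConnect(address: InetSocketAddress): SocketChannel = {
  val socketChannel = SocketChannel.open()
  try {
    socketChannel.configureBlocking(false)
    val socket = socketChannel.socket()
    socket.setKeepAlive(true)
    socket.setTcpNoDelay(true)
    socketChannel.connect(address)
    socketChannel
  } catch {
    case e: Throwable =>
      socketChannel.close() // close on any failure, not only the two exception types caught above
      throw e
  }
}
{code}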


> Kafka Replica Fetcher Thread- Resource Leak
> ---
>
> Key: KAFKA-5007
> URL: https://issues.apache.org/jira/browse/KAFKA-5007
> Project: Kafka
>  Issue Type: Bug
>  Components: core, network
>Affects Versions: 0.10.0.0, 0.10.1.1, 0.10.2.0
> Environment: Centos 7
> Jave 8
>Reporter: Joseph Aliase
>Priority: Critical
>  Labels: reliability
> Attachments: jstack-kafka.out, jstack-zoo.out, lsofkafka.txt, 
> lsofzookeeper.txt
>
>
> Kafka is running out of open file descriptors when the system network interface 
> is down.
> Issue description:
> We have a Kafka cluster of 5 nodes running on version 0.10.1.1. The open file 
> descriptor limit for the account running Kafka is set to 10.
> During an upgrade, the network interface went down. The outage continued for 12 
> hours; eventually all the brokers crashed with a java.io.IOException: Too many 
> open files error.
> We repeated the test in a lower environment and observed that the open socket 
> count keeps on increasing while the NIC is down.
> We have around 13 topics with a maximum partition count of 120, and the number 
> of replica fetcher threads is set to 8.
> Using an internal monitoring tool we observed that the open socket descriptor 
> count for the broker pid continued to increase although the NIC was down, 
> leading to the open file descriptor error.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5432) producer and consumer SocketTimeoutException

2017-06-12 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16047390#comment-16047390
 ] 

huxihx commented on KAFKA-5432:
---

Were there any changes before these exceptions were observed? Say, were any new 
consumers or producers added?

> producer and consumer  SocketTimeoutException
> -
>
> Key: KAFKA-5432
> URL: https://issues.apache.org/jira/browse/KAFKA-5432
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, network
>Affects Versions: 0.10.2.0
> Environment: os:Red Hat 4.4.7-17
>Reporter: Jian Lin
> Attachments: server.properties
>
>
> Hey all, I met a strange problem, hope someone can help me.
> The program ran normally for a week and I did not make any changes, but today 
> it suddenly reported an error
> Producer error log:
> ```
> 2017-06-12 10:46:01[qtp958382397-80:591423838]-[WARN] Failed to send producer 
> request with correlation id 234645 to broker 176 with data for partitions 
> [sms,3]
> java.net.SocketTimeoutException
>   at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
>   at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>   at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>   at kafka.utils.Utils$.read(Utils.scala:380)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
>   at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>   at 
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>   at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
>   at 
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>   at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
>   at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>   at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>   at kafka.producer.Producer.send(Producer.scala:77)
>   at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> ```
> Consumer error log:
> ```
> 2017-06-12 
> 10:52:52[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:603234738]-[WARN]
>  Fetching topic metadata with correlation id 7 for topics [Set(sms)] from 
> broker [id:176,host:10.17.24.176,port:9092] failed
> java.net.SocketTimeoutException
>   at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
>   at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
>   at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
>   at kafka.utils.Utils$.read(Utils.scala:380)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> 

[jira] [Commented] (KAFKA-5418) ZkUtils.getAllPartitions() may fail if a topic is marked for deletion

2017-06-09 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16045321#comment-16045321
 ] 

huxihx commented on KAFKA-5418:
---

Is it a duplicate of 
[KAFKA-1019|https://issues.apache.org/jira/browse/KAFKA-1019]?

> ZkUtils.getAllPartitions() may fail if a topic is marked for deletion
> -
>
> Key: KAFKA-5418
> URL: https://issues.apache.org/jira/browse/KAFKA-5418
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1, 0.10.2.1
>Reporter: Edoardo Comar
>
> Running {{ZkUtils.getAllPartitions()}} on a cluster which had a topic stuck 
> in the 'marked for deletion' state (so it was a child of {{/brokers/topics}} 
> but had no children, i.e. the path {{/brokers/topics/thistopic/partitions}} 
> did not exist) throws a ZkNoNodeException while iterating:
> {noformat}
> org.I0Itec.zkclient.exception.ZkNoNodeException: 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /brokers/topics/xyzblahfoo/partitions
>   at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:995)
>   at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:675)
>   at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:671)
>   at kafka.utils.ZkUtils.getChildren(ZkUtils.scala:537)
>   at 
> kafka.utils.ZkUtils$$anonfun$getAllPartitions$1.apply(ZkUtils.scala:817)
>   at 
> kafka.utils.ZkUtils$$anonfun$getAllPartitions$1.apply(ZkUtils.scala:816)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at kafka.utils.ZkUtils.getAllPartitions(ZkUtils.scala:816)
> ...
>   at java.lang.Thread.run(Thread.java:809)
> Caused by: org.apache.zookeeper.KeeperException$NoNodeException: 
> KeeperErrorCode = NoNode for /brokers/topics/xyzblahfoo/partitions
>   at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
>   at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>   at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1472)
>   at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1500)
>   at org.I0Itec.zkclient.ZkConnection.getChildren(ZkConnection.java:114)
>   at org.I0Itec.zkclient.ZkClient$4.call(ZkClient.java:678)
>   at org.I0Itec.zkclient.ZkClient$4.call(ZkClient.java:675)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:985)
>   ... 
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-2526) Console Producer / Consumer's serde config is not working

2017-06-09 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16045296#comment-16045296
 ] 

huxihx commented on KAFKA-2526:
---

[~guozhang] No, I think [~mgharat] is working on this.

> Console Producer / Consumer's serde config is not working
> -
>
> Key: KAFKA-2526
> URL: https://issues.apache.org/jira/browse/KAFKA-2526
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Mayuresh Gharat
>  Labels: newbie
>
> Although in the console producer one can specify the key/value serializers, 
> they are actually not used, since 1) it always serializes the input string as 
> String.getBytes (hence it always pre-assumes the string serializer) and 2) the 
> config is actually only passed into the old producer. The same issues exist in 
> the console consumer.
> In addition, the configs in the console producer are messy: 1) some config 
> values are exposed as command-line parameters, 2) some are passed via 
> --producer-property, and 3) some via --property.
> It would be great to clean up the configs in both the console producer and 
> consumer, and put them into a single --property parameter, which could 
> possibly take a file for reading in property values as well, and only leave 
> --new-producer as the other command-line parameter.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5406) NoNodeException result in rebalance failed

2017-06-07 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16042077#comment-16042077
 ] 

huxihx commented on KAFKA-5406:
---

Maybe you could estimate the total time the network needs to recover and make 
sure `rebalance.max.retries` * `rebalance.backoff.ms` is no less than that 
period. Perhaps some application-level logic is also required to handle a long 
network outage.
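
For example (illustrative values only; the group id and the ten-minute outage window are assumptions, not recommendations), sizing the old consumer's rebalance settings to cover the expected recovery time:

{code:java}
import java.util.Properties;

public class RebalanceConfigSketch {
    public static void main(String[] args) {
        // Illustrative sizing: 60 retries * 10000 ms backoff = 600000 ms,
        // which covers an expected network outage of about ten minutes.
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");     // placeholder address
        props.put("group.id", "my-group");               // placeholder group id
        props.put("rebalance.backoff.ms", "10000");      // 10 s between rebalance attempts
        props.put("rebalance.max.retries", "60");        // 60 * 10 s >= expected outage
        System.out.println(props);
    }
}
{code}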

> NoNodeException result in rebalance failed
> --
>
> Key: KAFKA-5406
> URL: https://issues.apache.org/jira/browse/KAFKA-5406
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.2, 0.10.0.0
> Environment: windows8.1 centos6.4
>Reporter: xiaoguy
>Priority: Critical
>  Labels: easyfix, patch
> Attachments: log.log
>
>
> Hey guys, I ran into this problem recently.
> Because the network is unstable, the consumer rebalance failed after 5 retries; 
> the log shows that the zk path /consumers/$(groupIdName)/ids/ is empty, and the 
> consumer seems unable to register again after the network recovered. So I read 
> the Kafka source code (0.8.2.2) and found that 
> consumer/ZookeeperConsumerConnector$ZKSessionExpireListener handleNewSession 
> won't be called, and handleStateChanged does nothing.
> So I changed the code as below, and it seems to work. I checked the 0.10.0.0 
> version and saw the same problem. Is this a bug? I'm confused. Thank you.
>   def handleStateChanged(state: KeeperState) {
>     // do nothing, since zkclient will do reconnect for us.
>     if (state == KeeperState.SyncConnected) {
>       handleNewSession()
>     }
>     System.err.println("ZKSessionExpireListener handleStateChanged-state:" + state + "" + state.getIntValue)
>   }



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5402) JmxReporter Fetch metrics for kafka.server should not be created when client quotas are not enabled

2017-06-07 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16042069#comment-16042069
 ] 

huxihx commented on KAFKA-5402:
---

Is it a duplicate of 
[KAFKA-3980|https://issues.apache.org/jira/browse/KAFKA-3980]?

> JmxReporter Fetch metrics for kafka.server should not be created when client 
> quotas are not enabled
> ---
>
> Key: KAFKA-5402
> URL: https://issues.apache.org/jira/browse/KAFKA-5402
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.1
>Reporter: Koelli Mungee
> Attachments: Fetch.jpg, Metrics.jpg
>
>
> JmxReporter kafka.server Fetch metrics should not be created when client 
> quotas are not enforced for client fetch requests. Currently, these metrics 
> are created, and this can cause an OutOfMemoryException in the KafkaServer in 
> cases where a large number of consumers are being created rapidly.
> Attaching screenshots from a heap dump showing 
> kafka.server:type=Fetch,client-id=consumer-358567 with different client.ids 
> from a KafkaServer where client quotas were not enabled.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-5405) Request log should log throttle time

2017-06-07 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-5405:
-

Assignee: huxihx

> Request log should log throttle time
> 
>
> Key: KAFKA-5405
> URL: https://issues.apache.org/jira/browse/KAFKA-5405
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.11.0.0
>Reporter: Jun Rao
>Assignee: huxihx
>  Labels: newbie
>
> In RequestChannel, when logging the request and the latency, it would be 
> useful to include the apiThrottleTime as well.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-5098) KafkaProducer.send() blocks and generates TimeoutException if topic name has illegal char

2017-06-03 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-5098:
-

Assignee: huxihx

> KafkaProducer.send() blocks and generates TimeoutException if topic name has 
> illegal char
> -
>
> Key: KAFKA-5098
> URL: https://issues.apache.org/jira/browse/KAFKA-5098
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.0
> Environment: Java client running against server using 
> wurstmeister/kafka Docker image.
>Reporter: Jeff Larsen
>Assignee: huxihx
>
> The server is running with auto create enabled. If we try to publish to a 
> topic with a forward slash in the name, the call blocks and we get a 
> TimeoutException in the Callback. I would expect it to return immediately 
> with an InvalidTopicException.
> There are other blocking issues that have been reported which may be related 
> to some degree, but this particular cause seems unrelated.
> Sample code:
> {code}
> import org.apache.kafka.clients.producer.*;
> import java.util.*;
>
> public class KafkaProducerUnexpectedBlockingAndTimeoutException {
>   public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "kafka.example.com:9092");
>     props.put("key.serializer", 
>         "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("value.serializer", 
>         "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("max.block.ms", 10000); // 10 seconds should illustrate our point
>     String separator = "/";
>     //String separator = "_";
>     try (Producer<String, String> producer = new KafkaProducer<>(props)) {
>       System.out.println("Calling KafkaProducer.send() at " + new Date());
>       producer.send(
>           new ProducerRecord<String, String>("abc" + separator + "someStreamName",
>               "Not expecting a TimeoutException here"),
>           new Callback() {
>             @Override
>             public void onCompletion(RecordMetadata metadata, Exception e) {
>               if (e != null) {
>                 System.out.println(e.toString());
>               }
>             }
>           });
>       System.out.println("KafkaProducer.send() completed at " + new Date());
>     }
>   }
> }
> {code}
> Switching to the underscore separator in the above example works as expected.
> Mea culpa: We neglected to research allowed chars in a topic name, but the 
> TimeoutException we encountered did not help point us in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5262) Can't find some consumer group information

2017-06-03 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035865#comment-16035865
 ] 

huxihx commented on KAFKA-5262:
---

Does your client code commit offsets? Also, the complete output of the 
kafka-consumer-groups script would help diagnose the issue.
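
To be concrete about what committing offsets means here, a minimal sketch with the new consumer (the broker address, topic, and group id are placeholders):

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");    // placeholder broker
        props.put("group.id", "my-group");                  // placeholder group id
        props.put("enable.auto.commit", "false");           // we commit explicitly below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));   // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(1000);
            // Only committed offsets end up in __consumer_offsets, which is what
            // kafka-consumer-groups.sh --new-consumer reads to compute lag.
            consumer.commitSync();
            System.out.println("Polled " + records.count() + " records and committed offsets");
        }
    }
}
{code}

With auto commit left at its default, offsets are committed periodically in the background instead.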

> Can't  find  some  consumer group   information
> ---
>
> Key: KAFKA-5262
> URL: https://issues.apache.org/jira/browse/KAFKA-5262
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 0.10.1.0
>Reporter: miaozhiyong
>
> The Kafka client uses the broker to connect to Kafka. I have installed two 
> kafka-manager instances. The consumer doesn't show up in kafka-manager, and it 
> can't be found with the command line:
> kafka-consumer-groups.sh --new-consumer --bootstrap-server
> But the client itself is OK. Where does the consumer store the lag?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5296) Unable to write to some partitions of newly created topic in 10.2

2017-06-03 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035862#comment-16035862
 ] 

huxihx commented on KAFKA-5296:
---

[~asaikia] Do you mean that directories for some partitions always fail to be 
created after issuing the create-topic command? That seems to be a separate 
issue, although I am unsure whether it relates to your original one. Could you 
check the broker log for any other exceptions showing why the directories 
failed to be created?

> Unable to write to some partitions of newly created topic in 10.2
> -
>
> Key: KAFKA-5296
> URL: https://issues.apache.org/jira/browse/KAFKA-5296
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Abhisek Saikia
>
> We are using Kafka 0.10.2 and the cluster was running fine for a month with 50 
> topics; now we are having issues producing messages to newly created topics. 
> The create-topic command is successful, but producers are throwing errors 
> while writing to some partitions. 
> Error in producer-
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> [topic1]-8: 30039 ms has passed since batch creation plus linger time
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:70)
>  ~[kafka-clients-0.10.2.0.jar:na]
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:57)
>  ~[kafka-clients-0.10.2.0.jar:na]
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
>  ~[kafka-clients-0.10.2.0.jar:na]
> On the broker side, I don't see any topic-partition folder getting created on 
> the broker that is the leader for the partition. 
> With the 0.8 client, the write used to hang when it started writing to a 
> partition with this issue. With 0.10.2, the producer hang issue is resolved.
>  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)