[jira] [Updated] (KAFKA-4860) Kafka batch files does not support path with spaces

2017-03-06 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimír Kleštinec updated KAFKA-4860:
--
Status: Patch Available  (was: Open)

> Kafka batch files does not support path with spaces
> ---
>
> Key: KAFKA-4860
> URL: https://issues.apache.org/jira/browse/KAFKA-4860
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
> Environment: windows
>Reporter: Vladimír Kleštinec
>Priority: Minor
>
> When we install kafka on windows to path that contains spaces e.g. C:\Program 
> Files\ApacheKafkabatch files located in bin/windows don't work.
> Workaround: install on path without spaces



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4860) Kafka batch files does not support path with spaces

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898894#comment-15898894
 ] 

ASF GitHub Bot commented on KAFKA-4860:
---

GitHub user klesta490 opened a pull request:

https://github.com/apache/kafka/pull/2649

KAFKA-4860: Allow spaces in paths on windows

When we install kafka on path with spaces, 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/klesta490/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2649.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2649


commit 90f2d2f4bc5fd1a16838fb07e59369e7e32eabfa
Author: Vladimír Kleštinec 
Date:   2017-03-07T07:34:01Z

allow spaces in paths on windows




> Kafka batch files does not support path with spaces
> ---
>
> Key: KAFKA-4860
> URL: https://issues.apache.org/jira/browse/KAFKA-4860
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
> Environment: windows
>Reporter: Vladimír Kleštinec
>Priority: Minor
>
> When we install kafka on windows to path that contains spaces e.g. C:\Program 
> Files\ApacheKafkabatch files located in bin/windows don't work.
> Workaround: install on path without spaces



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2649: KAFKA-4860: Allow spaces in paths on windows

2017-03-06 Thread klesta490
GitHub user klesta490 opened a pull request:

https://github.com/apache/kafka/pull/2649

KAFKA-4860: Allow spaces in paths on windows

When we install kafka on path with spaces, 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/klesta490/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2649.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2649


commit 90f2d2f4bc5fd1a16838fb07e59369e7e32eabfa
Author: Vladimír Kleštinec 
Date:   2017-03-07T07:34:01Z

allow spaces in paths on windows




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-4860) Kafka batch files does not support path with spaces

2017-03-06 Thread JIRA
Vladimír Kleštinec created KAFKA-4860:
-

 Summary: Kafka batch files does not support path with spaces
 Key: KAFKA-4860
 URL: https://issues.apache.org/jira/browse/KAFKA-4860
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.2.0
 Environment: windows
Reporter: Vladimír Kleštinec
Priority: Minor


When we install kafka on windows to path that contains spaces e.g. C:\Program 
Files\ApacheKafkabatch files located in bin/windows don't work.


Workaround: install on path without spaces



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-4859) Transient test failure: org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion (again)

2017-03-06 Thread Armin Braun (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4859 started by Armin Braun.
--
> Transient test failure: 
> org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion
>  (again)
> ---
>
> Key: KAFKA-4859
> URL: https://issues.apache.org/jira/browse/KAFKA-4859
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Armin Braun
>Assignee: Armin Braun
>
> Slightly different than KAFKA-3874 in terms of the way it fails.
> Now we have:
> {code}
> Error Message
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:257)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:206)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:175)
>   at 
> org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:297)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}
> e.g. here https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2032/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4859) Transient test failure: org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion (again)

2017-03-06 Thread Armin Braun (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898873#comment-15898873
 ] 

Armin Braun commented on KAFKA-4859:


Looking into this one, can easily reproduce it at least.

> Transient test failure: 
> org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion
>  (again)
> ---
>
> Key: KAFKA-4859
> URL: https://issues.apache.org/jira/browse/KAFKA-4859
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Armin Braun
>Assignee: Armin Braun
>
> Slightly different than KAFKA-3874 in terms of the way it fails.
> Now we have:
> {code}
> Error Message
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:257)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:206)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:175)
>   at 
> org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:297)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}
> e.g. here https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2032/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4859) Transient test failure: org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion (again)

2017-03-06 Thread Armin Braun (JIRA)
Armin Braun created KAFKA-4859:
--

 Summary: Transient test failure: 
org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion
 (again)
 Key: KAFKA-4859
 URL: https://issues.apache.org/jira/browse/KAFKA-4859
 Project: Kafka
  Issue Type: Sub-task
Reporter: Armin Braun
Assignee: Armin Braun


Slightly different than KAFKA-3874 in terms of the way it fails.

Now we have:

{code}
Error Message

java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
records from topic output-topic-2 while only received 0: []
Stacktrace

java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
records from topic output-topic-2 while only received 0: []
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:257)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:206)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:175)
at 
org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:297)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
{code}

e.g. here https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2032/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4408) KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0

2017-03-06 Thread Hamidreza Afzali (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hamidreza Afzali resolved KAFKA-4408.
-
   Resolution: Fixed
Fix Version/s: 0.11.0.0

Issue resolved by pull request 2629
https://github.com/apache/kafka/pull/2629

> KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
> --
>
> Key: KAFKA-4408
> URL: https://issues.apache.org/jira/browse/KAFKA-4408
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Linux
>Reporter: Byron Nikolaidis
>Assignee: Hamidreza Afzali
>  Labels: newbie, unit-test
> Fix For: 0.11.0.0
>
>
> In Kafka 0.10.1.0, the ProcessorTopologyTestDriver no longer works with 
> KTables.  The below test code worked fine under Kafka 0.10.0.1 but now 
> produces this error:
> Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: 
> task [0_0] Could not find partition info for topic: alertInputTopic
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:120)
> at 
> org.apache.kafka.test.ProcessorTopologyTestDriver.(ProcessorTopologyTestDriver.java:174)
> at 
> mil.navy.icap.kafka.streams.processor.track.ProcessorDriverTest2.main(ProcessorDriverTest2.java:41)
> {code}
> package mil.navy.icap.kafka.streams.processor.track;
> import java.io.IOException;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.Serdes.StringSerde;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.test.ProcessorTopologyTestDriver;
> public class ProcessorDriverTest2 {
>  
>  public static void main(String[] args) throws IOException, 
> InterruptedException {
>  System.out.println("ProcessorDriverTest2");
>  
>  Properties props = new Properties();
>  props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
> "ProcessorDriverTest2");
>  props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>  props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
>  props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
>  props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
>  props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
>  
>  StreamsConfig streamsConfig = new StreamsConfig(props);
>  
>  // topology
>  KStreamBuilder kstreamBuilder = new KStreamBuilder();
>  StringSerde stringSerde = new StringSerde();
>  KTable table = kstreamBuilder.table(stringSerde,
>  stringSerde, "alertInputTopic");
>  table.to(stringSerde, stringSerde, "alertOutputTopic");
>  
>  // create test driver
>  ProcessorTopologyTestDriver testDriver = new ProcessorTopologyTestDriver(
>  streamsConfig, 
>  kstreamBuilder, 
>  "alertStore");
>  StringSerializer serializer = new StringSerializer();
>  StringDeserializer deserializer = new StringDeserializer();
>  // send data to input topic
>  testDriver.process("alertInputTopic", 
>  "the Key", "the Value", serializer, serializer);
>  
>  // read data from output topic
>  ProducerRecord rec = 
> testDriver.readOutput("alertOutputTopic", 
>  deserializer, deserializer);
>  
>  System.out.println("rec: " + rec);
>  }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4849) Bug in KafkaStreams documentation

2017-03-06 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898851#comment-15898851
 ] 

Seweryn Habdank-Wojewodzki commented on KAFKA-4849:
---

@Matthias: I just found DOC inconsistency. My bug report is related to the 
Apache Kafka. I do not follow your comment.
@ASF GitHubBot: Thanks!

> Bug in KafkaStreams documentation
> -
>
> Key: KAFKA-4849
> URL: https://issues.apache.org/jira/browse/KAFKA-4849
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Seweryn Habdank-Wojewodzki
>Assignee: Matthias J. Sax
>Priority: Minor
>
> At the page: https://kafka.apache.org/documentation/streams
>  
> In the chapter titled Application Configuration and Execution, in the example 
> there is a line:
>  
> settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
>  
> but ZOOKEEPER_CONNECT_CONFIG is deprecated in the Kafka version 0.10.2.0.
>  
> Also the table on the page: 
> https://kafka.apache.org/0102/documentation/#streamsconfigs is a bit 
> misleading.
> 1. Again zookeeper.connect is deprecated.
> 2. The client.id and zookeeper.connect are marked by high importance, 
> but according to http://docs.confluent.io/3.2.0/streams/developer-guide.html 
> none of them are important to initialize the stream.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-03-06 Thread Hai Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898828#comment-15898828
 ] 

Hai Lin commented on KAFKA-4669:


For us, it only happened when the network of Kafka cluster is saturated. And 
the producer just hanging there and can't recover. The behavior is pretty 
random, one of the executor(producer) hangs for 30 minutes and recovery(or we 
replaced the executor manually). This happened only for our spark streaming 
app(0.8 client, and broker is 0.9), other applications recover from the network 
blip much better than the 0.8 client. 

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked 0.10 code and think this bug does exist in 0.10 versions.
> A real case.  First a correlateId not match exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>   at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>   at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>   at java.lang.Thread.run(Thread.java:745)
> client code 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-03-06 Thread Hai Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898829#comment-15898829
 ] 

Hai Lin commented on KAFKA-4669:


For us, it only happened when the network of Kafka cluster is saturated. And 
the producer just hanging there and can't recover. The behavior is pretty 
random, one of the executor(producer) hangs for 30 minutes and recovery(or we 
replaced the executor manually). This happened only for our spark streaming 
app(0.8 client, and broker is 0.9), other applications recover from the network 
blip much better than the 0.8 client. 

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked 0.10 code and think this bug does exist in 0.10 versions.
> A real case.  First a correlateId not match exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>   at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>   at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>   at java.lang.Thread.run(Thread.java:745)
> client code 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4858) Long topic names created using old kafka-topics.sh can prevent newer brokers from joining any ISRs

2017-03-06 Thread James Cheng (JIRA)
James Cheng created KAFKA-4858:
--

 Summary: Long topic names created using old kafka-topics.sh can 
prevent newer brokers from joining any ISRs
 Key: KAFKA-4858
 URL: https://issues.apache.org/jira/browse/KAFKA-4858
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.2.0, 0.10.1.1
Reporter: James Cheng


I ran into a variant of KAFKA-3219 that resulted in a broker being unable to 
join any ISRs the cluster.

Prior to 0.10.0.0, the maximum topic length was 255.
With 0.10.0.0 and beyond, the maximum topic length is 249.

The check on topic name length is done by kafka-topics.sh prior to topic 
creation. Thus, it is possible to use a 0.9.0.1 kafka-topics.sh script to 
create a 255 character topic on a 0.10.1.1 broker.

When this happens, you will get the following stack trace (the same one seen in 
KAFKA-3219)
{code}
$ TOPIC=$(printf 'd%.0s' {1..255} ) ; bin/kafka-topics.sh --zookeeper 127.0.0.1 
--create --topic $TOPIC --partitions 1 --replication-factor 2
Created topic 
"ddd".
{code}

{code}
[2017-03-06 22:01:19,011] ERROR [KafkaApi-2] Error when handling request 
{controller_id=1,controller_epoch=1,partition_states=[{topic=ddd,partition=0,controller_epoch=1,leader=2,leader_epoch=0,isr=[2,1],zk_version=0,replicas=[2,1]}],live_leaders=[{id=2,host=jchengmbpro15,port=9093}]}
 (kafka.server.KafkaApis)
java.lang.NullPointerException
at 
scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:192)
at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:192)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.log.Log.loadSegments(Log.scala:155)
at kafka.log.Log.(Log.scala:108)
at kafka.log.LogManager.createLog(LogManager.scala:362)
at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
at 
kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
at 
kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
at kafka.cluster.Partition.makeLeader(Partition.scala:168)
at 
kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:758)
at 
kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:757)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:757)
at 
kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:703)
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
at kafka.server.KafkaApis.handle(KafkaApis.scala:82)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)
{code}


The topic does not get created on disk, but the broker thinks the topic is 
ready. The broker seems functional, for other topics. I can produce/consume to 
other topics.
{code}

$ ./bin/kafka-topics.sh --zookeeper 127.0.0.1 --describe
Topic:ddd
   PartitionCount:1ReplicationFactor:2 Configs:
Topic: 

[jira] [Commented] (KAFKA-4844) kafka is holding open file descriptors

2017-03-06 Thread chao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898793#comment-15898793
 ] 

chao commented on KAFKA-4844:
-

In fact , it is vert critical issue ,because kafka hold very big file  ,but we 
can not delete except we stop Kafka process 

This issue was happened on centos 5.8 and centos 6.6.  NFS version looks 3 

 /nas/kafka_logs nfs 
rw,relatime,vers=3,rsize=65536,wsize=65536,namlen=255,hard,proto=tcp,timeo=600,retrans=2,sec=sys,mountaddr=XX.XX.XX.XX,mountvers=3,mountport=635,mountproto=udp,local_lock=none,addr=XX.XX.XX.XX
 0 0

$ cat /etc/redhat-release
CentOS release 6.6 (Final)

> kafka is holding open file descriptors
> --
>
> Key: KAFKA-4844
> URL: https://issues.apache.org/jira/browse/KAFKA-4844
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: chao
>Priority: Critical
>
> We found strange issue on Kafka 0.9.0.1 , kafka is holding opne file 
> descriptors , and not allowing disk space to be reclaimed
> my question:
> 1. what does file (nfsX) mean ??? 
> 2. why kafka is holding file ?? 
> $ sudo lsof /nas/kafka_logs/kafka/Order-6/.nfs04550ffcbd61
> COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
> java 97465 kafka mem REG 0,25 10485760 72683516 
> /nas/kafka_logs/kafka/Order-6/.nfs04550ffcbd61



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4834) Kafka cannot delete topic with ReplicaStateMachine went wrong

2017-03-06 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898790#comment-15898790
 ] 

huxi commented on KAFKA-4834:
-

Did you delete the zookeeper nodes manually before issuing this delete topics?

> Kafka cannot delete topic with ReplicaStateMachine went wrong
> -
>
> Key: KAFKA-4834
> URL: https://issues.apache.org/jira/browse/KAFKA-4834
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
>Reporter: Dan
>  Labels: reliability
>
> It happened several times that some topics can not be deleted in our 
> production environment. By analyzing the log, we found ReplicaStateMachine 
> went wrong. Here are the error messages:
> In state-change.log:
> ERROR Controller 2 epoch 201 initiated state change of replica 1 for 
> partition [test_create_topic1,1] from OnlineReplica to ReplicaDeletionStarted 
> failed (state.change.logger)
> java.lang.AssertionError: assertion failed: Replica 
> [Topic=test_create_topic1,Partition=1,Replica=1] should be in the 
> OfflineReplica states before moving to ReplicaDeletionStarted state. Instead 
> it is in OnlineReplica state
> at scala.Predef$.assert(Predef.scala:179)
> at 
> kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:309)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:190)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:344)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at 
> kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
> at 
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> In controller.log:
> INFO Leader not yet assigned for partition [test_create_topic1,1]. Skip 
> sending UpdateMetadataRequest. (kafka.controller.ControllerBrokerRequestBatch)
> There may exist two controllers in the cluster because creating a new topic 
> may trigger two machines to change the state of same partition, eg. 
> NonExistentPartition -> NewPartition.
> On the other controller, we found following messages in controller.log of 
> several days earlier:
> [2017-02-25 16:51:22,353] INFO [Topic Deletion Manager 0], Topic deletion 
> callback for 

Re: [VOTE] KIP-128: Add ByteArrayConverter for Kafka Connect

2017-03-06 Thread Gwen Shapira
+1 (binding)

On Mon, Mar 6, 2017 at 7:53 PM, Ewen Cheslack-Postava 
wrote:

> Hi all,
>
> I'd like to kick off voting for KIP-128: Add ByteArrayConverter for Kafka
> Connect:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 128%3A+Add+ByteArrayConverter+for+Kafka+Connect
>
> There was a small amount of discussion, see the original thread here:
> https://lists.apache.org/thread.html/62fc2245285ac5d15ebb9b2ebed672
> b51e391c8dfe9a51be85f685f3@%3Cdev.kafka.apache.org%3E
>
> The vote will stay open for at least 72 hours.
>
> -Ewen
>



-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter  | blog



Re: [VOTE] KIP-128: Add ByteArrayConverter for Kafka Connect

2017-03-06 Thread Guozhang Wang
+1 (binding)

On Mon, Mar 6, 2017 at 7:53 PM, Ewen Cheslack-Postava 
wrote:

> Hi all,
>
> I'd like to kick off voting for KIP-128: Add ByteArrayConverter for Kafka
> Connect:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 128%3A+Add+ByteArrayConverter+for+Kafka+Connect
>
> There was a small amount of discussion, see the original thread here:
> https://lists.apache.org/thread.html/62fc2245285ac5d15ebb9b2ebed672
> b51e391c8dfe9a51be85f685f3@%3Cdev.kafka.apache.org%3E
>
> The vote will stay open for at least 72 hours.
>
> -Ewen
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-128: Add ByteArrayConverter for Kafka Connect

2017-03-06 Thread Guozhang Wang
Sounds good. Thanks.

On Wed, Mar 1, 2017 at 8:21 PM, Gwen Shapira  wrote:

> Yeah, you are right, it is best not to make converters actively break the
> data structures :)
>
> On Wed, Mar 1, 2017 at 4:55 PM, Ewen Cheslack-Postava 
> wrote:
>
> > Guozhang,
> >
> > I'm fine w/ adjusting if people want to, but it ends up being more code
> > since we also need to convert SerializationExceptions to DataExceptions
> and
> > the only thing the toConnectData method even does is specific to Connect
> > (adding the SchemaAndValue).
> >
> > Gwen -- isn't that an SMT? ExtractField?
> >
> > -Ewen
> >
> > On Wed, Mar 1, 2017 at 1:58 PM, Gwen Shapira  wrote:
> >
> > > Hi Ewen,
> > >
> > > Thanks for the KIP, I think it will be useful :)
> > >
> > > I'm just wondering if we can add support not just for bytes schema,
> > > but also for a struct that contains bytes? I'm thinking of the
> > > scenario of using a connector to grab BLOBs out of a DB - I think you
> > > end up with this structure if you use a JDBC connector and custom
> > > query...
> > >
> > > Maybe even support Maps with generic objects using Java's default
> > > serialization? I'm not sure if this is useful.
> > >
> > > Gwen
> > >
> > >
> > >
> > > On Sat, Feb 25, 2017 at 2:25 PM, Ewen Cheslack-Postava
> > >  wrote:
> > > > Hi all,
> > > >
> > > > I've added a pretty trivial KIP for adding a pass-through Converter
> for
> > > > Kafka Connect, similar to ByteArraySerializer/Deserializer.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 128%3A+Add+ByteArrayConverter+for+Kafka+Connect
> > > >
> > > > This wasn't added with the framework originally because the idea was
> to
> > > > deal with structured data for the most part. However, we've seen a
> > couple
> > > > of use cases arise as the framework got more traction and I think it
> > > makes
> > > > sense to provide this out of the box now so people stop reinventing
> the
> > > > wheel (and using a different fully-qualified class name) for each
> > > connector
> > > > that needs this functionality.
> > > >
> > > > I imagine this will be a rather uncontroversial addition, so if I
> don't
> > > see
> > > > any comments in the next day or two I'll just start the vote thread.
> > > >
> > > > -Ewen
> > >
> > >
> > >
> > > --
> > > Gwen Shapira
> > > Product Manager | Confluent
> > > 650.450.2760 | @gwenshap
> > > Follow us: Twitter | blog
> > >
> >
>
>
>
> --
> *Gwen Shapira*
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter  | blog
> 
>



-- 
-- Guozhang


Becoming a contributor

2017-03-06 Thread Tyler Hale
Kafka devs,

I'm looking to start contributing to kafka and have been reading over the 
getting started and contributing guides.  How do I get added to the 
contribution list so that I can assign myself JIRAs?

Thanks,
Tyler 

Sent from my iPhone

[jira] [Commented] (KAFKA-4566) Can't Symlink to Kafka bins

2017-03-06 Thread Marc Bush (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898711#comment-15898711
 ] 

Marc Bush commented on KAFKA-4566:
--

on REL, at least, I found the issue to be with the calling of a path with 
$base_dir/../etc/etc/etc 
it's the .. that kept on causing the OS to evaluate the path as the "real" path 
instead of the symlink version of the path.

I'm my case I use one source location of the kafka files then tor run a number 
of zookeepers and brokers I have directories that inside symlink back to the 
/bin and /lib directories of the source allowing me to have unique config 
directories in each other instance. 

Until I recoded the /../ parts upon startup everythign would redirect and try 
to use everything from the source binaries and configs, after I modified them I 
was able to properly use the source binaries and my own unique config location. 

I'll pull my changes from my code and post them tomorrow when I have a spare 
moment. 

This "fix" only involved updating 3 files 

> Can't Symlink to Kafka bins
> ---
>
> Key: KAFKA-4566
> URL: https://issues.apache.org/jira/browse/KAFKA-4566
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.1.1
>Reporter: Stephane Maarek
>Assignee: Akhilesh Naidu
>  Labels: newbie
>
> in the kafka consumer for example, the last line is :
> https://github.com/apache/kafka/blob/trunk/bin/kafka-console-consumer.sh#L21
> {code}
> exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
> {code}
> if I create a symlink using 
> {code}
> ln -s
> {code}
> it doesn't resolve the right directory name because of $(dirname $0) 
> I believe the right way is to do:
> {code}
> "$(dirname "$(readlink -e "$0")")"
> {code}
>  
> Any thoughts on that before I do a PR?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Build failed in Jenkins: kafka-trunk-jdk8 #1333

2017-03-06 Thread Apache Jenkins Server
See 


Changes:

[ismael] MINOR: Rename RecordBatch to ProducerBatch to free the name for KIP-98

--
[...truncated 321.09 KB...]

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED


[jira] [Commented] (KAFKA-4566) Can't Symlink to Kafka bins

2017-03-06 Thread Tyler Hale (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898696#comment-15898696
 ] 

Tyler Hale commented on KAFKA-4566:
---

I know this is a bit more than intended than the original JIRA, but have you 
thought about wrapping these functions into a single CLI, where these kinds of 
reusable methods/tasks would all be in one place?

> Can't Symlink to Kafka bins
> ---
>
> Key: KAFKA-4566
> URL: https://issues.apache.org/jira/browse/KAFKA-4566
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.1.1
>Reporter: Stephane Maarek
>Assignee: Akhilesh Naidu
>  Labels: newbie
>
> in the kafka consumer for example, the last line is :
> https://github.com/apache/kafka/blob/trunk/bin/kafka-console-consumer.sh#L21
> {code}
> exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
> {code}
> if I create a symlink using 
> {code}
> ln -s
> {code}
> it doesn't resolve the right directory name because of $(dirname $0) 
> I believe the right way is to do:
> {code}
> "$(dirname "$(readlink -e "$0")")"
> {code}
>  
> Any thoughts on that before I do a PR?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[VOTE] KIP-128: Add ByteArrayConverter for Kafka Connect

2017-03-06 Thread Ewen Cheslack-Postava
Hi all,

I'd like to kick off voting for KIP-128: Add ByteArrayConverter for Kafka
Connect:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-128%3A+Add+ByteArrayConverter+for+Kafka+Connect

There was a small amount of discussion, see the original thread here:
https://lists.apache.org/thread.html/62fc2245285ac5d15ebb9b2ebed672b51e391c8dfe9a51be85f685f3@%3Cdev.kafka.apache.org%3E

The vote will stay open for at least 72 hours.

-Ewen


[jira] [Updated] (KAFKA-2273) KIP-54: Add rebalance with a minimal number of reassignments to server-defined strategy list

2017-03-06 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-2273:
---
Fix Version/s: 0.11.0.0

> KIP-54: Add rebalance with a minimal number of reassignments to 
> server-defined strategy list
> 
>
> Key: KAFKA-2273
> URL: https://issues.apache.org/jira/browse/KAFKA-2273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Olof Johansson
>Assignee: Vahid Hashemian
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> Add a new partitions.assignment.strategy to the server-defined list that will 
> do reassignments based on moving as few partitions as possible. This should 
> be a quite common reassignment strategy especially for the cases where the 
> consumer has to maintain state, either in memory, or on disk.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-2273) KIP-54: Add rebalance with a minimal number of reassignments to server-defined strategy list

2017-03-06 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898678#comment-15898678
 ] 

Jeff Widman commented on KAFKA-2273:


This KIP was accepted, so I added it to the 0.11 milestone

> KIP-54: Add rebalance with a minimal number of reassignments to 
> server-defined strategy list
> 
>
> Key: KAFKA-2273
> URL: https://issues.apache.org/jira/browse/KAFKA-2273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Olof Johansson
>Assignee: Vahid Hashemian
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> Add a new partitions.assignment.strategy to the server-defined list that will 
> do reassignments based on moving as few partitions as possible. This should 
> be a quite common reassignment strategy especially for the cases where the 
> consumer has to maintain state, either in memory, or on disk.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

2017-03-06 Thread Matthias J. Sax
Hi,

I want to give a first respond:



1. Producer per task:

First, we did some performance tests, indicating that the performance
penalty is small. Please have a look here:
https://docs.google.com/spreadsheets/d/18aGOB13-ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing

For the test, we ran with a trunk version and a modified version that
uses a producer per task (of course, no transactions, but at-least-once
semantics). The scaling factor indicates the number of brokers and
(single threaded) Streams instances. We used SimpleBenchmark that is
part of AK code base.


Second, as the design is "producer per task" (and not "producer per
partition") it is possible to specify a custom PartitionGrouper that
assigns multiple partitions to a single task. Thus, it allows to reduce
the number of tasks for scenarios with many partitions. Right now, this
interface must be implemented solely by the user, but we could also add
a new config parameter that specifies the max.number.of.tasks or
partitions.per.task so that the user can configure this instead of
implementing the interface.

Third, there is the idea of a "Producer Pool" that would allow to share
resources (network connections, memory, etc) over multiple producers.
This would allow to separate multiple transaction on the producer level,
while resources are shared. There is no detailed design document yet and
there would be a KIP for this feature.

Thus, if there should be any performance problems for high scale
scenarios, there are multiple ways to tackle them while keeping the
"producer per task" design.

Additionally, a "producer per thread" design would be way more complex
and I summarized the issues in a separate document. I will share a link
to the document soon.



2. StateStore recovery:

Streams EoS will in the first design not allow to exploit the
improvements that are added for 0.11 at the moment. However, as 0.10.2
faces the same issues of potentially long recovery, there is no
regression with this regard. Thus, I see those improvements as
orthogonal or add-ons. Nevertheless, we should try to explore those
options and if possible get them into 0.11 such that Streams with EoS
gets the same improvements as at-least-once scenario.



3. Caching:

We might need to do some experiments to quantify the impact on caching.
If it's severe, the suggested default commit interval of 100ms could
also be increased. Also, EoS will not enforce any commit interval, but
only change the default value. Thus, a user can freely trade-off latency
vs. caching-effect.

Last but not least, there is the idea to allow "read_uncommitted" for
intermediate topic. This would be an advance design for Streams EoS that
allows downstream sub-topologies to read uncommitted data
optimistically. In case of failure, a cascading abort of transactions
would be required. This change will need another KIP.



4. Idempotent Producer:

The transactional part automatically leverages the idempotent properties
of the producer. Idempotency is a requirement:

> Note that enable.idempotence must be enabled if a TransactionalId is 
> configured.

See
https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.g2xsf9n49puh

All idempotent retries, are handled by the producer internally (with or
without transaction) if enable.idempotence is set to true.



-Matthias



On 3/3/17 3:34 AM, Eno Thereska wrote:
> Another question: 
> 
> The KIP doesn’t exactly spell out how it uses the idempotence guarantee from 
> KIP-98. It seems that only the transactional part is needed. Or is the 
> idempotence guarantee working behind the scenes and helping for some 
> scenarios for which it is not worthwhile aborting a transaction (e.g., 
> retransmitting a record after a temporary network glitch)?
> 
> Thanks
> Eno
> 
>> On Mar 2, 2017, at 4:56 PM, Jay Kreps  wrote:
>>
>> I second the concern on with the one producer per task approach. At a
>> high-level it seems to make sense but I think Damian is exactly right that
>> that cuts against the general design of the producer. Many people have high
>> input partition counts and will have high task counts as a result. I think
>> processing 1000 partitions should not be an unreasonable thing to want to
>> do.
>>
>> The tricky bits will be:
>>
>>   - Reduced effectiveness of batching (or more latency and memory to get
>>   equivalent batching). This doesn't show up in simple benchmarks because
>>   much of the penalty is I/O and CPU on the broker and the additional threads
>>   from all the producers can make a single-threaded benchmark seem faster.
>>   - TCP connection explosion. We maintain one connection per broker. This
>>   is already high since each app instance does this. This design though will
>>   add an additional multiplicative factor based on the partition count of the
>>   input.
>>   - Connection and metadata request storms. When an instance with 1000
>>   tasks starts up it is going to try to 

[jira] [Commented] (KAFKA-4739) KafkaConsumer poll going into an infinite loop

2017-03-06 Thread Jin Tianfan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898623#comment-15898623
 ] 

Jin Tianfan commented on KAFKA-4739:


is this problem sloved?

> KafkaConsumer poll going into an infinite loop
> --
>
> Key: KAFKA-4739
> URL: https://issues.apache.org/jira/browse/KAFKA-4739
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
>Reporter: Vipul Singh
>
> We are seeing an issue with our kafka consumer where it seems to go into an 
> infinite loop while polling, trying to fetch data from kafka. We are seeing 
> the heartbeat requests on the broker from the consumer, but nothing else from 
> the kafka consumer.
> We enabled debug level logging on the consumer, and see these logs: 
> https://gist.github.com/neoeahit/757bff7acdea62656f065f4dcb8974b4
> And this just goes on. The way we have been able to replicate this issue, is 
> by restarting the process in multiple successions.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2646: MINOR: Rename RecordBatch to ProducerBatch to allo...

2017-03-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2646


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Work started] (KAFKA-4849) Bug in KafkaStreams documentation

2017-03-06 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4849 started by Matthias J. Sax.
--
> Bug in KafkaStreams documentation
> -
>
> Key: KAFKA-4849
> URL: https://issues.apache.org/jira/browse/KAFKA-4849
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Seweryn Habdank-Wojewodzki
>Assignee: Matthias J. Sax
>Priority: Minor
>
> At the page: https://kafka.apache.org/documentation/streams
>  
> In the chapter titled Application Configuration and Execution, in the example 
> there is a line:
>  
> settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
>  
> but ZOOKEEPER_CONNECT_CONFIG is deprecated in the Kafka version 0.10.2.0.
>  
> Also the table on the page: 
> https://kafka.apache.org/0102/documentation/#streamsconfigs is a bit 
> misleading.
> 1. Again zookeeper.connect is deprecated.
> 2. The client.id and zookeeper.connect are marked by high importance, 
> but according to http://docs.confluent.io/3.2.0/streams/developer-guide.html 
> none of them are important to initialize the stream.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4849) Bug in KafkaStreams documentation

2017-03-06 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4849:
---
Status: Patch Available  (was: In Progress)

> Bug in KafkaStreams documentation
> -
>
> Key: KAFKA-4849
> URL: https://issues.apache.org/jira/browse/KAFKA-4849
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Seweryn Habdank-Wojewodzki
>Assignee: Matthias J. Sax
>Priority: Minor
>
> At the page: https://kafka.apache.org/documentation/streams
>  
> In the chapter titled Application Configuration and Execution, in the example 
> there is a line:
>  
> settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
>  
> but ZOOKEEPER_CONNECT_CONFIG is deprecated in the Kafka version 0.10.2.0.
>  
> Also the table on the page: 
> https://kafka.apache.org/0102/documentation/#streamsconfigs is a bit 
> misleading.
> 1. Again zookeeper.connect is deprecated.
> 2. The client.id and zookeeper.connect are marked by high importance, 
> but according to http://docs.confluent.io/3.2.0/streams/developer-guide.html 
> none of them are important to initialize the stream.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4849) Bug in KafkaStreams documentation

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898528#comment-15898528
 ] 

ASF GitHub Bot commented on KAFKA-4849:
---

GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/2648

KAFKA-4849: Bug in KafkaStreams documentation

 - removed ZK config from Streams cods
 - removed deprecated ZK config parameter from configs table

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka kafka-4849-docs-bug

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2648.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2648






> Bug in KafkaStreams documentation
> -
>
> Key: KAFKA-4849
> URL: https://issues.apache.org/jira/browse/KAFKA-4849
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Seweryn Habdank-Wojewodzki
>Assignee: Matthias J. Sax
>Priority: Minor
>
> At the page: https://kafka.apache.org/documentation/streams
>  
> In the chapter titled Application Configuration and Execution, in the example 
> there is a line:
>  
> settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
>  
> but ZOOKEEPER_CONNECT_CONFIG is deprecated in the Kafka version 0.10.2.0.
>  
> Also the table on the page: 
> https://kafka.apache.org/0102/documentation/#streamsconfigs is a bit 
> misleading.
> 1. Again zookeeper.connect is deprecated.
> 2. The client.id and zookeeper.connect are marked by high importance, 
> but according to http://docs.confluent.io/3.2.0/streams/developer-guide.html 
> none of them are important to initialize the stream.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2648: KAFKA-4849: Bug in KafkaStreams documentation

2017-03-06 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/2648

KAFKA-4849: Bug in KafkaStreams documentation

 - removed ZK config from Streams cods
 - removed deprecated ZK config parameter from configs table

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka kafka-4849-docs-bug

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2648.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2648






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (KAFKA-4849) Bug in KafkaStreams documentation

2017-03-06 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898381#comment-15898381
 ] 

Matthias J. Sax edited comment on KAFKA-4849 at 3/7/17 1:10 AM:


With regard to http://docs.confluent.io/3.2.0/streams/developer-guide.html -> 
even if this is Confluent's documentation and thus not related to a Kafka JIRA. 
Why do you think, both are not marked as "high importance" -- Confluent's docs 
don't say anything about it (if I did not miss it).


was (Author: mjsax):
With regard to http://docs.confluent.io/3.2.0/streams/developer-guide.html -> 
even if this is Confluent's documentation and thus not related to a Kafka JIRA. 
Why do you thing, both are not marked as "high importance" -- Confluent's docs 
don't say anything about it (if I did not miss it).

> Bug in KafkaStreams documentation
> -
>
> Key: KAFKA-4849
> URL: https://issues.apache.org/jira/browse/KAFKA-4849
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Seweryn Habdank-Wojewodzki
>Assignee: Matthias J. Sax
>Priority: Minor
>
> At the page: https://kafka.apache.org/documentation/streams
>  
> In the chapter titled Application Configuration and Execution, in the example 
> there is a line:
>  
> settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
>  
> but ZOOKEEPER_CONNECT_CONFIG is deprecated in the Kafka version 0.10.2.0.
>  
> Also the table on the page: 
> https://kafka.apache.org/0102/documentation/#streamsconfigs is a bit 
> misleading.
> 1. Again zookeeper.connect is deprecated.
> 2. The client.id and zookeeper.connect are marked by high importance, 
> but according to http://docs.confluent.io/3.2.0/streams/developer-guide.html 
> none of them are important to initialize the stream.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-03-06 Thread Dong Lin
Hey Becket,

I am wondering if we should first vote for the KIP before reviewing the
patch. I have two comments below:

- Should we specify the new sensors as part of interface change in the KIP?
- The KIP proposes to increase estimated compression ratio by 0.05 for each
underestimation and decrement the estimation by 0.005 for each
overestimation. Why are these two values chosen? I think there is some
tradeoff in selecting the value. Can the KIP be more explicit about the
tradeoff and explain how these two values would impact producer's
performance?

Thanks,
Dong


On Sat, Mar 4, 2017 at 11:42 AM, Becket Qin  wrote:

> I have updated the KIP based on the latest discussion. Please check and let
> me know if there is any further concern.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Mar 4, 2017 at 10:56 AM, Becket Qin  wrote:
>
> > Actually second thought on this, rate might be better for two reasons:
> > 1. Most of the metrics in the producer we already have are using rate
> > instead of count.
> > 2. If a service is bounced, the count will be reset to 0, but it does not
> > affect rate.
> >
> > I'll make the change.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sat, Mar 4, 2017 at 10:27 AM, Becket Qin 
> wrote:
> >
> >> Hi Dong,
> >>
> >> Yes, there is a sensor in the patch about the split occurrence.
> >>
> >> Currently it is a count instead of rate. In practice, it seems count is
> >> easier to use in this case. But I am open to change.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Fri, Mar 3, 2017 at 7:43 PM, Dong Lin  wrote:
> >>
> >>> Hey Becket,
> >>>
> >>> I haven't looked at the patch yet. But since we are going to try the
> >>> split-on-oversize solution, should the KIP also add a sensor that shows
> >>> the
> >>> rate of split per second and the probability of split?
> >>>
> >>> Thanks,
> >>> Dong
> >>>
> >>>
> >>> On Fri, Mar 3, 2017 at 6:39 PM, Becket Qin 
> wrote:
> >>>
> >>> > Just to clarify, the implementation is basically what I mentioned
> above
> >>> > (split/resend + adjusted estimation evolving algorithm) and changing
> >>> the
> >>> > compression ratio estimation to be per topic.
> >>> >
> >>> > Thanks,
> >>> >
> >>> > Jiangjie (Becket) Qin
> >>> >
> >>> > On Fri, Mar 3, 2017 at 6:36 PM, Becket Qin 
> >>> wrote:
> >>> >
> >>> > > I went ahead and have a patch submitted here:
> >>> > > https://github.com/apache/kafka/pull/2638
> >>> > >
> >>> > > Per Joel's suggestion, I changed the compression ratio to be per
> >>> topic as
> >>> > > well. It seems working well. Since there is an important behavior
> >>> change
> >>> > > and a new sensor is added, I'll keep the KIP and update it
> according.
> >>> > >
> >>> > > Thanks,
> >>> > >
> >>> > > Jiangjie (Becket) Qin
> >>> > >
> >>> > > On Mon, Feb 27, 2017 at 3:50 PM, Joel Koshy 
> >>> wrote:
> >>> > >
> >>> > >> >
> >>> > >> > Lets say we sent the batch over the wire and received a
> >>> > >> > RecordTooLargeException, how do we split it as once we add the
> >>> message
> >>> > >> to
> >>> > >> > the batch we loose the message level granularity. We will have
> to
> >>> > >> > decompress, do deep iteration and split and again compress.
> right?
> >>> > This
> >>> > >> > looks like a performance bottle neck in case of multi topic
> >>> producers
> >>> > >> like
> >>> > >> > mirror maker.
> >>> > >> >
> >>> > >>
> >>> > >> Yes, but these should be outliers if we do estimation on a
> per-topic
> >>> > basis
> >>> > >> and if we target a conservative-enough compression ratio. The
> >>> producer
> >>> > >> should also avoid sending over the wire if it can be made aware of
> >>> the
> >>> > >> max-message size limit on the broker, and split if it determines
> >>> that a
> >>> > >> record exceeds the broker's config. Ideally this should be part of
> >>> topic
> >>> > >> metadata but is not - so it could be off a periodic
> describe-configs
> >>> > >>  >>> > >> Command+line+and+centralized+administrative+operations#KIP-
> >>> > >> 4-Commandlineandcentralizedadministrativeoperations-Describe
> >>> > >> ConfigsRequest>
> >>> > >> (which isn't available yet). This doesn't remove the need to split
> >>> and
> >>> > >> recompress though.
> >>> > >>
> >>> > >>
> >>> > >> > On Mon, Feb 27, 2017 at 10:51 AM, Becket Qin <
> >>> becket@gmail.com>
> >>> > >> wrote:
> >>> > >> >
> >>> > >> > > Hey Mayuresh,
> >>> > >> > >
> >>> > >> > > 1) The batch would be split when an RecordTooLargeException is
> >>> > >> received.
> >>> > >> > > 2) Not lower the actual compression ratio, but lower the
> >>> estimated
> >>> > >> > > compression ratio "according to" the Actual Compression
> >>> Ratio(ACR).
> >>> > >> > >
> >>> > >> > > An example, let's start with Estimated Compression Ratio
> (ECR) =
> >>> > 1.0.
> >>> > >> 

Build failed in Jenkins: kafka_0.9.0_jdk7 #139

2017-03-06 Thread Apache Jenkins Server
See 


Changes:

[ismael] KAFKA-4251: fix test driver not launching in Vagrant 1.8.6

--
[...truncated 31.60 KB...]
/tmp/sbt_361d0c98/xsbt/ConsoleInterface.scala:12: error: error writing class 
ConsoleInterface: /tmp/sbt_617bab91/xsbt/ConsoleInterface.class: 
/tmp/sbt_617bab91 is not a directory
class ConsoleInterface {
  ^
/tmp/sbt_361d0c98/xsbt/ConsoleInterface.scala:25: error: error writing <$anon: 
scala.tools.nsc.InterpreterLoop>: 
/tmp/sbt_617bab91/xsbt/ConsoleInterface$$anon$2.class: /tmp/sbt_617bab91 is not 
a directory
val loop = new InterpreterLoop {
   ^
/tmp/sbt_361d0c98/xsbt/ConsoleInterface.scala:31: error: error writing <$anon: 
scala.tools.nsc.Interpreter>: 
/tmp/sbt_617bab91/xsbt/ConsoleInterface$$anon$2$$anon$1.class: 
/tmp/sbt_617bab91 is not a directory
  interpreter = new Interpreter(settings) {
^
/tmp/sbt_361d0c98/xsbt/ConsoleInterface.scala:47: error: error writing <$anon: 
Function1>: 
/tmp/sbt_617bab91/xsbt/ConsoleInterface$$anon$2$$anonfun$bind$1$1.class: 
/tmp/sbt_617bab91 is not a directory
  for ((id, value) <- values)
  ^
/tmp/sbt_361d0c98/xsbt/ConsoleInterface.scala:47: error: error writing <$anon: 
Function1>: 
/tmp/sbt_617bab91/xsbt/ConsoleInterface$$anon$2$$anonfun$bind$1$2.class: 
/tmp/sbt_617bab91 is not a directory
  for ((id, value) <- values)
   ^
/tmp/sbt_361d0c98/xsbt/ConsoleInterface.scala:48: error: error writing <$anon: 
Function0>: 
/tmp/sbt_617bab91/xsbt/ConsoleInterface$$anon$2$$anonfun$bind$1$2$$anonfun$apply$1.class:
 /tmp/sbt_617bab91 is not a directory
interpreter.beQuietDuring(interpreter.bindValue(id, value))
   ^
/tmp/sbt_361d0c98/xsbt/ConsoleInterface.scala:41: error: error writing <$anon: 
Object>: /tmp/sbt_617bab91/xsbt/ConsoleInterface$$anon$2$Compat$2.class: 
/tmp/sbt_617bab91 is not a directory
  final class Compat {
  ^
/tmp/sbt_361d0c98/xsbt/ConsoleInterface.scala:23: error: error writing <$anon: 
Function0>: /tmp/sbt_617bab91/xsbt/ConsoleInterface$$anonfun$run$1.class: 
/tmp/sbt_617bab91 is not a directory
log.info(Message("Starting scala interpreter..."))
 ^
/tmp/sbt_361d0c98/xsbt/ConsoleInterface.scala:24: error: error writing <$anon: 
Function0>: /tmp/sbt_617bab91/xsbt/ConsoleInterface$$anonfun$run$2.class: 
/tmp/sbt_617bab91 is not a directory
log.info(Message(""))
 ^
/tmp/sbt_361d0c98/xsbt/DelegatingReporter.scala:9: error: error writing object 
DelegatingReporter: /tmp/sbt_617bab91/xsbt/DelegatingReporter$.class: 
/tmp/sbt_617bab91 is not a directory
private object DelegatingReporter {
   ^
/tmp/sbt_361d0c98/xsbt/DelegatingReporter.scala:17: error: error writing class 
DelegatingReporter: /tmp/sbt_617bab91/xsbt/DelegatingReporter.class: 
/tmp/sbt_617bab91 is not a directory
private final class DelegatingReporter(warnFatal: Boolean, noWarn: Boolean, 
private[this] var delegate: xsbti.Reporter) extends 
scala.tools.nsc.reporters.Reporter {
^
/tmp/sbt_361d0c98/xsbt/DelegatingReporter.scala:76: error: error writing 
<$anon: xsbti.Position>: 
/tmp/sbt_617bab91/xsbt/DelegatingReporter$$anon$1.class: /tmp/sbt_617bab91 is 
not a directory
new xsbti.Position {
^
/tmp/sbt_361d0c98/xsbt/DelegatingReporter.scala:65: error: error writing 
<$anon: Function1>: /tmp/sbt_617bab91/xsbt/DelegatingReporter$$anonfun$1.class: 
/tmp/sbt_617bab91 is not a directory
  val pointerSpace = ((lineContent: Seq[Char]).take(pointer).map { case 
'\t' => '\t'; case x => ' ' }).mkString
 ^
/tmp/sbt_361d0c98/xsbt/DelegatingReporter.scala:72: error: error writing class 
DelegatingReporter$WithPoint$1: 
/tmp/sbt_617bab91/xsbt/DelegatingReporter$WithPoint$1.class: /tmp/sbt_617bab91 
is not a directory
  final class WithPoint(val p: Position) { def point = p.offset.get }
  ^
/tmp/sbt_361d0c98/xsbt/Dependency.scala:30: error: error writing class 
Dependency: /tmp/sbt_617bab91/xsbt/Dependency.class: /tmp/sbt_617bab91 is not a 
directory
final class Dependency(val global: CallbackGlobal) extends LocateClassFile {
^
/tmp/sbt_361d0c98/xsbt/Dependency.scala:12: error: error writing object 
Dependency: /tmp/sbt_617bab91/xsbt/Dependency$.class: /tmp/sbt_617bab91 is not 
a directory
object Dependency {
   ^
/tmp/sbt_361d0c98/xsbt/Dependency.scala:187: error: error writing <$anon: 
Function1>: 
/tmp/sbt_617bab91/xsbt/Dependency$$anonfun$xsbt$Dependency$$extractDependenciesByInheritance$1.class:
 /tmp/sbt_617bab91 is not a directory
dependencies.map(enclosingTopLevelClass)
 ^
/tmp/sbt_361d0c98/xsbt/Dependency.scala:161: error: 

Jenkins build is back to normal : kafka-trunk-jdk8 #1332

2017-03-06 Thread Apache Jenkins Server
See 




[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2017-03-06 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898415#comment-15898415
 ] 

Jun Rao commented on KAFKA-2729:


[~mjuarez], did you see ZK session expiration in the server.log in the 
controller around that time? The log will look like the following.

INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient)

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Danil Serdyuchenko
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of undereplicated partitions. The zookeeper 
> cluster recovered, however we continued to see a large number of 
> undereplicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> For all of the topics on the effected brokers. Both brokers only recovered 
> after a restart. Our own investigation yielded nothing, I was hoping you 
> could shed some light on this issue. Possibly if it's related to: 
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 
> 0.8.2.1.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4857) Improve Client handling

2017-03-06 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-4857:
--

 Summary: Improve Client handling
 Key: KAFKA-4857
 URL: https://issues.apache.org/jira/browse/KAFKA-4857
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


Streams uses {{KafkaClientSupplier}} to get consumer/restore-consumer/producer 
clients. Streams also uses one more client for admin purpose namely 
{{StreamsKafkaClient}} that is instantiated "manually".

With the newly upcoming {{AdminClient}} from KIP-117, we can simplify (or even 
replace {{StreamsKafkaClient}} with the new {{AdminClient}}. We furthermore 
want to unify how the client in generated and extend {{KafkaClientSupplier}} 
with method that return this client.

As this is a public API change, a KIP is required.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4849) Bug in KafkaStreams documentation

2017-03-06 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-4849:
--

Assignee: Matthias J. Sax

> Bug in KafkaStreams documentation
> -
>
> Key: KAFKA-4849
> URL: https://issues.apache.org/jira/browse/KAFKA-4849
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Seweryn Habdank-Wojewodzki
>Assignee: Matthias J. Sax
>Priority: Minor
>
> At the page: https://kafka.apache.org/documentation/streams
>  
> In the chapter titled Application Configuration and Execution, in the example 
> there is a line:
>  
> settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
>  
> but ZOOKEEPER_CONNECT_CONFIG is deprecated in the Kafka version 0.10.2.0.
>  
> Also the table on the page: 
> https://kafka.apache.org/0102/documentation/#streamsconfigs is a bit 
> misleading.
> 1. Again zookeeper.connect is deprecated.
> 2. The client.id and zookeeper.connect are marked by high importance, 
> but according to http://docs.confluent.io/3.2.0/streams/developer-guide.html 
> none of them are important to initialize the stream.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4849) Bug in KafkaStreams documentation

2017-03-06 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898381#comment-15898381
 ] 

Matthias J. Sax commented on KAFKA-4849:


With regard to http://docs.confluent.io/3.2.0/streams/developer-guide.html -> 
even if this is Confluent's documentation and thus not related to a Kafka JIRA. 
Why do you thing, both are not marked as "high importance" -- Confluent's docs 
don't say anything about it (if I did not miss it).

> Bug in KafkaStreams documentation
> -
>
> Key: KAFKA-4849
> URL: https://issues.apache.org/jira/browse/KAFKA-4849
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Minor
>
> At the page: https://kafka.apache.org/documentation/streams
>  
> In the chapter titled Application Configuration and Execution, in the example 
> there is a line:
>  
> settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
>  
> but ZOOKEEPER_CONNECT_CONFIG is deprecated in the Kafka version 0.10.2.0.
>  
> Also the table on the page: 
> https://kafka.apache.org/0102/documentation/#streamsconfigs is a bit 
> misleading.
> 1. Again zookeeper.connect is deprecated.
> 2. The client.id and zookeeper.connect are marked by high importance, 
> but according to http://docs.confluent.io/3.2.0/streams/developer-guide.html 
> none of them are important to initialize the stream.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-123: Allow per stream/table timestamp extractor

2017-03-06 Thread Jeyhun Karimov
Sorry I was late. I will update javadocs in related methods to emphasize
that TimestampExtractor is stateless.

Cheers,
Jeyhun

On Mon, Mar 6, 2017 at 8:17 PM Guozhang Wang  wrote:

> 1) Sounds good.
>
> 2) Yeah what I meant is to emphasize that TimestampExtractor to be
> stateless in the docs somewhere.
>
>
> Guozhang
>
>
> On Sun, Mar 5, 2017 at 4:27 PM, Matthias J. Sax 
> wrote:
>
> > Guozhang,
> >
> > about renaming the config parameters. I like this idea, but want to
> > point out, that this change should be done in a backward compatible way.
> > Thus, we need to keep (and only deprecate) the current parameter names.
> >
> > I am not sure about (2)? What do you worry about? Using a "stateful
> > extractor"? -- this would be an antipattern IMHO. We can clarify that a
> > TimestampExtrator should be stateless though (even if this should be
> > clear).
> >
> >
> > -Matthias
> >
> >
> > On 3/4/17 6:36 PM, Guozhang Wang wrote:
> > > Jeyhun,
> > >
> > > Thanks for proposing this KIP! And sorry for getting late in the
> > discussion.
> > >
> > > I have a general suggestion not directly related to this KIP and a
> couple
> > > of comments for this KIP here:
> > >
> > > I agree with Mathieu's observation, partly because we are now having
> lots
> > > of overloaded functions both in the DSL and in PAPI, and it would be
> > quite
> > > confusing to users. As Matthias mentioned we do have some plans to
> > refactor
> > > this API, but just wanted to point it out that this KIP may likely urge
> > us
> > > to do the API refactoring sooner than planned. My personal preference
> > would
> > > be doing that the next release (i.e. 0.11.0.0 in June).
> > >
> > >
> > > Now some detailed comments:
> > >
> > > 1. I'd suggest change TIMESTAMP_EXTRACTOR_CLASS_CONFIG to
> > > "default.timestamp.extractor" or "global.timestamp.extractor" (also the
> > > Java variable name can be changed accordingly) along with this change.
> In
> > > addition, maybe we can piggy-backing this to also rename
> > > KEY_SERDE_CLASS_CONFIG/VALUE_SERDE_CLASS_CONFIG to "default.key.." etc
> > in
> > > this KIP.
> > >
> > > 2. Another thing we should consider, is that since now we could
> > potentially
> > > use multiple timestamp extractor instances than a single one, this may
> be
> > > breaking if user's customization did some global bookkeeping based on
> the
> > > previous assumption (maybe a wild thought but e.g. keeping track some
> > > global counts in the extractor as a local variable). We need to clarify
> > > this change in the javadoc and also potentially in the upgrade web doc
> > > sections.
> > >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Mar 1, 2017 at 6:09 AM, Michael Noll 
> > wrote:
> > >
> > >> +1 (non-binding)
> > >>
> > >> Thanks for the KIP!
> > >>
> > >> On Wed, Mar 1, 2017 at 1:49 PM, Bill Bejeck 
> wrote:
> > >>
> > >>> +1
> > >>>
> > >>> Thanks
> > >>> Bill
> > >>>
> > >>> On Wed, Mar 1, 2017 at 5:06 AM, Eno Thereska  >
> > >>> wrote:
> > >>>
> >  +1 (non binding).
> > 
> >  Thanks
> >  Eno
> > > On 28 Feb 2017, at 17:22, Matthias J. Sax 
> > >>> wrote:
> > >
> > > +1
> > >
> > > Thanks a lot for the KIP!
> > >
> > > -Matthias
> > >
> > >
> > > On 2/28/17 1:35 AM, Damian Guy wrote:
> > >> Thanks for the KIP Jeyhun!
> > >>
> > >> +1
> > >>
> > >> On Tue, 28 Feb 2017 at 08:59 Jeyhun Karimov  >
> >  wrote:
> > >>
> > >>> Dear community,
> > >>>
> > >>> I'd like to start the vote for KIP-123:
> > >>> https://cwiki.apache.org/confluence/pages/viewpage.
> >  action?pageId=68714788
> > >>>
> > >>>
> > >>> Cheers,
> > >>> Jeyhun
> > >>> --
> > >>> -Cheers
> > >>>
> > >>> Jeyhun
> > >>>
> > >>
> > >
> > 
> > 
> > >>>
> > >>
> > >
> > >
> > >
> >
> >
>
>
> --
> -- Guozhang
>
-- 
-Cheers

Jeyhun


[jira] [Commented] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2017-03-06 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898347#comment-15898347
 ] 

Jason Gustafson commented on KAFKA-4794:


[~fhussonnois] Thanks for creating the ticket and writing the patch! Since this 
changes a public interface, I think we need a short KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals. 

cc [~ewencp]

> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
>
> Currently the offsets storage is only accessible from SourceTask to able to 
> initialize properly tasks after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progression of 
> each task it would helpful to have access to an OffsetStorageReader instance 
> from the SourceConnector.
> In that way, we could have a background thread that could request a tasks 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on a shared storage for detecting and for 
> streaming new files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new inputs files are detected a tasks reconfiguration is requested. Then the 
> connector assigns a file subset to each task. 
> Each task stores sources offsets for the last sent record. The source offsets 
> data are:
>  - the size of file
>  - the bytes offset
>  - the bytes size 
> Tasks become idle when the assigned files are completed (in : 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> Then, the connector should be able to track offsets for each assigned file. 
> When all tasks has finished the connector can stop them or assigned new files 
> by requesting tasks reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector 
> is detect slow or failed tasks and if necessary to be able to restart all 
> tasks.
> If you think this improvement is OK, I can work a pull request.
> Thanks,



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2017-03-06 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-4794:
---
Labels: needs-kip  (was: )

> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
>
> Currently the offsets storage is only accessible from SourceTask to able to 
> initialize properly tasks after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progression of 
> each task it would helpful to have access to an OffsetStorageReader instance 
> from the SourceConnector.
> In that way, we could have a background thread that could request a tasks 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on a shared storage for detecting and for 
> streaming new files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new inputs files are detected a tasks reconfiguration is requested. Then the 
> connector assigns a file subset to each task. 
> Each task stores sources offsets for the last sent record. The source offsets 
> data are:
>  - the size of file
>  - the bytes offset
>  - the bytes size 
> Tasks become idle when the assigned files are completed (in : 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> Then, the connector should be able to track offsets for each assigned file. 
> When all tasks has finished the connector can stop them or assigned new files 
> by requesting tasks reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector 
> is detect slow or failed tasks and if necessary to be able to restart all 
> tasks.
> If you think this improvement is OK, I can work a pull request.
> Thanks,



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4825) Likely Data Loss in ReassignPartitionsTest System Test

2017-03-06 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898312#comment-15898312
 ] 

Jun Rao edited comment on KAFKA-4825 at 3/6/17 10:53 PM:
-

[~benstopford], thanks for  reporting this. Took a look at the log and it does 
appear that this is a result of KIP-101.

I first grepped all log truncations on partition topic-11. As you can see, a 
few broker truncated the log to exactly offset 372 twice in a very short 
window. This typically means that the leader was changed very quickly in a 
short window, which can expose the issue in KIP-101.

cat worker2/debug/server.log | grep -i "Truncating log" | grep topic-11
[2017-03-01 06:32:43,853] INFO Truncating log test_topic-11 to offset 0. 
(kafka.log.Log)
[2017-03-01 06:33:00,661] INFO Truncating log test_topic-11 to offset 372. 
(kafka.log.Log)
[2017-03-01 06:33:04,172] INFO Truncating log test_topic-11 to offset 372. 
(kafka.log.Log)

cat worker5/debug/server.log | grep -i "Truncating log" | grep topic-11
[2017-03-01 06:32:43,765] INFO Truncating log test_topic-11 to offset 0. 
(kafka.log.Log)
[2017-03-01 06:33:00,600] INFO Truncating log test_topic-11 to offset 372. 
(kafka.log.Log)
[2017-03-01 06:35:28,133] INFO Truncating log test_topic-11 to offset 1031. 
(kafka.log.Log)

cat worker6/debug/server.log | grep -i "Truncating log" | grep topic-11
[2017-03-01 06:32:54,875] INFO Truncating log test_topic-11 to offset 0. 
(kafka.log.Log)
[2017-03-01 06:32:55,665] INFO Truncating log test_topic-11 to offset 372. 
(kafka.log.Log)
[2017-03-01 06:33:00,676] INFO Truncating log test_topic-11 to offset 372. 
(kafka.log.Log)
[2017-03-01 06:35:28,145] INFO Truncating log test_topic-11 to offset 1031. 
(kafka.log.Log)

The worker to broker mapping is the following.
worker2 (broker 3), worker5 (broker 1), worker6 (broker 2), worker8 (broker 4)

Looking at the controller log in broker 4, it first took over as the controller 
at 06:32:58.

[2017-03-01 06:32:58,351] INFO [Controller 4]: Broker 4 starting become 
controller state transition (kafka.controller.KafkaController)

As part of the controller initialization logic, it sends the first 
LeaderAndIsrRequest for  topic-11 with leader=3 (current leader), which 
triggers the first log truncation.

{code}
[2017-03-01 06:33:00,641] DEBUG [Partition state machine on Controller 4]: 
After leader election, leader cache is updated to Map([test_topic,7] -> 
(Leader:4,ISR:4,2,3,LeaderEpoch:2,ControllerEpoch:1), [test_topic,0] -> 
(Leader:3,ISR:2,3,4,LeaderEpoch:3,ControllerEpoch:1), [test_topic,4] -> 
(Leader:2,ISR:3,4,2,LeaderEpoch:3,ControllerEpoch:1), [test_topic,15] -> 
(Leader:4,ISR:4,2,1,LeaderEpoch:2,ControllerEpoch:1), [test_topic,19] -> 
(Leader:4,ISR:4,3,LeaderEpoch:4,ControllerEpoch:1), [test_topic,10] -> 
(Leader:2,ISR:2,4,LeaderEpoch:2,ControllerEpoch:2), [test_topic,16] -> 
(Leader:3,ISR:3,4,LeaderEpoch:1,ControllerEpoch:1), [test_topic,2] -> 
(Leader:4,ISR:4,2,1,LeaderEpoch:4,ControllerEpoch:1), [test_topic,3] -> 
(Leader:4,ISR:4,2,3,LeaderEpoch:3,ControllerEpoch:1), [test_topic,18] -> 
(Leader:3,ISR:3,2,LeaderEpoch:1,ControllerEpoch:1), [test_topic,9] -> 
(Leader:3,ISR:3,4,LeaderEpoch:4,ControllerEpoch:1), [test_topic,6] -> 
(Leader:3,ISR:3,2,1,LeaderEpoch:2,ControllerEpoch:1), [test_topic,13] -> 
(Leader:2,ISR:2,4,LeaderEpoch:5,ControllerEpoch:2), [test_topic,12] -> 
(Leader:4,ISR:2,3,4,LeaderEpoch:3,ControllerEpoch:1), [test_topic,14] -> 
(Leader:3,ISR:3,4,2,LeaderEpoch:2,ControllerEpoch:1), [test_topic,8] -> 
(Leader:2,ISR:4,2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,1] -> 
(Leader:2,ISR:2,3,4,LeaderEpoch:2,ControllerEpoch:1), [test_topic,11] -> 
(Leader:3,ISR:3,2,1,LeaderEpoch:4,ControllerEpoch:1), [test_topic,5] -> 
(Leader:2,ISR:2,4,3,LeaderEpoch:2,ControllerEpoch:1), [test_topic,17] -> 
(Leader:2,ISR:2,4,3,LeaderEpoch:2,ControllerEpoch:1)) 
(kafka.controller.PartitionStateMachine)
{code}

Almost immediately after that, broker 3 undergoes controlled shutdown. So, the 
controller moves the leader to broker 1. When broker 1 becomes the new leader, 
it may not have refetched the message at offset 372, causing the data loss.

{code}
[2017-03-01 06:33:00,668] DEBUG [ControlledShutdownLeaderSelector]: Partition 
[test_topic,11] : current leader = 3, new leader = 1 
(kafka.controller.ControlledShutdownLeaderSelector)
[2017-03-01 06:33:00,669] DEBUG [Partition state machine on Controller 4]: 
After leader election, leader cache is updated to Map([test_topic,7] -> 
(Leader:4,ISR:4,2,3,LeaderEpoch:2,ControllerEpoch:1), [test_topic,0] -> 
(Leader:3,ISR:2,3,4,LeaderEpoch:3,ControllerEpoch:1), [test_topic,4] -> 
(Leader:2,ISR:3,4,2,LeaderEpoch:3,ControllerEpoch:1), [test_topic,15] -> 
(Leader:4,ISR:4,2,1,LeaderEpoch:2,ControllerEpoch:1), [test_topic,19] -> 
(Leader:4,ISR:4,LeaderEpoch:5,ControllerEpoch:2), [test_topic,10] -> 
(Leader:2,ISR:2,4,LeaderEpoch:2,ControllerEpoch:2), 

[jira] [Commented] (KAFKA-4825) Likely Data Loss in ReassignPartitionsTest System Test

2017-03-06 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898312#comment-15898312
 ] 

Jun Rao commented on KAFKA-4825:


[~benstopford], thanks for  reporting this. Took a look at the log and it does 
appear that this is a result of KIP-101.

I first grepped all log truncations on partition topic-11. As you can see, a 
few broker truncated the log to exactly offset 372 twice in a very short 
window. This typically means that the leader was changed very quickly in a 
short window, which can expose the issue in KIP-101.

cat worker2/debug/server.log | grep -i "Truncating log" | grep topic-11
[2017-03-01 06:32:43,853] INFO Truncating log test_topic-11 to offset 0. 
(kafka.log.Log)
[2017-03-01 06:33:00,661] INFO Truncating log test_topic-11 to offset 372. 
(kafka.log.Log)
[2017-03-01 06:33:04,172] INFO Truncating log test_topic-11 to offset 372. 
(kafka.log.Log)

cat worker5/debug/server.log | grep -i "Truncating log" | grep topic-11
[2017-03-01 06:32:43,765] INFO Truncating log test_topic-11 to offset 0. 
(kafka.log.Log)
[2017-03-01 06:33:00,600] INFO Truncating log test_topic-11 to offset 372. 
(kafka.log.Log)
[2017-03-01 06:35:28,133] INFO Truncating log test_topic-11 to offset 1031. 
(kafka.log.Log)

cat worker6/debug/server.log | grep -i "Truncating log" | grep topic-11
[2017-03-01 06:32:54,875] INFO Truncating log test_topic-11 to offset 0. 
(kafka.log.Log)
[2017-03-01 06:32:55,665] INFO Truncating log test_topic-11 to offset 372. 
(kafka.log.Log)
[2017-03-01 06:33:00,676] INFO Truncating log test_topic-11 to offset 372. 
(kafka.log.Log)
[2017-03-01 06:35:28,145] INFO Truncating log test_topic-11 to offset 1031. 
(kafka.log.Log)

The worker to broker mapping is the following.
worker2 (broker 3), worker5 (broker 1), worker6 (broker 2), worker8 (broker 4)

Looking at the controller log in broker 4, it first took over as the controller 
at 06:32:58.

[2017-03-01 06:32:58,351] INFO [Controller 4]: Broker 4 starting become 
controller state transition (kafka.controller.KafkaController)

As part of the controller initialization logic, it sends the first 
LeaderAndIsrRequest for  topic-11 with leader=3 (current leader), which 
triggers the first log truncation.

[2017-03-01 06:33:00,641] DEBUG [Partition state machine on Controller 4]: 
After leader election, leader cache is updated to Map([test_topic,7] -> 
(Leader:4,ISR:4,2,3,LeaderEpoch:2,ControllerEpoch:1), [test_topic,0] -> 
(Leader:3,ISR:2,3,4,LeaderEpoch:3,ControllerEpoch:1), [test_topic,4] -> 
(Leader:2,ISR:3,4,2,LeaderEpoch:3,ControllerEpoch:1), [test_topic,15] -> 
(Leader:4,ISR:4,2,1,LeaderEpoch:2,ControllerEpoch:1), [test_topic,19] -> 
(Leader:4,ISR:4,3,LeaderEpoch:4,ControllerEpoch:1), [test_topic,10] -> 
(Leader:2,ISR:2,4,LeaderEpoch:2,ControllerEpoch:2), [test_topic,16] -> 
(Leader:3,ISR:3,4,LeaderEpoch:1,ControllerEpoch:1), [test_topic,2] -> 
(Leader:4,ISR:4,2,1,LeaderEpoch:4,ControllerEpoch:1), [test_topic,3] -> 
(Leader:4,ISR:4,2,3,LeaderEpoch:3,ControllerEpoch:1), [test_topic,18] -> 
(Leader:3,ISR:3,2,LeaderEpoch:1,ControllerEpoch:1), [test_topic,9] -> 
(Leader:3,ISR:3,4,LeaderEpoch:4,ControllerEpoch:1), [test_topic,6] -> 
(Leader:3,ISR:3,2,1,LeaderEpoch:2,ControllerEpoch:1), [test_topic,13] -> 
(Leader:2,ISR:2,4,LeaderEpoch:5,ControllerEpoch:2), [test_topic,12] -> 
(Leader:4,ISR:2,3,4,LeaderEpoch:3,ControllerEpoch:1), [test_topic,14] -> 
(Leader:3,ISR:3,4,2,LeaderEpoch:2,ControllerEpoch:1), [test_topic,8] -> 
(Leader:2,ISR:4,2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,1] -> 
(Leader:2,ISR:2,3,4,LeaderEpoch:2,ControllerEpoch:1), [test_topic,11] -> 
(Leader:3,ISR:3,2,1,LeaderEpoch:4,ControllerEpoch:1), [test_topic,5] -> 
(Leader:2,ISR:2,4,3,LeaderEpoch:2,ControllerEpoch:1), [test_topic,17] -> 
(Leader:2,ISR:2,4,3,LeaderEpoch:2,ControllerEpoch:1)) 
(kafka.controller.PartitionStateMachine)

Almost immediately after that, broker 3 undergoes controlled shutdown. So, the 
controller moves the leader to broker 1. When broker 1 becomes the new leader, 
it may not have refetched the message at offset 372, causing the data loss.

[2017-03-01 06:33:00,668] DEBUG [ControlledShutdownLeaderSelector]: Partition 
[test_topic,11] : current leader = 3, new leader = 1 
(kafka.controller.ControlledShutdownLeaderSelector)
[2017-03-01 06:33:00,669] DEBUG [Partition state machine on Controller 4]: 
After leader election, leader cache is updated to Map([test_topic,7] -> 
(Leader:4,ISR:4,2,3,LeaderEpoch:2,ControllerEpoch:1), [test_topic,0] -> 
(Leader:3,ISR:2,3,4,LeaderEpoch:3,ControllerEpoch:1), [test_topic,4] -> 
(Leader:2,ISR:3,4,2,LeaderEpoch:3,ControllerEpoch:1), [test_topic,15] -> 
(Leader:4,ISR:4,2,1,LeaderEpoch:2,ControllerEpoch:1), [test_topic,19] -> 
(Leader:4,ISR:4,LeaderEpoch:5,ControllerEpoch:2), [test_topic,10] -> 
(Leader:2,ISR:2,4,LeaderEpoch:2,ControllerEpoch:2), [test_topic,16] -> 
(Leader:3,ISR:3,4,LeaderEpoch:1,ControllerEpoch:1), 

[jira] [Updated] (KAFKA-4856) Calling KafkaProducer.close() from multiple threads may cause spurious error

2017-03-06 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xavier Léauté updated KAFKA-4856:
-
Description: 
Calling {{KafkaProducer.close()}} from multiple threads simultaneously may 
cause the following harmless error message to be logged. There appears to be a 
race-condition in {{AppInfoParser.unregisterAppInfo}} that we don't guard 
against. 

{noformat}
WARN Error unregistering AppInfo mbean 
(org.apache.kafka.common.utils.AppInfoParser:71)
javax.management.InstanceNotFoundException: 
kafka.producer:type=app-info,id=
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
at 
org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(AppInfoParser.java:69)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:735)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:686)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:665)
{noformat}

  was:
Calling KafkaProducer.close() from multiple threads simultaneously may cause 
the following harmless error message to be logged. There appears to be a 
race-condition in AppInfoParser.unregisterAppInfo that we don't guard against. 

{noformat}
WARN Error unregistering AppInfo mbean 
(org.apache.kafka.common.utils.AppInfoParser:71)
javax.management.InstanceNotFoundException: 
kafka.producer:type=app-info,id=
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
at 
org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(AppInfoParser.java:69)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:735)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:686)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:665)
{noformat}


> Calling KafkaProducer.close() from multiple threads may cause spurious error
> 
>
> Key: KAFKA-4856
> URL: https://issues.apache.org/jira/browse/KAFKA-4856
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.10.2.0
>Reporter: Xavier Léauté
>Priority: Minor
>
> Calling {{KafkaProducer.close()}} from multiple threads simultaneously may 
> cause the following harmless error message to be logged. There appears to be 
> a race-condition in {{AppInfoParser.unregisterAppInfo}} that we don't guard 
> against. 
> {noformat}
> WARN Error unregistering AppInfo mbean 
> (org.apache.kafka.common.utils.AppInfoParser:71)
> javax.management.InstanceNotFoundException: 
> kafka.producer:type=app-info,id=
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
> at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
> at 
> org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(AppInfoParser.java:69)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:735)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:686)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:665)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4856) Calling KafkaProducer.close() from multiple threads may cause spurious error

2017-03-06 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xavier Léauté updated KAFKA-4856:
-
Description: 
Calling KafkaProducer.close() from multiple threads simultaneously may cause 
the following harmless error message to be logged. There appears to be a 
race-condition in AppInfoParser.unregisterAppInfo that we don't guard against. 

{noformat}
WARN Error unregistering AppInfo mbean 
(org.apache.kafka.common.utils.AppInfoParser:71)
javax.management.InstanceNotFoundException: 
kafka.producer:type=app-info,id=
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
at 
org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(AppInfoParser.java:69)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:735)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:686)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:665)
{noformat}

  was:
Calling KafkaProducer.close() from multiple threads simultaneously may cause 
the following harmless error message to be logged. There appears to be a 
race-condition in AppInfoParser.unregisterAppInfo that we don't guard against. 

{{ WARN Error unregistering AppInfo mbean 
(org.apache.kafka.common.utils.AppInfoParser:71)
javax.management.InstanceNotFoundException: 
kafka.producer:type=app-info,id=
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
at 
org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(AppInfoParser.java:69)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:735)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:686)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:665) }}


> Calling KafkaProducer.close() from multiple threads may cause spurious error
> 
>
> Key: KAFKA-4856
> URL: https://issues.apache.org/jira/browse/KAFKA-4856
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.10.2.0
>Reporter: Xavier Léauté
>Priority: Minor
>
> Calling KafkaProducer.close() from multiple threads simultaneously may cause 
> the following harmless error message to be logged. There appears to be a 
> race-condition in AppInfoParser.unregisterAppInfo that we don't guard 
> against. 
> {noformat}
> WARN Error unregistering AppInfo mbean 
> (org.apache.kafka.common.utils.AppInfoParser:71)
> javax.management.InstanceNotFoundException: 
> kafka.producer:type=app-info,id=
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
> at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
> at 
> org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(AppInfoParser.java:69)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:735)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:686)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:665)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4856) Calling KafkaProducer.close() from multiple threads may cause spurious error

2017-03-06 Thread JIRA
Xavier Léauté created KAFKA-4856:


 Summary: Calling KafkaProducer.close() from multiple threads may 
cause spurious error
 Key: KAFKA-4856
 URL: https://issues.apache.org/jira/browse/KAFKA-4856
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.2.0, 0.10.0.0, 0.9.0.0
Reporter: Xavier Léauté
Priority: Minor


Calling KafkaProducer.close() from multiple threads simultaneously may cause 
the following harmless error message to be logged. There appears to be a 
race-condition in AppInfoParser.unregisterAppInfo that we don't guard against. 

{{WARN Error unregistering AppInfo mbean 
(org.apache.kafka.common.utils.AppInfoParser:71)
javax.management.InstanceNotFoundException: 
kafka.producer:type=app-info,id=
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
at 
org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(AppInfoParser.java:69)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:735)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:686)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:665)}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4856) Calling KafkaProducer.close() from multiple threads may cause spurious error

2017-03-06 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xavier Léauté updated KAFKA-4856:
-
Description: 
Calling KafkaProducer.close() from multiple threads simultaneously may cause 
the following harmless error message to be logged. There appears to be a 
race-condition in AppInfoParser.unregisterAppInfo that we don't guard against. 

{{ WARN Error unregistering AppInfo mbean 
(org.apache.kafka.common.utils.AppInfoParser:71)
javax.management.InstanceNotFoundException: 
kafka.producer:type=app-info,id=
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
at 
org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(AppInfoParser.java:69)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:735)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:686)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:665) }}

  was:
Calling KafkaProducer.close() from multiple threads simultaneously may cause 
the following harmless error message to be logged. There appears to be a 
race-condition in AppInfoParser.unregisterAppInfo that we don't guard against. 

{{WARN Error unregistering AppInfo mbean 
(org.apache.kafka.common.utils.AppInfoParser:71)
javax.management.InstanceNotFoundException: 
kafka.producer:type=app-info,id=
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
at 
org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(AppInfoParser.java:69)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:735)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:686)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:665)}}


> Calling KafkaProducer.close() from multiple threads may cause spurious error
> 
>
> Key: KAFKA-4856
> URL: https://issues.apache.org/jira/browse/KAFKA-4856
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.10.2.0
>Reporter: Xavier Léauté
>Priority: Minor
>
> Calling KafkaProducer.close() from multiple threads simultaneously may cause 
> the following harmless error message to be logged. There appears to be a 
> race-condition in AppInfoParser.unregisterAppInfo that we don't guard 
> against. 
> {{ WARN Error unregistering AppInfo mbean 
> (org.apache.kafka.common.utils.AppInfoParser:71)
> javax.management.InstanceNotFoundException: 
> kafka.producer:type=app-info,id=
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
> at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
> at 
> org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(AppInfoParser.java:69)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:735)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:686)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:665) 
> }}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2647: MINOR: Add varint serde utilities for new message ...

2017-03-06 Thread hachikuji
GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/2647

MINOR: Add varint serde utilities for new message format



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka add-varint-serdes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2647.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2647


commit 275a5b25ebba269dffd1b9ba5885eb16898f53eb
Author: Jason Gustafson 
Date:   2017-03-06T22:12:16Z

MINOR: Add varint serde utilities for new message format




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-06 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898258#comment-15898258
 ] 

Vahid Hashemian edited comment on KAFKA-4845 at 3/6/17 10:20 PM:
-

[~DanC], The issue you raised sounds very similar to the one reported in 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547], that was fixed 
in 0.10.2.0.
Also, the seconds code snippet and the comments you added there apply to before 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547] was 
[fixed|https://github.com/apache/kafka/commit/813897a00653351710d37acbbb598235e86db824#diff-267b7c1e68156c1301c56be63ae41dd0].
 The function {{updateFetchPositions(Set partitions)}} 
currently looks like this:
{code}
fetcher.resetOffsetsIfNeeded(partitions);
if (!subscriptions.hasAllFetchPositions(partitions)) {
coordinator.refreshCommittedOffsetsIfNeeded();
fetcher.updateFetchPositions(partitions);
}
{code}

So it sounds like you are not running the latest KafkaConsumer code. The issue 
you raised applies to 0.10.1.0 and 0.10.1.1 (a duplicate of 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547]), but not to 
0.10.2.0.

Please advise if I misunderstood the defect or am missing something. Thanks.


was (Author: vahid):
[~DanC], The issue you raised sounds very similar to the one reported in 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547], that was fixed 
in 0.10.2.0.
Also, the seconds code snippet and the comments you added there apply to before 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547] was 
[fixed|https://github.com/apache/kafka/commit/813897a00653351710d37acbbb598235e86db824#diff-267b7c1e68156c1301c56be63ae41dd0].
 The function {{updateFetchPositions(Set partitions)}} 
currently looks like this:
{code}
fetcher.resetOffsetsIfNeeded(partitions);
if (!subscriptions.hasAllFetchPositions(partitions)) {
coordinator.refreshCommittedOffsetsIfNeeded();
fetcher.updateFetchPositions(partitions);
}
{code}

So it sounds like you are not running the latest KafkaConsumer code. The issue 
you raised applies to 0.10.1.0 and 0.10.1.1 (a duplicate of 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547]), but not to 
0.10.2.0.

Please advise if I'm misunderstood the defect or am missing something. Thanks.

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark 
> streaming
> 
>
> Key: KAFKA-4845
> URL: https://issues.apache.org/jira/browse/KAFKA-4845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Dan
>Assignee: Vahid Hashemian
>
> When integrating with spark streaming, kafka consumer cannot get the latest 
> offsets except for one partition. The  code snippet is as follows: 
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
> val c = consumer
> c.poll(0)
> val parts = c.assignment().asScala
> val newPartitions = parts.diff(currentOffsets.keySet)
> currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> 
> c.position(tp)).toMap
> c.pause(newPartitions.asJava)
> c.seekToEnd(currentOffsets.keySet.asJava)
> parts.map(tp => tp -> c.position(tp)).toMap
>   }
> {code}
> When calling consumer.position(topicPartition), it will call 
> updateFetchPositions(Collections.singleton(partition)):
> The bug lies in updateFetchPositions(Set partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);// reset to latest 
> offset for current partition
> if (!subscriptions.hasAllFetchPositions()) {  // called seekToEnd for 
> all partitions before, so this sentence will be true 
> coordinator.refreshCommittedOffsetsIfNeeded();
> fetcher.updateFetchPositions(partitions);  // reset to committed 
> offsets for current partition
> }
> {code}
> So eventually there is only one partition(the last partition in assignment) 
> can get latest offset while all the others get the committed offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-06 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898258#comment-15898258
 ] 

Vahid Hashemian commented on KAFKA-4845:


[~DanC], The issue you raised sounds very similar to the one reported in 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547], that was fixed 
in 0.10.2.0.
Also, the seconds code snippet and the comments you added there apply to before 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547] was 
[fixed|https://github.com/apache/kafka/commit/813897a00653351710d37acbbb598235e86db824#diff-267b7c1e68156c1301c56be63ae41dd0].
 The function {{updateFetchPositions(Set partitions)}} 
currently looks like this:
{code}
fetcher.resetOffsetsIfNeeded(partitions);
if (!subscriptions.hasAllFetchPositions(partitions)) {
coordinator.refreshCommittedOffsetsIfNeeded();
fetcher.updateFetchPositions(partitions);
}
{code}

So it sounds like you are not running the latest KafkaConsumer code. The issue 
you raised applies to 0.10.1.0 and 0.10.1.1 (a duplicate of 
[KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547]), but not to 
0.10.2.0.

Please advise if I'm misunderstood the defect or am missing something. Thanks.

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark 
> streaming
> 
>
> Key: KAFKA-4845
> URL: https://issues.apache.org/jira/browse/KAFKA-4845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Dan
>Assignee: Vahid Hashemian
>
> When integrating with spark streaming, kafka consumer cannot get the latest 
> offsets except for one partition. The  code snippet is as follows: 
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
> val c = consumer
> c.poll(0)
> val parts = c.assignment().asScala
> val newPartitions = parts.diff(currentOffsets.keySet)
> currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> 
> c.position(tp)).toMap
> c.pause(newPartitions.asJava)
> c.seekToEnd(currentOffsets.keySet.asJava)
> parts.map(tp => tp -> c.position(tp)).toMap
>   }
> {code}
> When calling consumer.position(topicPartition), it will call 
> updateFetchPositions(Collections.singleton(partition)):
> The bug lies in updateFetchPositions(Set partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);// reset to latest 
> offset for current partition
> if (!subscriptions.hasAllFetchPositions()) {  // called seekToEnd for 
> all partitions before, so this sentence will be true 
> coordinator.refreshCommittedOffsetsIfNeeded();
> fetcher.updateFetchPositions(partitions);  // reset to committed 
> offsets for current partition
> }
> {code}
> So eventually there is only one partition(the last partition in assignment) 
> can get latest offset while all the others get the committed offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4855) Struct SchemaBuilder should not allow duplicate fields.

2017-03-06 Thread Jeremy Custenborder (JIRA)
Jeremy Custenborder created KAFKA-4855:
--

 Summary: Struct SchemaBuilder should not allow duplicate fields.
 Key: KAFKA-4855
 URL: https://issues.apache.org/jira/browse/KAFKA-4855
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 0.10.2.0
Reporter: Jeremy Custenborder


I would expect this to fail at the build() on schema. It actually makes it all 
the way to Struct.validate() and throws a cryptic error message. .field() 
should throw an exception if a field is already used.

Repro:
{code}
  @Test
  public void duplicateFields() {
final Schema schema = SchemaBuilder.struct()
.name("testing")
.field("id", SchemaBuilder.string().doc("").build())
.field("id", SchemaBuilder.string().doc("").build())
.build();
final Struct struct = new Struct(schema)
.put("id", "testing");
struct.validate();
  }
{code}

{code}
org.apache.kafka.connect.errors.DataException: Invalid value: null used for 
required field at 
org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:212)
at org.apache.kafka.connect.data.Struct.validate(Struct.java:232)
at 
io.confluent.kafka.connect.jms.RecordConverterTest.duplicateFieldRepro(RecordConverterTest.java:289)
{code}





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3155) Transient Failure in kafka.api.PlaintextProducerSendTest.testFlush

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898129#comment-15898129
 ] 

ASF GitHub Bot commented on KAFKA-3155:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2639


> Transient Failure in kafka.api.PlaintextProducerSendTest.testFlush
> --
>
> Key: KAFKA-3155
> URL: https://issues.apache.org/jira/browse/KAFKA-3155
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Assignee: Armin Braun
>  Labels: transient-unit-test-failure
> Fix For: 0.11.0.0
>
>
> {code}
> Stacktrace
> java.lang.AssertionError: No request is complete.
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.api.BaseProducerSendTest$$anonfun$testFlush$1.apply$mcVI$sp(BaseProducerSendTest.scala:275)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>   at 
> kafka.api.BaseProducerSendTest.testFlush(BaseProducerSendTest.scala:273)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
>   at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
>   at 
> org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
>   at 
> 

[jira] [Resolved] (KAFKA-3155) Transient Failure in kafka.api.PlaintextProducerSendTest.testFlush

2017-03-06 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-3155.

Resolution: Fixed

Issue resolved by pull request 2639
[https://github.com/apache/kafka/pull/2639]

> Transient Failure in kafka.api.PlaintextProducerSendTest.testFlush
> --
>
> Key: KAFKA-3155
> URL: https://issues.apache.org/jira/browse/KAFKA-3155
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Assignee: Armin Braun
>  Labels: transient-unit-test-failure
> Fix For: 0.11.0.0
>
>
> {code}
> Stacktrace
> java.lang.AssertionError: No request is complete.
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.api.BaseProducerSendTest$$anonfun$testFlush$1.apply$mcVI$sp(BaseProducerSendTest.scala:275)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>   at 
> kafka.api.BaseProducerSendTest.testFlush(BaseProducerSendTest.scala:273)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
>   at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
>   at 
> org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
>   at 
> 

[GitHub] kafka pull request #2639: KAFKA-3155: Avoid Long Overflow in org.apache.kafk...

2017-03-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2639


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2646: MINOR: Rename RecordBatch to ProducerBatch to allo...

2017-03-06 Thread hachikuji
GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/2646

MINOR: Rename RecordBatch to ProducerBatch to allow reuse in KIP-98



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka rename-record-batch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2646.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2646






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-4854) Producer RecordBatch executes callbacks with `null` provided for metadata if an exception is encountered

2017-03-06 Thread Robert Quinlivan (JIRA)
Robert Quinlivan created KAFKA-4854:
---

 Summary: Producer RecordBatch executes callbacks with `null` 
provided for metadata if an exception is encountered
 Key: KAFKA-4854
 URL: https://issues.apache.org/jira/browse/KAFKA-4854
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.10.1.1
Reporter: Robert Quinlivan
Priority: Minor


When using a user-provided callback with the producer, the `RecordBatch` 
executes the callbacks with a null metadata argument if an exception was 
encountered. For monitoring and debugging purposes, I would prefer if the 
metadata were provided, perhaps optionally. For example, it would be useful to 
know the size of the serialized payload and the offset so these values could 
appear in application logs.

To be entirely clear, the piece of code I am considering is in 
`org.apache.kafka.clients.producer.internals.RecordBatch#done`:

```java
// execute callbacks
for (Thunk thunk : thunks) {
try {
if (exception == null) {
RecordMetadata metadata = thunk.future.value();
thunk.callback.onCompletion(metadata, null);
} else {
thunk.callback.onCompletion(null, exception);
}
} catch (Exception e) {
log.error("Error executing user-provided callback on message 
for topic-partition '{}'", topicPartition, e);
}
}
```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Build failed in Jenkins: kafka-0.10.2-jdk7 #97

2017-03-06 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] HOTFIX: fix broken link for wordcount demo example

--
[...truncated 321.65 KB...]

kafka.server.KafkaConfigTest > testUncleanLeaderElectionDefault STARTED

kafka.server.KafkaConfigTest > testUncleanLeaderElectionDefault PASSED

kafka.server.KafkaConfigTest > testInvalidAdvertisedListenersProtocol STARTED

kafka.server.KafkaConfigTest > testInvalidAdvertisedListenersProtocol PASSED

kafka.server.KafkaConfigTest > testUncleanElectionEnabled STARTED

kafka.server.KafkaConfigTest > testUncleanElectionEnabled PASSED

kafka.server.KafkaConfigTest > testAdvertisePortDefault STARTED

kafka.server.KafkaConfigTest > testAdvertisePortDefault PASSED

kafka.server.KafkaConfigTest > testVersionConfiguration STARTED

kafka.server.KafkaConfigTest > testVersionConfiguration PASSED

kafka.server.KafkaConfigTest > testEqualAdvertisedListenersProtocol STARTED

kafka.server.KafkaConfigTest > testEqualAdvertisedListenersProtocol PASSED

kafka.server.SslReplicaFetchTest > testReplicaFetcherThread STARTED

kafka.server.SslReplicaFetchTest > testReplicaFetcherThread PASSED

kafka.server.IsrExpirationTest > testIsrExpirationForSlowFollowers STARTED

kafka.server.IsrExpirationTest > testIsrExpirationForSlowFollowers PASSED

kafka.server.IsrExpirationTest > testIsrExpirationForStuckFollowers STARTED

kafka.server.IsrExpirationTest > testIsrExpirationForStuckFollowers PASSED

kafka.server.IsrExpirationTest > testIsrExpirationIfNoFetchRequestMade STARTED

kafka.server.IsrExpirationTest > testIsrExpirationIfNoFetchRequestMade PASSED

kafka.server.SaslPlaintextReplicaFetchTest > testReplicaFetcherThread STARTED

kafka.server.SaslPlaintextReplicaFetchTest > testReplicaFetcherThread PASSED

kafka.server.ReplicationQuotasTest > 
shouldBootstrapTwoBrokersWithLeaderThrottle STARTED

kafka.server.ReplicationQuotasTest > 
shouldBootstrapTwoBrokersWithLeaderThrottle PASSED

kafka.server.ReplicationQuotasTest > shouldThrottleOldSegments STARTED

kafka.server.ReplicationQuotasTest > shouldThrottleOldSegments PASSED

kafka.server.ReplicationQuotasTest > 
shouldBootstrapTwoBrokersWithFollowerThrottle STARTED

kafka.server.ReplicationQuotasTest > 
shouldBootstrapTwoBrokersWithFollowerThrottle PASSED

kafka.server.ServerStartupTest > testBrokerStateRunningAfterZK STARTED

kafka.server.ServerStartupTest > testBrokerStateRunningAfterZK PASSED

kafka.server.ServerStartupTest > testBrokerCreatesZKChroot STARTED

kafka.server.ServerStartupTest > testBrokerCreatesZKChroot PASSED

kafka.server.ServerStartupTest > testConflictBrokerStartupWithSamePort STARTED

kafka.server.ServerStartupTest > testConflictBrokerStartupWithSamePort PASSED

kafka.server.ServerStartupTest > testConflictBrokerRegistration STARTED

kafka.server.ServerStartupTest > testConflictBrokerRegistration PASSED

kafka.server.ServerStartupTest > testBrokerSelfAware STARTED

kafka.server.ServerStartupTest > testBrokerSelfAware PASSED

kafka.server.ProduceRequestTest > testSimpleProduceRequest STARTED

kafka.server.ProduceRequestTest > testSimpleProduceRequest PASSED

kafka.server.ProduceRequestTest > testCorruptLz4ProduceRequest STARTED

kafka.server.ProduceRequestTest > testCorruptLz4ProduceRequest PASSED

kafka.server.ReplicaManagerTest > testHighWaterMarkDirectoryMapping STARTED

kafka.server.ReplicaManagerTest > testHighWaterMarkDirectoryMapping PASSED

kafka.server.ReplicaManagerTest > 
testFetchBeyondHighWatermarkReturnEmptyResponse STARTED

kafka.server.ReplicaManagerTest > 
testFetchBeyondHighWatermarkReturnEmptyResponse PASSED

kafka.server.ReplicaManagerTest > testIllegalRequiredAcks STARTED

kafka.server.ReplicaManagerTest > testIllegalRequiredAcks PASSED

kafka.server.ReplicaManagerTest > testClearPurgatoryOnBecomingFollower STARTED

kafka.server.ReplicaManagerTest > testClearPurgatoryOnBecomingFollower PASSED

kafka.server.ReplicaManagerTest > testHighwaterMarkRelativeDirectoryMapping 
STARTED

kafka.server.ReplicaManagerTest > testHighwaterMarkRelativeDirectoryMapping 
PASSED

kafka.server.KafkaMetricReporterClusterIdTest > testClusterIdPresent STARTED

kafka.server.KafkaMetricReporterClusterIdTest > testClusterIdPresent PASSED

kafka.server.CreateTopicsRequestWithPolicyTest > testValidCreateTopicsRequests 
STARTED

kafka.server.CreateTopicsRequestWithPolicyTest > testValidCreateTopicsRequests 
PASSED

kafka.server.CreateTopicsRequestWithPolicyTest > testErrorCreateTopicsRequests 
STARTED

kafka.server.CreateTopicsRequestWithPolicyTest > testErrorCreateTopicsRequests 
PASSED

kafka.server.OffsetCommitTest > testUpdateOffsets STARTED

kafka.server.OffsetCommitTest > testUpdateOffsets PASSED

kafka.server.OffsetCommitTest > testLargeMetadataPayload STARTED

kafka.server.OffsetCommitTest > testLargeMetadataPayload PASSED

kafka.server.OffsetCommitTest > testOffsetsDeleteAfterTopicDeletion STARTED

kafka.server.OffsetCommitTest > 

Build failed in Jenkins: kafka-trunk-jdk8 #1331

2017-03-06 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] HOTFIX: fix broken link for wordcount demo example

--
[...truncated 919.74 KB...]
org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommit PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommitFailure 
STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommitFailure 
PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignmentSingleTaskConnectors STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignmentSingleTaskConnectors PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupFollower STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupFollower PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testMetadata STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testMetadata PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment1 STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment1 PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment2 STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment2 PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testJoinLeaderCannotAssign STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testJoinLeaderCannotAssign PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testRejoinGroup STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testRejoinGroup PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupLeader STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupLeader PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testPutConnectorConfig STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testPutConnectorConfig PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedCustomValidation STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedCustomValidation PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartTask STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartTask PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testAccessors STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testAccessors PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testDestroyConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testDestroyConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedBasicValidation STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedBasicValidation PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorAlreadyExists STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorAlreadyExists PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testJoinAssignment STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testJoinAssignment PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalance STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalance PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalanceFailedConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalanceFailedConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testHaltCleansUpWorker STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testHaltCleansUpWorker PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testConnectorNameConflictsWithWorkerGroupId STARTED


Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-03-06 Thread Ismael Juma
Thanks Colin.

I am familiar with the protocol semantics, but we need to document the API
for users who don't know the protocol. I still think it would be valuable
to have some examples of how the API would be used for common use cases.
For example, say someone creates a topic and then produces to it. What
would be the recommended way to do that?

Ismael

On Mon, Mar 6, 2017 at 7:43 PM, Colin McCabe  wrote:

> On Mon, Mar 6, 2017, at 05:50, Ismael Juma wrote:
> > Thanks Colin. It seems like you replied to me accidentally instead of the
> > list, so leaving your reply below for the benefit of others.
>
> Thanks, Ismael.  I actually realized my mistake right after I sent to
> you, and re-posted it to the mailing list instead of sending directly.
> Sigh...
>
> >
> > Regarding the disadvantage of having to hunt through the request class,
> > don't people have to do that anyway with the Options classes?
>
> A lot of people will simply choose the default options, until they have
> a reason to do otherwise (for example, they want a longer or shorter
> timeout, etc.)
>
> >
> > Aside from that, it would be great if the KIP included more detailed
> > javadoc for each method including information about potential exceptions.
>
> That's a good question.  Because this is an asynchronous API, methods
> never throw exceptions.  Instead, if you call get() / whenComplete() /
> isCompletedExceptionally() / etc. on one of the CompletableFuture
> objects, you will get the exception.  This is to allow Node.js-style
> completion chaining.  I will add this explanation to the KIP.
>
> > I'm particularly interested in what a user can expect if a create topics
> > succeeds versus what they can expect if a timeout exception is thrown. It
> > would be good to consider partial failures as well.
>
> This is spelled out by KIP-4.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+
> Command+line+and+centralized+administrative+operations
>
> Specifically,
>
> > If a timeout error occurs [in CreateTopic], the topic could still be
> > created successfully at a later time. Its up to the client to query
> > for the state at that point.
>
> Since we're specifically not changing the server as part of this KIP,
> those semantics will still be in force.  Of course, there are plenty of
> other exceptions that you can get from CreateTopics that are more
> meaningful, such as permission-related or network-related ones.  But if
> you get a timeout, the operation may or may not have succeeded.
>
> Could we fix the timeout problem?  Sort of.  We could implement
> something like a retry cache.  The brokers would have to maintain a
> cache of operations (and their results) which had succeeded or failed.
> Then, if an RPC got interrupted after the server had performed it, but
> before the client had received the response message, the client could
> simply reconnect on another TCP session and ask the broker for the
> result of the previous operation.  The broker could look up the result
> in the cache and re-send it.
>
> This fix works, but it is very complex.  The cache requires space in
> memory (and to do it perfectly, you also want to persist the cache to
> disk in case the broker restarts and the client re-appears).  The fix
> also requires the client to wait for an indefinite amount of time for
> the server to come back.  If the client ever "gives up" and just throws
> a timeout exception, we are back to not knowing what happened on the
> server.
>
> In any case, I think we should discuss RPC change in a separate KIP...
> the scope is already big enough here.  Also, in practice, users have
> workarounds for cases where there are timeouts or failures to
> communicate.
>
> best,
> Colin
>
> >
> > Ismael
> >
> > On Fri, Mar 3, 2017 at 9:37 PM, Colin McCabe  wrote:
> >
> > > On Fri, Mar 3, 2017, at 06:41, Ismael Juma wrote:
> > > > Hi Colin,
> > > >
> > > > I still need to do a detailed review, but I have a couple of
> > > > comments/questions:
> > > >
> > > > 1. I am not sure about having the options/response classes as inner
> > > > classes
> > > > of the interface. It means that file containing the interface will be
> > > > huge
> > > > eventually. And the classes are not necessarily related either. Why
> not
> > > > use
> > > > a separate package for them?
> > >
> > > Yeah, I think it's reasonable to make these top-level classes and put
> > > them in separate files.  We can put them all in
> > > org.apache.kafka.clients.admin.
> > >
> > > >
> > > > 2. Can you elaborate on how one decides one goes in the Options class
> > > > versus the first parameter?
> > >
> > > I guess I think of options as things that you don't have to set.  For
> > > example, when deleting a topic, you must supply the topic name, but
> > > supplying a non-default timeout is optional.
> > >
> > > > I wonder if it would be simpler to just have a
> > > > single parameter. In that case it should probably be called a
> Request as
> 

Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-03-06 Thread Colin McCabe
On Mon, Mar 6, 2017, at 05:50, Ismael Juma wrote:
> Thanks Colin. It seems like you replied to me accidentally instead of the
> list, so leaving your reply below for the benefit of others.

Thanks, Ismael.  I actually realized my mistake right after I sent to
you, and re-posted it to the mailing list instead of sending directly. 
Sigh...

> 
> Regarding the disadvantage of having to hunt through the request class,
> don't people have to do that anyway with the Options classes?

A lot of people will simply choose the default options, until they have
a reason to do otherwise (for example, they want a longer or shorter
timeout, etc.)

> 
> Aside from that, it would be great if the KIP included more detailed
> javadoc for each method including information about potential exceptions.

That's a good question.  Because this is an asynchronous API, methods
never throw exceptions.  Instead, if you call get() / whenComplete() /
isCompletedExceptionally() / etc. on one of the CompletableFuture
objects, you will get the exception.  This is to allow Node.js-style
completion chaining.  I will add this explanation to the KIP.

> I'm particularly interested in what a user can expect if a create topics
> succeeds versus what they can expect if a timeout exception is thrown. It
> would be good to consider partial failures as well.

This is spelled out by KIP-4.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations

Specifically,

> If a timeout error occurs [in CreateTopic], the topic could still be
> created successfully at a later time. Its up to the client to query
> for the state at that point.

Since we're specifically not changing the server as part of this KIP,
those semantics will still be in force.  Of course, there are plenty of
other exceptions that you can get from CreateTopics that are more
meaningful, such as permission-related or network-related ones.  But if
you get a timeout, the operation may or may not have succeeded.

Could we fix the timeout problem?  Sort of.  We could implement
something like a retry cache.  The brokers would have to maintain a
cache of operations (and their results) which had succeeded or failed. 
Then, if an RPC got interrupted after the server had performed it, but
before the client had received the response message, the client could
simply reconnect on another TCP session and ask the broker for the
result of the previous operation.  The broker could look up the result
in the cache and re-send it.

This fix works, but it is very complex.  The cache requires space in
memory (and to do it perfectly, you also want to persist the cache to
disk in case the broker restarts and the client re-appears).  The fix
also requires the client to wait for an indefinite amount of time for
the server to come back.  If the client ever "gives up" and just throws
a timeout exception, we are back to not knowing what happened on the
server.

In any case, I think we should discuss RPC change in a separate KIP...
the scope is already big enough here.  Also, in practice, users have
workarounds for cases where there are timeouts or failures to
communicate.

best,
Colin

> 
> Ismael
> 
> On Fri, Mar 3, 2017 at 9:37 PM, Colin McCabe  wrote:
> 
> > On Fri, Mar 3, 2017, at 06:41, Ismael Juma wrote:
> > > Hi Colin,
> > >
> > > I still need to do a detailed review, but I have a couple of
> > > comments/questions:
> > >
> > > 1. I am not sure about having the options/response classes as inner
> > > classes
> > > of the interface. It means that file containing the interface will be
> > > huge
> > > eventually. And the classes are not necessarily related either. Why not
> > > use
> > > a separate package for them?
> >
> > Yeah, I think it's reasonable to make these top-level classes and put
> > them in separate files.  We can put them all in
> > org.apache.kafka.clients.admin.
> >
> > >
> > > 2. Can you elaborate on how one decides one goes in the Options class
> > > versus the first parameter?
> >
> > I guess I think of options as things that you don't have to set.  For
> > example, when deleting a topic, you must supply the topic name, but
> > supplying a non-default timeout is optional.
> >
> > > I wonder if it would be simpler to just have a
> > > single parameter. In that case it should probably be called a Request as
> > > Radai suggested, but that's a separate point and we can discuss it
> > > separately.
> >
> > Hmm.  I don't think it would be simpler for users.  It would force
> > people who just want to do something simple like delete a topic or get
> > the api version of a single node to go hunting through the request
> > class.
> >
> > best,
> > Colin
> >
> >
> > >
> > > Ismael
> > >
> > > On Thu, Mar 2, 2017 at 1:58 AM, Colin McCabe  wrote:
> > >
> > > > On Wed, Mar 1, 2017, at 15:52, radai wrote:
> > > > > quick comment on the request objects:
> > > > >
> > > > > i see "abstract class NewTopic" and 

[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897891#comment-15897891
 ] 

Guozhang Wang commented on KAFKA-3514:
--

There are some discussions on pros and cons of changing the punctuate 
semantics: 

https://github.com/apache/kafka/pull/1689

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3874) Transient test failure: org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion

2017-03-06 Thread Armin Braun (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897882#comment-15897882
 ] 

Armin Braun commented on KAFKA-3874:


I think this one is still pretty flaky. I see this 
(https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2032/) kind of failure a 
lot locally and on Jenkins.
Should we reopen this one maybe? Or add a new JIRA since the way it fails 
appears to have changed to:

{code}
Error Message

java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
records from topic output-topic-2 while only received 0: []
Stacktrace

java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
records from topic output-topic-2 while only received 0: []
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:257)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:206)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:175)
at 
org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:297)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
{code}

> Transient test failure: 
> org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion
> ---
>
> Key: KAFKA-3874
> URL: https://issues.apache.org/jira/browse/KAFKA-3874
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Ismael Juma
>Assignee: Damian Guy
>  Labels: transient-unit-test-failure
> Fix For: 0.10.1.0
>
>
> The failure I am recording happened in Jenkins, but I had a similar failure 
> locally (and two other integration tests failed in that run).
> {code}
> Expected: <[KeyValue(europe, 13), KeyValue(americas, 4), KeyValue(asia, 25), 
> KeyValue(americas, 23), KeyValue(europe, 69), KeyValue(americas, 101), 
> KeyValue(europe, 109), KeyValue(asia, 124)]>
>  but: was <[KeyValue(europe, 122), KeyValue(americas, 105), 
> KeyValue(asia, 149), KeyValue(americas, 124), KeyValue(europe, 178), 
> KeyValue(americas, 202), KeyValue(europe, 218), KeyValue(asia, 248)]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.junit.Assert.assertThat(Assert.java:956)
>   at org.junit.Assert.assertThat(Assert.java:923)
>   at 
> org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion(JoinIntegrationTest.java:258)
> {code}
> https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/4254/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-123: Allow per stream/table timestamp extractor

2017-03-06 Thread Guozhang Wang
1) Sounds good.

2) Yeah what I meant is to emphasize that TimestampExtractor to be
stateless in the docs somewhere.


Guozhang


On Sun, Mar 5, 2017 at 4:27 PM, Matthias J. Sax 
wrote:

> Guozhang,
>
> about renaming the config parameters. I like this idea, but want to
> point out, that this change should be done in a backward compatible way.
> Thus, we need to keep (and only deprecate) the current parameter names.
>
> I am not sure about (2)? What do you worry about? Using a "stateful
> extractor"? -- this would be an antipattern IMHO. We can clarify that a
> TimestampExtrator should be stateless though (even if this should be
> clear).
>
>
> -Matthias
>
>
> On 3/4/17 6:36 PM, Guozhang Wang wrote:
> > Jeyhun,
> >
> > Thanks for proposing this KIP! And sorry for getting late in the
> discussion.
> >
> > I have a general suggestion not directly related to this KIP and a couple
> > of comments for this KIP here:
> >
> > I agree with Mathieu's observation, partly because we are now having lots
> > of overloaded functions both in the DSL and in PAPI, and it would be
> quite
> > confusing to users. As Matthias mentioned we do have some plans to
> refactor
> > this API, but just wanted to point it out that this KIP may likely urge
> us
> > to do the API refactoring sooner than planned. My personal preference
> would
> > be doing that the next release (i.e. 0.11.0.0 in June).
> >
> >
> > Now some detailed comments:
> >
> > 1. I'd suggest change TIMESTAMP_EXTRACTOR_CLASS_CONFIG to
> > "default.timestamp.extractor" or "global.timestamp.extractor" (also the
> > Java variable name can be changed accordingly) along with this change. In
> > addition, maybe we can piggy-backing this to also rename
> > KEY_SERDE_CLASS_CONFIG/VALUE_SERDE_CLASS_CONFIG to "default.key.." etc
> in
> > this KIP.
> >
> > 2. Another thing we should consider, is that since now we could
> potentially
> > use multiple timestamp extractor instances than a single one, this may be
> > breaking if user's customization did some global bookkeeping based on the
> > previous assumption (maybe a wild thought but e.g. keeping track some
> > global counts in the extractor as a local variable). We need to clarify
> > this change in the javadoc and also potentially in the upgrade web doc
> > sections.
> >
> >
> >
> > Guozhang
> >
> >
> > On Wed, Mar 1, 2017 at 6:09 AM, Michael Noll 
> wrote:
> >
> >> +1 (non-binding)
> >>
> >> Thanks for the KIP!
> >>
> >> On Wed, Mar 1, 2017 at 1:49 PM, Bill Bejeck  wrote:
> >>
> >>> +1
> >>>
> >>> Thanks
> >>> Bill
> >>>
> >>> On Wed, Mar 1, 2017 at 5:06 AM, Eno Thereska 
> >>> wrote:
> >>>
>  +1 (non binding).
> 
>  Thanks
>  Eno
> > On 28 Feb 2017, at 17:22, Matthias J. Sax 
> >>> wrote:
> >
> > +1
> >
> > Thanks a lot for the KIP!
> >
> > -Matthias
> >
> >
> > On 2/28/17 1:35 AM, Damian Guy wrote:
> >> Thanks for the KIP Jeyhun!
> >>
> >> +1
> >>
> >> On Tue, 28 Feb 2017 at 08:59 Jeyhun Karimov 
>  wrote:
> >>
> >>> Dear community,
> >>>
> >>> I'd like to start the vote for KIP-123:
> >>> https://cwiki.apache.org/confluence/pages/viewpage.
>  action?pageId=68714788
> >>>
> >>>
> >>> Cheers,
> >>> Jeyhun
> >>> --
> >>> -Cheers
> >>>
> >>> Jeyhun
> >>>
> >>
> >
> 
> 
> >>>
> >>
> >
> >
> >
>
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-03-06 Thread Colin McCabe
As others have mentioned, it seems clear that we want to preserve the
ordering of message headers, so that we can implement things like
lineage tracing.  (For example, each stage could add a "lineage:"
header.)  I also think that we want the ability to add and remove
headers as needed.  It would be really unfortunate if the only way to
remove a message header or add a header at a certain position was the
duplicate the whole message and re-create everything.

So how about implementing ListIterator? 
https://docs.oracle.com/javase/7/docs/api/java/util/ListIterator.html 
It supports adding and removing things at arbitrary positions.  For
people who want to use it as a simple Iterator, it is one (and you can
use all the fancy syntax such as Java's foreach with it).  For people
who want to add and remove things at arbitrary locations, they can.  And
it doesn't expose the implementation, so that can be changed later.  We
can materialize things in memory lazily if we want to, and so forth.  I
think using the standard interface is better than rolling our own
nonstandard collection or iterator interfaces.

regards,
Colin


On Wed, Mar 1, 2017, at 12:59, Becket Qin wrote:
> Hi Ismael,
> 
> Thanks for the reply. Please see the comments inline.
> 
> On Wed, Mar 1, 2017 at 6:47 AM, Ismael Juma  wrote:
> 
> > Hi Becket,
> >
> > Thanks for sharing your thoughts. More inline.
> >
> > On Wed, Mar 1, 2017 at 2:54 AM, Becket Qin  wrote:
> >
> > > As you can imagine if the ProducerRecord has a value as a List and the
> > > Interceptor.onSend() can actually add an element to the List. If the
> > > producer.send() is called on the same ProducerRecord again, the value
> > list
> > > would have one more element than the previously sent ProducerRecord even
> > > though the ProducerRecord itself is not mutable, right? Same thing can
> > > apply to any modifiable class type.
> > >
> >
> > The difference is that the user chooses the value type. They are free to
> > choose a mutable or immutable type. A generic interceptor cannot mutate the
> > value because it doesn't know the type (and it could be immutable). One
> > could write an interceptor that checked the type of the value at runtime
> > and did things based on that. But again, the user who creates the record is
> > in control.
> >
> But there is no generic interceptor, right? The interceptor always takes
> specific K, V type.
> 
> 
> > From this standpoint allowing headers to be mutable doesn't really weaken
> > > the mutability we already have. Admittedly a mutable header is kind of
> > > guiding user towards to change the headers in the existing object instead
> > > of creating a new one.
> >
> >
> > Yes, with headers, we are providing a model for the user (the user doesn't
> > get to choose it like with keys and values) and for the interceptors. So, I
> > think it's not the same.
> 
> 
> >
> > > But I think reusing an object while it is possible
> > > to be modified by user code is a risk that users themselves are willing
> > to
> > > take. And we do not really protect against that.
> >
> >
> > If users want to take that risk, it's fine. But we give them the option to
> > protect themselves. With mutable headers, there is no option.
> 
> If we want to let the users control the mutability, users can always call
> headers.close() before calling producer.send() and that will force the
> interceptor to create new ProducerRecord object.
> 
> Because the headers are mostly useful for interceptors, unless the users
> do
> not want the interceptors to change their records, it seems reasonable to
> say that by default modification of headers are allowed for the
> interceptors.
> 
> >
> >
> > > But there still seems
> > > value to allow the users to not pay the overhead of creating tons of
> > > objects if they do not reuse an object to send it twice, which is
> > probably
> > > a more common case.
> > >
> >
> > I think the assumption that there would be tons of objects created is
> > incorrect (I suggested a solution that would only involve one additional
> > reference in the `Header` instance). The usability of the immutable API
> > seemed to be more of an issue.
> >
> If we do not allow the users to add headers on existing ProducerRecord
> objects, each interceptor who wants to add headers will have to create a
> new ProducerRecord. So we will have to create NUM_INTERCEPTOR times of
> ProducerRecord, if a producer is sending 100K messages per second, it
> would
> be a lot of new objects, right?
> 
> >
> > In any case, if we do add the `close()` method, then we need to add a note
> > to the compatibility section stating that once a producer record is sent,
> > it cannot be sent again as this would cause interceptors that add headers
> > to fail.
> >
> Agreed, clear documentation is important.
> 
> >
> > Ismael
> >


[jira] [Created] (KAFKA-4853) ByteBufferSerializer does a memcopy

2017-03-06 Thread Werner Daehn (JIRA)
Werner Daehn created KAFKA-4853:
---

 Summary: ByteBufferSerializer does a memcopy
 Key: KAFKA-4853
 URL: https://issues.apache.org/jira/browse/KAFKA-4853
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.1.1
 Environment: all
Reporter: Werner Daehn
Priority: Minor


When using the ByteBufferSerializer and its backing byte[] has either an offset 
or is not of the exact length, then a memcopy takes place.
You do have a byte[] already but wrapped inside a ByteBuffer and yet the entire 
(large) payload is copied to a byte[]? 

The reason why this is done is obvious, the entire serialization framework 
works on the datatype byte[]. And since this datatype does not have an offset 
and a length information, the copy is necessary. 

But actually, I would argue into the reverse direction. The root problem is 
that the serialization is using the byte[]. 

Change the serialize() method to return a ByteBuffer instead. 
The ByteArraySerializer would wrap the byte[] into a ByteBuffer with offset=0 
and length=data.length.
All other serializers the same.
But for those cases where the ByteBufferSerializer is used, you have the extra 
options and methods.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4852) ByteBufferSerializer not compatible with offsets

2017-03-06 Thread Werner Daehn (JIRA)
Werner Daehn created KAFKA-4852:
---

 Summary: ByteBufferSerializer not compatible with offsets
 Key: KAFKA-4852
 URL: https://issues.apache.org/jira/browse/KAFKA-4852
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.1.1
 Environment: all
Reporter: Werner Daehn
Priority: Minor


Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the 
ByteBuffer was created with an offset? new ByteBuffer(data, 3, 10)? The 
ByteBufferSerializer will send from pos=0 and not from pos=3 onwards.

Solution: No rewind() but flip() for reading a ByteBuffer. That's what the flip 
is meant for.

Story:

Imagine the incoming data comes from a byte[], e.g. a network stream containing 
topicname, partition, key, value, ... and you want to create a new 
ProducerRecord for that. As the constructor of ProducerRecord requires (topic, 
partition, key, value) you have to copy from above byte[] the key and value. 
That means there is a memcopy taking place. Since the payload can be 
potentially large, that introduces a lot of overhead. Twice the memory.

A nice solution to this problem is to simply wrap the network byte[] into new 
ByteBuffers:
ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength);
ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength);
and then use the ByteBufferSerializer instead of the ByteArraySerializer.

But that does not work as the ByteBufferSerializer does a rewind(), hence both, 
key and value, will start at position=0 of the data[].


public class ByteBufferSerializer implements Serializer {
public byte[] serialize(String topic, ByteBuffer data) {
 data.rewind();



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-06 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-4845:
--

Assignee: Vahid Hashemian

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark 
> streaming
> 
>
> Key: KAFKA-4845
> URL: https://issues.apache.org/jira/browse/KAFKA-4845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Dan
>Assignee: Vahid Hashemian
>
> When integrating with spark streaming, kafka consumer cannot get the latest 
> offsets except for one partition. The  code snippet is as follows: 
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
> val c = consumer
> c.poll(0)
> val parts = c.assignment().asScala
> val newPartitions = parts.diff(currentOffsets.keySet)
> currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> 
> c.position(tp)).toMap
> c.pause(newPartitions.asJava)
> c.seekToEnd(currentOffsets.keySet.asJava)
> parts.map(tp => tp -> c.position(tp)).toMap
>   }
> {code}
> When calling consumer.position(topicPartition), it will call 
> updateFetchPositions(Collections.singleton(partition)):
> The bug lies in updateFetchPositions(Set partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);// reset to latest 
> offset for current partition
> if (!subscriptions.hasAllFetchPositions()) {  // called seekToEnd for 
> all partitions before, so this sentence will be true 
> coordinator.refreshCommittedOffsetsIfNeeded();
> fetcher.updateFetchPositions(partitions);  // reset to committed 
> offsets for current partition
> }
> {code}
> So eventually there is only one partition(the last partition in assignment) 
> can get latest offset while all the others get the committed offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-06 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897780#comment-15897780
 ] 

Vahid Hashemian commented on KAFKA-4845:


[~ijuma], sure, I'll take a look.

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark 
> streaming
> 
>
> Key: KAFKA-4845
> URL: https://issues.apache.org/jira/browse/KAFKA-4845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Dan
>
> When integrating with spark streaming, kafka consumer cannot get the latest 
> offsets except for one partition. The  code snippet is as follows: 
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
> val c = consumer
> c.poll(0)
> val parts = c.assignment().asScala
> val newPartitions = parts.diff(currentOffsets.keySet)
> currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> 
> c.position(tp)).toMap
> c.pause(newPartitions.asJava)
> c.seekToEnd(currentOffsets.keySet.asJava)
> parts.map(tp => tp -> c.position(tp)).toMap
>   }
> {code}
> When calling consumer.position(topicPartition), it will call 
> updateFetchPositions(Collections.singleton(partition)):
> The bug lies in updateFetchPositions(Set partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);// reset to latest 
> offset for current partition
> if (!subscriptions.hasAllFetchPositions()) {  // called seekToEnd for 
> all partitions before, so this sentence will be true 
> coordinator.refreshCommittedOffsetsIfNeeded();
> fetcher.updateFetchPositions(partitions);  // reset to committed 
> offsets for current partition
> }
> {code}
> So eventually there is only one partition(the last partition in assignment) 
> can get latest offset while all the others get the committed offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897771#comment-15897771
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Oh, I wouldn't mind that at all. Just thought that you wanted to stick to event 
time semantics for this, but if you're not precious about that then I'm all for 
it :)

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-03-06 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897746#comment-15897746
 ] 

Edoardo Comar commented on KAFKA-4669:
--

We've seen this same exception using 10.0.1 clients against 10.0.1 brokers - 
without doing a {{KafkaProducer.flush()}}

We've seen it happening during 
{{KafkaConsumer.poll()}}
{{KafkaConsumer.commitSync()}}
and
in the network I/O of a  KafkaProducer that is just doing sends.eg.

{code}
java.lang.IllegalStateException: Correlation id for response (4564)
does not match request (4562)
at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:486)
at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:381)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
at java.lang.Thread.run(Unknown Source)
{code}

{code}
java.lang.IllegalStateException: Correlation id for response (742) does not 
match request (174)
at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:486)
at 
org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:381)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:426)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1059)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1027)
{code}



Usually the response's correlation id is off by just 1 or 2 but we've also seen 
it off by a few hundreds.

When this happens, all subsequent responses are also shifted:
{code}
java.lang.IllegalStateException: Correlation id for response (743)
does not match request (742)
java.lang.IllegalStateException: Correlation id for response (744)
does not match request (743)
java.lang.IllegalStateException: Correlation id for response (745)
does not match request (744)
{code}



> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked 0.10 code and think this bug does exist in 0.10 versions.
> A real case.  First a correlateId not match exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at 
> 

[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-03-06 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897747#comment-15897747
 ] 

Edoardo Comar commented on KAFKA-4669:
--

It's easy to discard and recreate the consumer instance to recover
however we can't do that with the producer as it occurs in the Sender
thread.

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked 0.10 code and think this bug does exist in 0.10 versions.
> A real case.  First a correlateId not match exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>   at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>   at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>   at java.lang.Thread.run(Thread.java:745)
> client code 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: java.lang.IllegalStateException: Correlation id for response () does not match request ()

2017-03-06 Thread Ismael Juma
Hi Mickael,

This looks to be the same as KAFKA-4669. In theory, this should never
happen and it's unclear when/how it can happen. Not sure if someone has
investigated it in more detail.

Ismael

On Mon, Mar 6, 2017 at 5:15 PM, Mickael Maison 
wrote:

> Hi,
>
> In one of our clusters, some of our clients occasionally see this
> exception:
> java.lang.IllegalStateException: Correlation id for response (4564)
> does not match request (4562)
> at org.apache.kafka.clients.NetworkClient.correlate(
> NetworkClient.java:486)
> at org.apache.kafka.clients.NetworkClient.parseResponse(
> NetworkClient.java:381)
> at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(
> NetworkClient.java:449)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
> at java.lang.Thread.run(Unknown Source)
>
> We've also seen it from consumer poll() and commit()
>
> Usually the response's correlation id is off by just 1 or 2 (like
> above) but we've also seen it off by a few hundreds:
> java.lang.IllegalStateException: Correlation id for response (742)
> does not match request (174)
> at org.apache.kafka.clients.NetworkClient.correlate(
> NetworkClient.java:486)
> at org.apache.kafka.clients.NetworkClient.parseResponse(
> NetworkClient.java:381)
> at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(
> NetworkClient.java:449)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> clientPoll(ConsumerNetworkClient.java:360)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> commitOffsetsSync(ConsumerCoordinator.java:426)
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> commitSync(KafkaConsumer.java:1059)
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> commitSync(KafkaConsumer.java:1027)
>
> When this happens, all subsequent responses are also shifted:
> java.lang.IllegalStateException: Correlation id for response (743)
> does not match request (742)
> java.lang.IllegalStateException: Correlation id for response (744)
> does not match request (743)
> java.lang.IllegalStateException: Correlation id for response (745)
> does not match request (744)
> java.lang.IllegalStateException: Correlation id for response (746)
> does not match request (745)
>  ...
> It's easy to discard and recreate the consumer instance to recover
> however we can't do that with the producer as it occurs in the Sender
> thread.
>
> Our cluster and our clients are running Kafka 0.10.0.1.
> Under which circumstances would such an error happen ?
> Even with logging set to TRACE, we can't spot anything suspicious
> shortly before the issue. Is there any data we should try to capture
> when this happens ?
>
> Thanks!
>


[GitHub] kafka pull request #2644: Hotfix: broken link

2017-03-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2644


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897729#comment-15897729
 ] 

Matthias J. Sax commented on KAFKA-3514:


[~mihbor] I am not sure about this. To me it seems to be a cleaner solution, to 
change punctuate semantics back to system time.

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4851) SessionStore.fetch(key) is a performance bottleneck

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897714#comment-15897714
 ] 

ASF GitHub Bot commented on KAFKA-4851:
---

GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2645

KAFKA-4851: only search available segments during Segments.segments(from, 
to)

restrict the locating of segments in {{Segments#segments(..)}} to only the 
segments that are currently available, i.e., rather than searching the hashmap 
for many segments that don't exist.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka session-windows-testing

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2645.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2645


commit fca1a2277fbb7020c42e4e028e8ec03bed3a8335
Author: Damian Guy 
Date:   2017-03-06T17:03:27Z

only search available segments




> SessionStore.fetch(key) is a performance bottleneck
> ---
>
> Key: KAFKA-4851
> URL: https://issues.apache.org/jira/browse/KAFKA-4851
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.11.0.0
>
>
> When flushing the {{CachingSessionStore}} we need to search for the previous 
> value for a session. This involves searching each open RocksDB segment. The 
> code ends up doing a call  {{Segments.segments(0, Long.MAX_VALUE)}} this 
> results in approximately 3 million gets on a {{ConcurrentHashMap}} of which 
> all but 3 of them will be hits. 
> Change this code to restrict the segmentIds to search just to the available 
> set.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2645: KAFKA-4851: only search available segments during ...

2017-03-06 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2645

KAFKA-4851: only search available segments during Segments.segments(from, 
to)

restrict the locating of segments in {{Segments#segments(..)}} to only the 
segments that are currently available, i.e., rather than searching the hashmap 
for many segments that don't exist.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka session-windows-testing

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2645.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2645


commit fca1a2277fbb7020c42e4e028e8ec03bed3a8335
Author: Damian Guy 
Date:   2017-03-06T17:03:27Z

only search available segments




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2644: hotfix: broken link

2017-03-06 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/2644

hotfix: broken link



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka hotfixBrokerLink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2644.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2644


commit e07d3178e40cef2bac6378ca843d7c945033940a
Author: Matthias J. Sax 
Date:   2017-03-06T17:29:56Z

hotfix: broken link




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


java.lang.IllegalStateException: Correlation id for response () does not match request ()

2017-03-06 Thread Mickael Maison
Hi,

In one of our clusters, some of our clients occasionally see this exception:
java.lang.IllegalStateException: Correlation id for response (4564)
does not match request (4562)
at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:486)
at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:381)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
at java.lang.Thread.run(Unknown Source)

We've also seen it from consumer poll() and commit()

Usually the response's correlation id is off by just 1 or 2 (like
above) but we've also seen it off by a few hundreds:
java.lang.IllegalStateException: Correlation id for response (742)
does not match request (174)
at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:486)
at 
org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:381)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:426)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1059)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1027)

When this happens, all subsequent responses are also shifted:
java.lang.IllegalStateException: Correlation id for response (743)
does not match request (742)
java.lang.IllegalStateException: Correlation id for response (744)
does not match request (743)
java.lang.IllegalStateException: Correlation id for response (745)
does not match request (744)
java.lang.IllegalStateException: Correlation id for response (746)
does not match request (745)
 ...
It's easy to discard and recreate the consumer instance to recover
however we can't do that with the producer as it occurs in the Sender
thread.

Our cluster and our clients are running Kafka 0.10.0.1.
Under which circumstances would such an error happen ?
Even with logging set to TRACE, we can't spot anything suspicious
shortly before the issue. Is there any data we should try to capture
when this happens ?

Thanks!


[jira] [Created] (KAFKA-4851) SessionStore.fetch(key) is a performance bottleneck

2017-03-06 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-4851:
-

 Summary: SessionStore.fetch(key) is a performance bottleneck
 Key: KAFKA-4851
 URL: https://issues.apache.org/jira/browse/KAFKA-4851
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Damian Guy
Assignee: Damian Guy
 Fix For: 0.11.0.0


When flushing the {{CachingSessionStore}} we need to search for the previous 
value for a session. This involves searching each open RocksDB segment. The 
code ends up doing a call  {{Segments.segments(0, Long.MAX_VALUE)}} this 
results in approximately 3 million gets on a {{ConcurrentHashMap}} of which all 
but 3 of them will be hits. 

Change this code to restrict the segmentIds to search just to the available set.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Allow to replace Zookeeper with different Coordination Service (etcd/consul)

2017-03-06 Thread Ofir Manor
I suggest you check KIP-30 and the extensive discussion about it in the
mailing list  from around December 2015 called "[DISCUSS] KIP-30 Allow for
brokers to have plug-able consensus and meta data storage sub systems"
If I remember correctly, it ran into some objections, as the existing
commiters thought at the time that ZK was significantly more reliable, so
it was not worth the effort to add inferior options.
I personally think nowadays, when a lot of other critical cluster infra
relies on these coordination services anyway, this KIP makes a lot of
sense. The current dependency of ZK creates a large, unneeded operational
overhead for those who have already deployed and relies on etcd/consul for
the rest of their stack (including other stateful services).
Just my two cents,

Ofir Manor

Co-Founder & CTO | Equalum

Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io

On Mon, Mar 6, 2017 at 12:15 PM, Alexander Binzberger <
alexander.binzber...@wingcon.com> wrote:

> I would also be interested in this. (etcd)
>
>
> Am 02.03.2017 um 12:24 schrieb Molnár Bálint:
>
>> Hi,
>>
>> I was wonderring to refactor Kafka core code to be able to use different
>> Coordination Service than Zookeeper. I know I will need to create a KIP
>> for
>> that.
>> I think the first part of this task to refactor the classes which are
>> using
>> the ZkUtil methods to use a zookeeper independent trait instead.
>> After that I think it will be possible to replace Zookeeper with
>> etcd/Consul or even with a Raft implementation.
>> Even without additional implementation it would help to test the code
>> without starting an embedded zookeeper.
>> I have already started to implement a POC and it seems doable, even if
>> it's
>> not a small patch.
>>
>> Balint
>>
>>
> --
> Alexander Binzberger
> System Designer - WINGcon AG
> Tel. +49 7543 966-119
>
> Sitz der Gesellschaft: Langenargen
> Registergericht: ULM, HRB 734260
> USt-Id.: DE232931635, WEEE-Id.: DE74015979
> Vorstand: thomasThomas Ehrle (Vorsitz), Fritz R. Paul (Stellvertreter),
> Tobias Treß
> Aufsichtsrat: Jürgen Maucher (Vorsitz), Andreas Paul (Stellvertreter),
> Martin Sauter
>
>


[jira] [Commented] (KAFKA-4846) Use KafkaProducer without allocating a new ProducerRecord for each message sent

2017-03-06 Thread Matt Sicker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897564#comment-15897564
 ] 

Matt Sicker commented on KAFKA-4846:


I know I'd love this feature personally for distributed trace logging a la 
Zipkin. I don't wish to contribute overhead to instrumented services, and 
increasing memory pressure is one of those ways to get some pushback from 
certain mechanical sympathy type people.

We spent a large epic in Log4j 2.6 making the main execution paths 
garbage-free, and since then, we've added GC-free support to more and more 
components. Of course, it doesn't make sense to add this sort of feature to 
every library, but seeing as how I find that the KafkaAppender has the best 
potential for networked logging other than the FlumeAppender (I'm not sure how 
performance compares here), it'd be great to support this use case in Log4j as 
such.

I did find an [alternative Kafka 
client|https://github.com/blackberry/Krackle/], but it appears to be limited to 
the 0.8 message format as well as not entirely what we're looking for.

> Use KafkaProducer without allocating a new ProducerRecord for each message 
> sent
> ---
>
> Key: KAFKA-4846
> URL: https://issues.apache.org/jira/browse/KAFKA-4846
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 0.10.2.0
>Reporter: Mikael Ståldal
>
> The KafkaProducer API requires you to allocate a new ProducerRecord for each 
> record sent. This is unfortunate since some application wants to reduce 
> object allocations to minimize GC work. This would be useful for Log4j to  
> allow [garbage free 
> logging|https://logging.apache.org/log4j/2.x/manual/garbagefree.html] in its 
> [Kafka 
> appender|https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender].
> This could be solved by adding a new method with unrolled arguments to 
> KafkaProducer, like this:
> {code}
>  Future send(String topic, Integer partition, Long timestamp, 
> K key, V value);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4850) RocksDb cannot use Bloom Filters

2017-03-06 Thread Eno Thereska (JIRA)
Eno Thereska created KAFKA-4850:
---

 Summary: RocksDb cannot use Bloom Filters
 Key: KAFKA-4850
 URL: https://issues.apache.org/jira/browse/KAFKA-4850
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Eno Thereska
Priority: Minor
 Fix For: 0.11.0.0


Bloom Filters would speed up RocksDb lookups. However they currently do not 
work in RocksDb 5.0.2. This has been fixed in trunk, but we'll have to wait 
until that is released and tested. 

Then we can add the line in RocksDbStore.java in openDb:
tableConfig.setFilter(new BloomFilter(10));



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4849) Bug in KafkaStreams documentation

2017-03-06 Thread Seweryn Habdank-Wojewodzki (JIRA)
Seweryn Habdank-Wojewodzki created KAFKA-4849:
-

 Summary: Bug in KafkaStreams documentation
 Key: KAFKA-4849
 URL: https://issues.apache.org/jira/browse/KAFKA-4849
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Seweryn Habdank-Wojewodzki
Priority: Minor


At the page: https://kafka.apache.org/documentation/streams
 
In the chapter titled Application Configuration and Execution, in the example 
there is a line:
 
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
 
but ZOOKEEPER_CONNECT_CONFIG is deprecated in the Kafka version 0.10.2.0.
 
Also the table on the page: 
https://kafka.apache.org/0102/documentation/#streamsconfigs is a bit misleading.
1. Again zookeeper.connect is deprecated.
2. The client.id and zookeeper.connect are marked by high importance, 
but according to http://docs.confluent.io/3.2.0/streams/developer-guide.html 
none of them are important to initialize the stream.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4848) Stream thread getting into deadlock state while trying to get rocksdb lock in retryWithBackoff

2017-03-06 Thread Sachin Mittal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897463#comment-15897463
 ] 

Sachin Mittal commented on KAFKA-4848:
--

Please refer https://github.com/apache/kafka/pull/2642 as potential fix for the 
issue.

> Stream thread getting into deadlock state while trying to get rocksdb lock in 
> retryWithBackoff
> --
>
> Key: KAFKA-4848
> URL: https://issues.apache.org/jira/browse/KAFKA-4848
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Sachin Mittal
> Attachments: thr-1
>
>
> We see a deadlock state when streams thread to process a task takes longer 
> than MAX_POLL_INTERVAL_MS_CONFIG time. In this case this threads partitions 
> are assigned to some other thread including rocksdb lock. When it tries to 
> process the next task it cannot get rocks db lock and simply keeps waiting 
> for that lock forever.
> in retryWithBackoff for AbstractTaskCreator we have a backoffTimeMs = 50L.
> If it does not get lock the we simply increase the time by 10x and keep 
> trying inside the while true loop.
> We need to have a upper bound for this backoffTimeM. If the time is greater 
> than  MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the lock means this 
> thread's partitions are moved somewhere else and it may not get the lock 
> again.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4848) Stream thread getting into deadlock state while trying to get rocksdb lock in retryWithBackoff

2017-03-06 Thread Sachin Mittal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897462#comment-15897462
 ] 

Sachin Mittal commented on KAFKA-4848:
--

The deadlock issue is like this.
If a thread has two partitions and while processing first partition it takes 
more than MAX_POLL_INTERVAL_MS_CONFIG time, then this thread is evicted from 
the group and both partitions are now migrated to some other thread. Now when 
it tries to process the second partition it tries to get the lock to rocks db. 
It won't get the lock since that partition is now moved to some other thread. 
So it keeps increasing the backoffTimeMs and keeps trying to get the lock 
forever. This reaching a deadlock.
To fix this we need some upper bound of the time limit till it tries to get 
that lock. And that upper bound has to be MAX_POLL_INTERVAL_MS_CONFIG, because 
if by that time it has not got the lock, we can see that this thread was 
evicted from the group and need to rejoin again to get new partitions.

See in attached file:
DEBUG 2017-03-01 18:17:42,465 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[StreamThread-1] creating new task 0_4

DEBUG 2017-03-01 18:24:19,202 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[StreamThread-1] creating new task 0_8

Note from above 2 lines it took more than 5 minutes to process task 0_4. As a 
result partitions moved to a different thread.

Next see following entries for 0_8

WARN 2017-03-01 18:24:20,205 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:24:21,257 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:24:22,360 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:24:23,563 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:24:24,966 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:24:26,768 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:24:29,371 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:24:34,435 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:24:41,837 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:24:55,640 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:25:22,242 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:26:14,445 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:27:57,848 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:31:23,689 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:38:14,294 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 18:51:54,497 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 19:19:13,900 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 20:13:53,014 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-01 22:03:07,629 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-02 01:41:35,831 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.
WARN 2017-03-02 08:58:31,234 [StreamThread-1]: 
org.apache.kafka.streams.processor.internals.StreamThread - Could not create 
task 0_8. Will retry.

>From 2017-03-01 18:24:20,205 to 017-03-02 08:58:31,234 it kept trying to get 
>the lock, hence deadlock.


> Stream thread getting into deadlock state while trying to get rocksdb lock in 
> retryWithBackoff
> 

[jira] [Created] (KAFKA-4848) Stream thread getting into deadlock state while trying to get rocksdb lock in retryWithBackoff

2017-03-06 Thread Sachin Mittal (JIRA)
Sachin Mittal created KAFKA-4848:


 Summary: Stream thread getting into deadlock state while trying to 
get rocksdb lock in retryWithBackoff
 Key: KAFKA-4848
 URL: https://issues.apache.org/jira/browse/KAFKA-4848
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Sachin Mittal
 Attachments: thr-1

We see a deadlock state when streams thread to process a task takes longer than 
MAX_POLL_INTERVAL_MS_CONFIG time. In this case this threads partitions are 
assigned to some other thread including rocksdb lock. When it tries to process 
the next task it cannot get rocks db lock and simply keeps waiting for that 
lock forever.

in retryWithBackoff for AbstractTaskCreator we have a backoffTimeMs = 50L.
If it does not get lock the we simply increase the time by 10x and keep trying 
inside the while true loop.

We need to have a upper bound for this backoffTimeM. If the time is greater 
than  MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the lock means this 
thread's partitions are moved somewhere else and it may not get the lock again.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Build failed in Jenkins: kafka-trunk-jdk8 #1330

2017-03-06 Thread Apache Jenkins Server
See 


Changes:

[ismael] KAFKA-4266; ReassignPartitionsClusterTest: ensure ZK publication is

--
[...truncated 100.80 KB...]

kafka.api.PlaintextConsumerTest > testEarliestOrLatestOffsets STARTED

kafka.api.PlaintextConsumerTest > testEarliestOrLatestOffsets PASSED

kafka.api.PlaintextConsumerTest > testPartitionsForAutoCreate STARTED

kafka.api.PlaintextConsumerTest > testPartitionsForAutoCreate PASSED

kafka.api.PlaintextConsumerTest > testShrinkingTopicSubscriptions STARTED

kafka.api.PlaintextConsumerTest > testShrinkingTopicSubscriptions PASSED

kafka.api.PlaintextConsumerTest > testMaxPollIntervalMs STARTED

kafka.api.PlaintextConsumerTest > testMaxPollIntervalMs PASSED

kafka.api.PlaintextConsumerTest > testOffsetsForTimes STARTED

kafka.api.PlaintextConsumerTest > testOffsetsForTimes PASSED

kafka.api.PlaintextConsumerTest > testSubsequentPatternSubscription STARTED

kafka.api.PlaintextConsumerTest > testSubsequentPatternSubscription PASSED

kafka.api.PlaintextConsumerTest > testAsyncCommit STARTED

kafka.api.PlaintextConsumerTest > testAsyncCommit PASSED

kafka.api.PlaintextConsumerTest > testLowMaxFetchSizeForRequestAndPartition 
STARTED

kafka.api.PlaintextConsumerTest > testLowMaxFetchSizeForRequestAndPartition 
PASSED

kafka.api.PlaintextConsumerTest > testMultiConsumerSessionTimeoutOnStopPolling 
STARTED

kafka.api.PlaintextConsumerTest > testMultiConsumerSessionTimeoutOnStopPolling 
PASSED

kafka.api.PlaintextConsumerTest > testMaxPollIntervalMsDelayInRevocation STARTED

kafka.api.PlaintextConsumerTest > testMaxPollIntervalMsDelayInRevocation PASSED

kafka.api.PlaintextConsumerTest > testPerPartitionLagMetricsCleanUpWithAssign 
STARTED

kafka.api.PlaintextConsumerTest > testPerPartitionLagMetricsCleanUpWithAssign 
PASSED

kafka.api.PlaintextConsumerTest > testPartitionsForInvalidTopic STARTED

kafka.api.PlaintextConsumerTest > testPartitionsForInvalidTopic PASSED

kafka.api.PlaintextConsumerTest > testPauseStateNotPreservedByRebalance STARTED

kafka.api.PlaintextConsumerTest > testPauseStateNotPreservedByRebalance PASSED

kafka.api.PlaintextConsumerTest > 
testFetchHonoursFetchSizeIfLargeRecordNotFirst STARTED

kafka.api.PlaintextConsumerTest > 
testFetchHonoursFetchSizeIfLargeRecordNotFirst PASSED

kafka.api.PlaintextConsumerTest > testSeek STARTED

kafka.api.PlaintextConsumerTest > testSeek PASSED

kafka.api.PlaintextConsumerTest > testPositionAndCommit STARTED

kafka.api.PlaintextConsumerTest > testPositionAndCommit PASSED

kafka.api.PlaintextConsumerTest > 
testFetchRecordLargerThanMaxPartitionFetchBytes STARTED

kafka.api.PlaintextConsumerTest > 
testFetchRecordLargerThanMaxPartitionFetchBytes PASSED

kafka.api.PlaintextConsumerTest > testUnsubscribeTopic STARTED

kafka.api.PlaintextConsumerTest > testUnsubscribeTopic PASSED

kafka.api.PlaintextConsumerTest > testMultiConsumerSessionTimeoutOnClose STARTED

kafka.api.PlaintextConsumerTest > testMultiConsumerSessionTimeoutOnClose PASSED

kafka.api.PlaintextConsumerTest > testFetchRecordLargerThanFetchMaxBytes STARTED

kafka.api.PlaintextConsumerTest > testFetchRecordLargerThanFetchMaxBytes PASSED

kafka.api.PlaintextConsumerTest > testMultiConsumerDefaultAssignment STARTED

kafka.api.PlaintextConsumerTest > testMultiConsumerDefaultAssignment PASSED

kafka.api.PlaintextConsumerTest > testAutoCommitOnClose STARTED

kafka.api.PlaintextConsumerTest > testAutoCommitOnClose PASSED

kafka.api.PlaintextConsumerTest > testListTopics STARTED

kafka.api.PlaintextConsumerTest > testListTopics PASSED

kafka.api.PlaintextConsumerTest > testExpandingTopicSubscriptions STARTED

kafka.api.PlaintextConsumerTest > testExpandingTopicSubscriptions PASSED

kafka.api.PlaintextConsumerTest > testInterceptors STARTED

kafka.api.PlaintextConsumerTest > testInterceptors PASSED

kafka.api.PlaintextConsumerTest > testPatternUnsubscription STARTED

kafka.api.PlaintextConsumerTest > testPatternUnsubscription PASSED

kafka.api.PlaintextConsumerTest > testGroupConsumption STARTED

kafka.api.PlaintextConsumerTest > testGroupConsumption PASSED

kafka.api.PlaintextConsumerTest > testPartitionsFor STARTED

kafka.api.PlaintextConsumerTest > testPartitionsFor PASSED

kafka.api.PlaintextConsumerTest > testAutoCommitOnRebalance STARTED

kafka.api.PlaintextConsumerTest > testAutoCommitOnRebalance PASSED

kafka.api.PlaintextConsumerTest > testInterceptorsWithWrongKeyValue STARTED

kafka.api.PlaintextConsumerTest > testInterceptorsWithWrongKeyValue PASSED

kafka.api.PlaintextConsumerTest > testMaxPollIntervalMsDelayInAssignment STARTED

kafka.api.PlaintextConsumerTest > testMaxPollIntervalMsDelayInAssignment PASSED

kafka.api.PlaintextConsumerTest > testMultiConsumerRoundRobinAssignment STARTED

kafka.api.PlaintextConsumerTest > testMultiConsumerRoundRobinAssignment PASSED

kafka.api.PlaintextConsumerTest > testPartitionPauseAndResume STARTED


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897413#comment-15897413
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Thank you for responding.
Just now I had a thought about the semantics of event time.
It is already possible to provide a TimestampExtractor that determines what the 
event time is, given a message.
It's not far fetched to assume user should also want a way to specify what the 
event time is, given the absence of messages (on one or more input partitions).
Possibly by providing an implementation other than what 
PartitionGroup.timestamp() is doing based on the timestamps of its partitions.

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897381#comment-15897381
 ] 

Eno Thereska commented on KAFKA-3514:
-

[~mihbor] I understand now. I had thought the effect was limited, but what you 
say makes sense. Thanks.

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-81: Max in-flight fetches

2017-03-06 Thread Mickael Maison
Following the feedback from the Voting thread I've updated the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer

Instead of trying to identify Coordinator's messages by their size,
now the proposal is to mark the Node/Channel used to communicate with
the Coordinator with a flag. Messages received from channels with the
flag set will be allocated outside of the MemoryPool. That way they
won't risk being delayed and starve the Coordinator.



On Thu, Jan 19, 2017 at 9:16 PM, Jason Gustafson  wrote:
> Thanks for the updates, Mickael. The proposal looks good to me.
>
> -Jason
>
> On Wed, Jan 18, 2017 at 10:19 AM, Mickael Maison 
> wrote:
>
>> I've updated the KIP to mention this.
>> I've currently set the size limit to 1Kb. It's large enough so
>> group/heartbeat messages are smaller than it and also small enough so
>> the consumer memory usage stays under control.
>>
>> If there are no more comments, I'll restart the vote
>>
>> On Wed, Jan 18, 2017 at 2:28 AM, radai  wrote:
>> > i have (hopefully?) addressed Rajini's concern of muting all connections
>> > ahead of time on the KIP-72 PR.
>> > as for avoiding the pool for small allocations i think thats a great
>> idea.
>> > I also think you could implement it as a composite pool :-)
>> > (composite redirects all requests under size X to the NONE pool and
>> above X
>> > to some "real" pool)
>> >
>> > On Wed, Jan 11, 2017 at 8:05 AM, Mickael Maison <
>> mickael.mai...@gmail.com>
>> > wrote:
>> >
>> >> Ok thanks for the clarification.
>> >> I agree too, I don't want a new config parameter. From the numbers we
>> >> gathered (see Edoardo's comment above), it shouldn't be too hard to
>> >> pick a meaningful value
>> >>
>> >> On Wed, Jan 11, 2017 at 3:58 PM, Rajini Sivaram <
>> rajinisiva...@gmail.com>
>> >> wrote:
>> >> > Mickael,
>> >> >
>> >> > I had based the comment on KIP-72 description where brokers were
>> muting
>> >> all
>> >> > client channels once memory pool was empty. Having reviewed the PR
>> >> today, I
>> >> > think it may be fine to delay muting and allocate small buffers
>> outside
>> >> of
>> >> > the pool. I would still not want to have a config parameter to decide
>> >> what
>> >> > "small" means, a well chosen hard limit would suffice.
>> >> >
>> >> > On Wed, Jan 11, 2017 at 3:05 PM, Mickael Maison <
>> >> mickael.mai...@gmail.com>
>> >> > wrote:
>> >> >
>> >> >> Rajini,
>> >> >>
>> >> >> Why do you think we don't want to do the same for brokers ?
>> >> >> It feels like brokers would be affected the same way and could end up
>> >> >> delaying group/hearbeat requests.
>> >> >>
>> >> >> Also given queued.max.requests it seems unlikely that small requests
>> >> >> (<<1Kb) being allocated outside of the memory pool would cause OOM
>> >> >> exceptions
>> >> >>
>> >> >>
>> >> >> On Wed, Dec 14, 2016 at 12:29 PM, Rajini Sivaram <
>> rsiva...@pivotal.io>
>> >> >> wrote:
>> >> >> > Edo,
>> >> >> >
>> >> >> > I wouldn't introduce a new config entry, especially since you don't
>> >> need
>> >> >> it
>> >> >> > after KAFKA-4137. As a temporary measure that would work for
>> >> consumers.
>> >> >> But
>> >> >> > you probably don't want to do the same for brokers - will be worth
>> >> >> checking
>> >> >> > with Radai since the implementation will be based on KIP-72. To do
>> >> this
>> >> >> > only for consumers, you will need some conditions in the common
>> >> network
>> >> >> > code while allocating and releasing buffers. A bit messy, but
>> doable.
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > On Wed, Dec 14, 2016 at 11:32 AM, Edoardo Comar > >
>> >> >> wrote:
>> >> >> >
>> >> >> >> Thanks Rajini,
>> >> >> >> Before Kafka-4137, we could avoid coordinator starvation without
>> >> making
>> >> >> a
>> >> >> >> special case for a special connection,
>> >> >> >> but rather simply, in applying the buffer.memory check only to
>> >> 'large'
>> >> >> >> responses
>> >> >> >> (e.g.  size > 1k, possibly introducing a new config entry) in
>> >> >> >>
>> >> >> >> NetworkReceive.readFromReadableChannel(ReadableByteChannel)
>> >> >> >>
>> >> >> >> Essentially this would limit reading fetch responses but allow for
>> >> other
>> >> >> >> responses to be processed.
>> >> >> >>
>> >> >> >> This is a sample of sizes for responses I collected :
>> >> >> >>
>> >> >> >> * size=108 APIKEY=3 METADATA
>> >> >> >> *  size=28 APIKEY=10 GROUP_COORDINATOR
>> >> >> >> *  size=193 APIKEY=11 JOIN_GROUP
>> >> >> >> *  size=39 APIKEY=14 SYNC_GROUP
>> >> >> >> *  size=39 APIKEY=9 OFFSET_FETCH
>> >> >> >> *  size=45 APIKEY=2 LIST_OFFSETS
>> >> >> >> *  size=88926 APIKEY=1 FETCH
>> >> >> >> *  size=45 APIKEY=1 FETCH
>> >> >> >> *  size=6 APIKEY=12 HEARTBEAT
>> >> >> >> *  size=45 APIKEY=1 FETCH
>> >> >> >> *  size=45 APIKEY=1 FETCH
>> >> >> >> *  size=45 APIKEY=1 FETCH
>> >> >> >> *  

[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897371#comment-15897371
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Hi [~enothereska],
I have to disagree. It is perfectly clear to me (from the documentation) that 
punctuate is based on event time, not system time. However, the problem is 
event time is not advanced reliably, since a single input stream that doesn't 
receive messages will cause the event time to not be advanced. In an extreme 
case of a poorly partitioned topic, I can imagine some partition may never get 
a message. That would cause a topology that has that partition as input to not 
advance event time ever, hence not fire punctuate ever, regardless of the 
presence of messages on its other input topics. In my opinion, if the purpose 
of punctuate is to perform periodic operations, then this flaw makes it unfit 
for that purpose. 

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-03-06 Thread Ismael Juma
Thanks Colin. It seems like you replied to me accidentally instead of the
list, so leaving your reply below for the benefit of others.

Regarding the disadvantage of having to hunt through the request class,
don't people have to do that anyway with the Options classes?

Aside from that, it would be great if the KIP included more detailed
javadoc for each method including information about potential exceptions.
I'm particularly interested in what a user can expect if a create topics
succeeds versus what they can expect if a timeout exception is thrown. It
would be good to consider partial failures as well.

Ismael

On Fri, Mar 3, 2017 at 9:37 PM, Colin McCabe  wrote:

> On Fri, Mar 3, 2017, at 06:41, Ismael Juma wrote:
> > Hi Colin,
> >
> > I still need to do a detailed review, but I have a couple of
> > comments/questions:
> >
> > 1. I am not sure about having the options/response classes as inner
> > classes
> > of the interface. It means that file containing the interface will be
> > huge
> > eventually. And the classes are not necessarily related either. Why not
> > use
> > a separate package for them?
>
> Yeah, I think it's reasonable to make these top-level classes and put
> them in separate files.  We can put them all in
> org.apache.kafka.clients.admin.
>
> >
> > 2. Can you elaborate on how one decides one goes in the Options class
> > versus the first parameter?
>
> I guess I think of options as things that you don't have to set.  For
> example, when deleting a topic, you must supply the topic name, but
> supplying a non-default timeout is optional.
>
> > I wonder if it would be simpler to just have a
> > single parameter. In that case it should probably be called a Request as
> > Radai suggested, but that's a separate point and we can discuss it
> > separately.
>
> Hmm.  I don't think it would be simpler for users.  It would force
> people who just want to do something simple like delete a topic or get
> the api version of a single node to go hunting through the request
> class.
>
> best,
> Colin
>
>
> >
> > Ismael
> >
> > On Thu, Mar 2, 2017 at 1:58 AM, Colin McCabe  wrote:
> >
> > > On Wed, Mar 1, 2017, at 15:52, radai wrote:
> > > > quick comment on the request objects:
> > > >
> > > > i see "abstract class NewTopic" and "class NewTopicWithReplication"
> and "
> > > > NewTopicWithReplicaAssignments"
> > > >
> > > > 1. since the result object is called CreateTopicResults should these
> be
> > > > called *Request?
> > >
> > > Hi radai,
> > >
> > > Thanks for taking a look.
> > >
> > > I think using the name "request" would be very confusing here, because
> > > we have a whole family of internal Request classes such as
> > > CreateTopicsRequest, TopicMetataRequest, etc. which are used for RPCs.
> > >
> > > > 2. this seems like a suboptimal approach to me. imagine we add a
> > > > NewTopicWithSecurity, and then we would need
> > > > NewTopicWithReplicationAndSecurity? (or any composable "traits").
> > > > this wont really scale. Wouldnt it be better to have a single (rather
> > > complicated)
> > > > CreateTopicRequest, and use a builder pattern to deal with the
> compexity
> > > > and options? like so:
> > > >
> > > > CreateTopicRequest req =
> > > > AdminRequests.newTopic("bob").replicationFactor(2).
> > > withPartitionAssignment(1,
> > > > "boker7", "broker10").withOption(...).build();
> > > >
> > > > the builder would validate any potentially conflicting options and
> would
> > > > allow piling on the complexity in a more manageable way (note - my
> code
> > > > above intends to demonstrate both a general replication factor and a
> > > > specific assignment for a partiocular partition of that topic, which
> may
> > > > be
> > > > too much freedom).
> > >
> > > We don't need to express every optional bell and whistle by creating a
> > > subclass.  In fact, the proposal already had setConfigs() in the base
> > > class, since it applies to every new topic creation.
> > >
> > > Thinking about it a little more, though, the subclasses don't really
> add
> > > that much value, so we should probably just have NewTopic and no
> > > subclasses.  I removed the subclasses.
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > On Wed, Mar 1, 2017 at 1:58 PM, Colin McCabe 
> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Thanks for commenting, everyone.  Does anyone have more questions
> or
> > > > > comments, or should we vote?  The latest proposal is up at
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 117%3A+Add+a+public+
> > > > > AdminClient+API+for+Kafka+admin+operations
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > >
> > > > > On Thu, Feb 16, 2017, at 15:00, Colin McCabe wrote:
> > > > > > On Thu, Feb 16, 2017, at 14:11, Dong Lin wrote:
> > > > > > > Hey Colin,
> > > > > > >
> > > > > > > Thanks for the update. I have two comments:
> > > > > > >
> > > > > > > - I actually think it is simpler and good 

[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897290#comment-15897290
 ] 

Eno Thereska commented on KAFKA-3514:
-

[~mihbor] 2) above is slightly different from the problem you are describing 
(it seems to me). I think the issue you are describing is that punctuate is 
based on event time, not system time.

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3514 started by Eno Thereska.
---
> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-3514:
---

Assignee: Eno Thereska

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4266) Replication Quota Tests: Ensure ZK updated before tests start

2017-03-06 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4266:
---
Fix Version/s: 0.11.0.0

> Replication Quota Tests: Ensure ZK updated before tests start
> -
>
> Key: KAFKA-4266
> URL: https://issues.apache.org/jira/browse/KAFKA-4266
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ben Stopford
>Assignee: Ben Stopford
> Fix For: 0.11.0.0
>
>
> Alter temporal test in ReassignPartitionsClusterTest so that it doesn't fail 
> if ZK is not updated before throttle starts. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4266) Replication Quota Tests: Ensure ZK updated before tests start

2017-03-06 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-4266.

Resolution: Fixed

> Replication Quota Tests: Ensure ZK updated before tests start
> -
>
> Key: KAFKA-4266
> URL: https://issues.apache.org/jira/browse/KAFKA-4266
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ben Stopford
>Assignee: Ben Stopford
> Fix For: 0.11.0.0
>
>
> Alter temporal test in ReassignPartitionsClusterTest so that it doesn't fail 
> if ZK is not updated before throttle starts. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4266) Replication Quota Tests: Ensure ZK updated before tests start

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897270#comment-15897270
 ] 

ASF GitHub Bot commented on KAFKA-4266:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1997


> Replication Quota Tests: Ensure ZK updated before tests start
> -
>
> Key: KAFKA-4266
> URL: https://issues.apache.org/jira/browse/KAFKA-4266
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ben Stopford
>Assignee: Ben Stopford
>
> Alter temporal test in ReassignPartitionsClusterTest so that it doesn't fail 
> if ZK is not updated before throttle starts. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3989) Add JMH module for Benchmarks

2017-03-06 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3989:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

> Add JMH module for Benchmarks
> -
>
> Key: KAFKA-3989
> URL: https://issues.apache.org/jira/browse/KAFKA-3989
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
> Fix For: 0.11.0.0
>
>
> JMH is a Java harness for building, running, and analyzing benchmarks written 
> in Java or JVM languages.  To run properly JMH needs to be in it's own 
> module.   This task will also investigate using the jmh -gradle pluging 
> [https://github.com/melix/jmh-gradle-plugin] which enables the use of JMH 
> from gradle.  This is related to 
> [https://issues.apache.org/jira/browse/KAFKA-3973]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


  1   2   >