[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075945#comment-16075945
 ] 

Yogesh BG edited comment on KAFKA-5545 at 7/6/17 5:16 AM:
--

Hey

setupDiscovery is a scheduled thread with logic to check whether the broker IPs 
have changed; if they have, you can see in the code that I call close(), which 
internally calls streams.close(). You can also see in the logs that the close has 
been triggered. If it were not called, how would the shutdowns be initiated?

_But from your attached logs it does seem the thread was notified to shut down 
but never exited the main loop:_

You should check why the shutdown didn't happen, and why some threads from the 
previous stream instance are still alive after close has been invoked. Is there 
any way I can shut down the stream completely without restarting the app?

BTW, restarting the application has its own problem: when I restart with the 
new broker IP, the threads hang and never come back to process the data.
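
For reference, a hypothetical sketch of the scheduled check described above; the 
method and field names ({{resolveBootstrapServers}}, {{oldBootstrapServerString}}, 
{{buildTopology}}) are illustrative, not the attached code:

{code}
// Hypothetical reconstruction of the setupDiscovery logic described above;
// names are illustrative only.
scheduler.scheduleAtFixedRate(() -> {
    String current = resolveBootstrapServers();       // re-resolve the broker IPs
    if (!current.equals(oldBootstrapServerString)) {  // broker came back with a new IP
        close();                                      // internally calls streams.close()
        streams = new KafkaStreams(buildTopology(config), config);
        streams.cleanUp();
        start();                                      // internally calls streams.start()
        oldBootstrapServerString = current;
    }
}, 5, 5, TimeUnit.MINUTES);
{code}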



was (Author: yogeshbelur):
Hey

setupDiscovery is a scheduled thread with logic to check whether the broker IPs 
have changed; if they have, you can see in the code that I call close(), which 
internally calls streams.close(). You can also see in the logs that the close has 
been triggered. If it were not called, how would the shutdowns be initiated?

_But from your attached logs it does seem the thread was notified to shut down 
but never exited the main loop:_

You should check why the shutdown didn't happen, and why some threads from the 
previous stream instance are still alive after close has been invoked. Is there 
any way I can shut down the stream completely without restarting the app?

BTW, restarting the application has its own problem: when I restart with the 
new broker IP, the threads hang and never come back to process the data.


> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi
> I have one Kafka broker and one Kafka Streams application.
> Initially the Kafka Streams application connects and starts processing data. 
> Then I restart the broker. When the broker restarts, a new IP will be assigned.
> In the Kafka Streams application I have a 5-minute interval thread which checks 
> whether the broker IP has changed and, if so, we clean up the stream, rebuild 
> the topology (tried with reusing the topology) and start the stream again. I 
> end up with the following exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075945#comment-16075945
 ] 

Yogesh BG commented on KAFKA-5545:
--

Hey

setupDiscovery is a scheduled thread with logic to check whether the broker IPs 
have changed; if they have, you can see in the code that I call close(), which 
internally calls streams.close(). You can also see in the logs that the close has 
been triggered. If it were not called, how would the shutdowns be initiated?

_But from your attached logs it does seem the thread was notified to shut down 
but never exited the main loop:_

You should check why the shutdown didn't happen, and why some threads from the 
previous stream instance are still alive after close has been invoked. Is there 
any way I can shut down the stream completely without restarting the app?

BTW, restarting the application has its own problem: when I restart with the 
new broker IP, the threads hang and never come back to process the data.


> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi
> I have one Kafka broker and one Kafka Streams application.
> Initially the Kafka Streams application connects and starts processing data. 
> Then I restart the broker. When the broker restarts, a new IP will be assigned.
> In the Kafka Streams application I have a 5-minute interval thread which checks 
> whether the broker IP has changed and, if so, we clean up the stream, rebuild 
> the topology (tried with reusing the topology) and start the stream again. I 
> end up with the following exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Updated] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL

2017-07-05 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4601:
-
Description: 
Consider the following DSL:

{code}
KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "topic1");
KStream<String, String> mapped = source.map(..);

KTable<String, Long> counts = mapped
    .groupByKey()
    .count("Counts");

KStream<String, Long> sink = mapped.leftJoin(counts, ..);
{code}

The resulting topology looks like this:

{code}
ProcessorTopology:
    KSTREAM-SOURCE-00:
        topics:   [topic1]
        children: [KSTREAM-MAP-01]
    KSTREAM-MAP-01:
        children: [KSTREAM-FILTER-04, KSTREAM-FILTER-07]
    KSTREAM-FILTER-04:
        children: [KSTREAM-SINK-03]
    KSTREAM-SINK-03:
        topic:    X-Counts-repartition
    KSTREAM-FILTER-07:
        children: [KSTREAM-SINK-06]
    KSTREAM-SINK-06:
        topic:    X-KSTREAM-MAP-01-repartition

ProcessorTopology:
    KSTREAM-SOURCE-08:
        topics:   [X-KSTREAM-MAP-01-repartition]
        children: [KSTREAM-LEFTJOIN-09]
    KSTREAM-LEFTJOIN-09:
        states:   [Counts]
    KSTREAM-SOURCE-05:
        topics:   [X-Counts-repartition]
        children: [KSTREAM-AGGREGATE-02]
    KSTREAM-AGGREGATE-02:
        states:   [Counts]
{code}

I.e. there are two repartition topics, one for the aggregate and one for the 
join, which not only introduce unnecessary overhead but also mess up the 
processing order (users expect each record to go through the aggregation 
first and then the join operator). And in order to get the following simpler 
topology, users today need to manually add a {{through}} operator after {{map}} 
to enforce repartitioning.

{code}
KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "topic1");
KStream<String, String> repartitioned = source.map(..).through("topic2");

KTable<String, Long> counts = repartitioned
    .groupByKey()
    .count("Counts");

KStream<String, Long> sink = repartitioned.leftJoin(counts, ..);
{code}

The resulting topology will then look like this:

{code}
ProcessorTopology:
    KSTREAM-SOURCE-00:
        topics:   [topic1]
        children: [KSTREAM-MAP-01]
    KSTREAM-MAP-01:
        children: [KSTREAM-SINK-02]
    KSTREAM-SINK-02:
        topic:    topic2

ProcessorTopology:
    KSTREAM-SOURCE-03:
        topics:   [topic2]
        children: [KSTREAM-AGGREGATE-04, KSTREAM-LEFTJOIN-05]
    KSTREAM-AGGREGATE-04:
        states:   [Counts]
    KSTREAM-LEFTJOIN-05:
        states:   [Counts]
{code}

This kind of optimization should be automatic in Streams, which we can consider 
doing when we extend beyond the one-operator-at-a-time translation.

  was:
Consider the following DSL:

{code}
KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "topic1").map(..);

KTable<String, Long> counts = source
    .groupByKey()
    .count("Counts");

KStream<String, Long> sink = source.leftJoin(counts, ..);
{code}

The resulting topology looks like this:

{code}
ProcessorTopology:
    KSTREAM-SOURCE-00:
        topics:   [topic1]
        children: [KSTREAM-MAP-01]
    KSTREAM-MAP-01:
 

[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075541#comment-16075541
 ] 

Guozhang Wang edited comment on KAFKA-5545 at 7/5/17 10:34 PM:
---

[~yogeshbelur] In your code snippet it seems you never closed the instance 
before creating the new instance and calling {{cleanUp}}, or are the 
{{close()}} and {{start()}} calls for the previous instance (it is hard to tell 
how {{setupDiscovery}} is triggered)?

{code}
close();
streams = new KafkaStreams(buildTopology(config), config);
logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString + "].");
streams.cleanUp();
start();
{code}

Anyway, if {{KafkaStreams.close()}} is indeed called, then the producer will be 
closed in that function, the inner {{Sender}} thread will be terminated, and it 
will not try to connect to the broker anymore. But from your attached logs it 
does seem the thread was notified to shut down but never exited the main loop:

{code}
10:02:33.981 [pool-1-thread-1] INFO  o.apache.kafka.streams.KafkaStreams - 
stream-client [ks_0_inst] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.987 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed 
thread to shut down
10:02:33.987 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.988 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-2] Informed 
thread to shut down
10:02:33.988 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-2] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.988 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-3] Informed 
thread to shut down
10:02:33.988 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-3] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.989 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-4] Informed 
thread to shut down
10:02:33.989 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-4] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.989 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-5] Informed 
thread to shut down
10:02:33.989 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-5] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.990 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-6] Informed 
thread to shut down
10:02:33.990 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-6] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.990 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-7] Informed 
thread to shut down
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-7] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-8] Informed 
thread to shut down
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-8] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-9] Informed 
thread to shut down
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-9] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.992 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-10] Informed 
thread to shut down
10:02:33.992 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-10] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.992 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-11] Informed 
thread to shut down
10:02:33.992 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-11] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.995 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-12] Informed 
thread to shut down
10:02:33.995 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-12] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.995 

[jira] [Commented] (KAFKA-3331) Refactor TopicCommand to make it testable and add unit tests

2017-07-05 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075525#comment-16075525
 ] 

Gwen Shapira commented on KAFKA-3331:
-

+1, that would be awesome.
Since it is via a normal API, we can also add stuff like auth, use of ACLs, etc.

> Refactor TopicCommand to make it testable and add unit tests
> 
>
> Key: KAFKA-3331
> URL: https://issues.apache.org/jira/browse/KAFKA-3331
> Project: Kafka
>  Issue Type: Wish
>Affects Versions: 0.9.0.1
>Reporter: Ashish Singh
>Assignee: Ashish Singh
> Fix For: 0.11.1.0
>
>
> TopicCommand has become a functionality-packed, hard-to-read class. Adding 
> to or changing it with confidence requires some unit tests around it.





[jira] [Commented] (KAFKA-5525) Streams reset tool should have same console output with or without dry-run

2017-07-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075510#comment-16075510
 ] 

ASF GitHub Bot commented on KAFKA-5525:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3443


> Streams reset tool should have same console output with or without dry-run
> --
>
> Key: KAFKA-5525
> URL: https://issues.apache.org/jira/browse/KAFKA-5525
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Minor
> Fix For: 0.11.1.0
>
>
> Hi,
> I see that the Streams reset tool produces slightly different console output 
> when you execute it using "dry-run" (so without executing any real 
> action) than without it.
> With dry-run :
> {code}
> Dry run displays the actions which will be performed when running Streams 
> Reset Tool
> Following input topics offsets will be reset to beginning (for consumer group 
> streams-wordcount)
> Topic: streams-file-input
> Done.
> Deleting all internal/auto-created topics for application streams-wordcount
> Topic: streams-wordcount-Counts-repartition
> Topic: streams-wordcount-Counts-changelog
> Done.
> {code}
> without dry-run :
> {code}
> Seek-to-beginning for input topics [streams-file-input]
> Done.
> Deleting all internal/auto-created topics for application streams-wordcount
> Topic streams-wordcount-Counts-repartition is marked for deletion.
> Note: This will have no impact if delete.topic.enable is not set to true.
> Topic streams-wordcount-Counts-changelog is marked for deletion.
> Note: This will have no impact if delete.topic.enable is not set to true.
> Done.
> {code}
> I think that the dry-run version's way of showing "Seek-to-beginning for 
> input topics [streams-file-input]" could be used even for the version without 
> dry-run.
> The output should be consistent, and the only difference should be whether 
> real actions are actually executed.
> I'm working on a trivial PR for a proposal.
> Thanks,
> Paolo





[jira] [Updated] (KAFKA-5525) Streams reset tool should have same console output with or without dry-run

2017-07-05 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5525:
-
Component/s: streams

> Streams reset tool should have same console output with or without dry-run
> --
>
> Key: KAFKA-5525
> URL: https://issues.apache.org/jira/browse/KAFKA-5525
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Minor
> Fix For: 0.11.1.0
>
>
> Hi,
> I see that the Streams reset tool produces slightly different console output 
> when you execute it using "dry-run" (so without executing any real 
> action) than without it.
> With dry-run :
> {code}
> Dry run displays the actions which will be performed when running Streams 
> Reset Tool
> Following input topics offsets will be reset to beginning (for consumer group 
> streams-wordcount)
> Topic: streams-file-input
> Done.
> Deleting all internal/auto-created topics for application streams-wordcount
> Topic: streams-wordcount-Counts-repartition
> Topic: streams-wordcount-Counts-changelog
> Done.
> {code}
> without dry-run :
> {code}
> Seek-to-beginning for input topics [streams-file-input]
> Done.
> Deleting all internal/auto-created topics for application streams-wordcount
> Topic streams-wordcount-Counts-repartition is marked for deletion.
> Note: This will have no impact if delete.topic.enable is not set to true.
> Topic streams-wordcount-Counts-changelog is marked for deletion.
> Note: This will have no impact if delete.topic.enable is not set to true.
> Done.
> {code}
> I think that the dry-run version's way of showing "Seek-to-beginning for 
> input topics [streams-file-input]" could be used even for the version without 
> dry-run.
> The output should be consistent, and the only difference should be whether 
> real actions are actually executed.
> I'm working on a trivial PR for a proposal.
> Thanks,
> Paolo





[jira] [Commented] (KAFKA-5528) Error while reading topic, offset, partition info from process method

2017-07-05 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075191#comment-16075191
 ] 

Matthias J. Sax commented on KAFKA-5528:


Added FAQ: 
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhydoIgetanIllegalStateExceptionwhenaccessingrecordmetadata?
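
If I recall that FAQ entry correctly, the usual cause is a {{ProcessorSupplier}} 
that returns the same {{Processor}} instance for every task. A minimal sketch of 
the fix, assuming that is the cause here (topology names are illustrative, 
reusing the issue's {{GenericProcessor}}):

{code}
// Each call to get() must return a fresh Processor instance; sharing one
// instance across stream tasks/threads can surface an IllegalStateException
// when record metadata (topic/partition/offset) is accessed.
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source", "input-topic")
       .addProcessor("generic-processor",
                     () -> new GenericProcessor<>(serDe, decrypt, config), // new instance per task
                     "source");
{code}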

> Error while reading topic, offset, partition info from process method
> -
>
> Key: KAFKA-5528
> URL: https://issues.apache.org/jira/browse/KAFKA-5528
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access 
> {{context.topic()}} from the process method. The code is written in Scala and 
> is being launched using sbt (spring isn't involved). Here's the code sketch:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], 
> decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], 
> Array[Byte]] with LazyLogging {
>   private var hsmClient: HSMClient = _
>   override def init(processorContext: ProcessorContext): Unit = { 
> super.init(processorContext) 
> hsmClient = HSMClient(config).getOrElse(null) 
>   }
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
> val topic: String = this.context.topic() 
> val partition: Int = this.context.partition() 
> val offset: Long = this.context.offset() 
> val timestamp: Long = this.context.timestamp() 
> // business logic 
>   }
> }
> {noformat}
> The exception is thrown only for the multi-consumer case (when number of 
> partitions for a topic > 1 and parallelism > 1). 





[jira] [Resolved] (KAFKA-5528) Error while reading topic, offset, partition info from process method

2017-07-05 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5528.

Resolution: Not A Bug

> Error while reading topic, offset, partition info from process method
> -
>
> Key: KAFKA-5528
> URL: https://issues.apache.org/jira/browse/KAFKA-5528
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access 
> {{context.topic()}} from the process method. The code is written in Scala and 
> is being launched using sbt (spring isn't involved). Here's the code sketch:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], 
> decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], 
> Array[Byte]] with LazyLogging {
>   private var hsmClient: HSMClient = _
>   override def init(processorContext: ProcessorContext): Unit = { 
> super.init(processorContext) 
> hsmClient = HSMClient(config).getOrElse(null) 
>   }
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
> val topic: String = this.context.topic() 
> val partition: Int = this.context.partition() 
> val offset: Long = this.context.offset() 
> val timestamp: Long = this.context.timestamp() 
> // business logic 
>   }
> }
> {noformat}
> The exception is thrown only for the multi-consumer case (when number of 
> partitions for a topic > 1 and parallelism > 1). 





[jira] [Commented] (KAFKA-5519) Support for multiple certificates in a single keystore

2017-07-05 Thread Alla Tumarkin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075164#comment-16075164
 ] 

Alla Tumarkin commented on KAFKA-5519:
--

I wouldn't call it problematic: I just imagine there are situations where 
multiple J2EE applications may want to share a single keystore and import their 
client certificates into it, in order to decrease management overhead by not 
having to maintain multiple keystores (managing keystore passwords, for 
example).

> Support for multiple certificates in a single keystore
> --
>
> Key: KAFKA-5519
> URL: https://issues.apache.org/jira/browse/KAFKA-5519
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Affects Versions: 0.10.2.1
>Reporter: Alla Tumarkin
>  Labels: upstream-issue
>
> Background
> Currently, we need to have a keystore exclusive to the component with exactly 
> one key in it. Looking at the JSSE Reference guide, it seems like we would 
> need to introduce our own KeyManager into the SSLContext which selects a 
> configurable key alias name.
> https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509KeyManager.html 
> has methods for dealing with aliases.
> The goal here is to use a specific certificate (with proper ACLs set for this 
> client), and not just the first one that matches.
> It looks like this requires a code change to the SSLChannelBuilder
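
For illustration, a minimal sketch (not Kafka code; all names are illustrative) 
of a delegating key manager that pins a configured alias instead of taking the 
first match:

{code}
import java.net.Socket;
import java.security.Principal;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.X509ExtendedKeyManager;

// Wraps the default key manager but always presents the configured alias.
public class AliasSelectingKeyManager extends X509ExtendedKeyManager {
    private final X509ExtendedKeyManager delegate;
    private final String alias;

    public AliasSelectingKeyManager(X509ExtendedKeyManager delegate, String alias) {
        this.delegate = delegate;
        this.alias = alias;
    }

    @Override
    public String chooseClientAlias(String[] keyTypes, Principal[] issuers, Socket socket) {
        return alias; // pin the configured client certificate
    }

    @Override
    public String chooseEngineClientAlias(String[] keyTypes, Principal[] issuers, SSLEngine engine) {
        return alias;
    }

    // Server-side and lookup methods simply delegate.
    @Override
    public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) {
        return delegate.chooseServerAlias(keyType, issuers, socket);
    }

    @Override
    public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) {
        return delegate.chooseEngineServerAlias(keyType, issuers, engine);
    }

    @Override
    public String[] getClientAliases(String keyType, Principal[] issuers) {
        return delegate.getClientAliases(keyType, issuers);
    }

    @Override
    public String[] getServerAliases(String keyType, Principal[] issuers) {
        return delegate.getServerAliases(keyType, issuers);
    }

    @Override
    public X509Certificate[] getCertificateChain(String alias) {
        return delegate.getCertificateChain(alias);
    }

    @Override
    public PrivateKey getPrivateKey(String alias) {
        return delegate.getPrivateKey(alias);
    }
}
{code}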





[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075061#comment-16075061
 ] 

Yogesh BG commented on KAFKA-5545:
--

I see. But what could be the problem in closing the streams? I don't see
restarting the application as a good idea. From the log we can see some threads
still polling to connect to the old IP. We should try closing those threads, right?

One more thing: if I call close within the connection timeout, all goes well.
But if I issue close after the connection timeout, the threads are stuck.




> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi
> I have one Kafka broker and one Kafka Streams application.
> Initially the Kafka Streams application connects and starts processing data. 
> Then I restart the broker. When the broker restarts, a new IP will be assigned.
> In the Kafka Streams application I have a 5-minute interval thread which checks 
> whether the broker IP has changed and, if so, we clean up the stream, rebuild 
> the topology (tried with reusing the topology) and start the stream again. I 
> end up with the following exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  

[jira] [Commented] (KAFKA-5070) org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the state directory: /opt/rocksdb/pulse10/0_18

2017-07-05 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075056#comment-16075056
 ] 

Matthias J. Sax commented on KAFKA-5070:


I was working on https://issues.apache.org/jira/browse/KAFKA-5167 and hoping 
that it will cover this JIRA as well. Thoughts?

> org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the 
> state directory: /opt/rocksdb/pulse10/0_18
> 
>
> Key: KAFKA-5070
> URL: https://issues.apache.org/jira/browse/KAFKA-5070
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
> Environment: Linux Version
>Reporter: Dhana
>Assignee: Matthias J. Sax
> Attachments: RocksDB_LockStateDirec.7z
>
>
> Notes: we run two instances of the consumer on two different machines/nodes.
> We have 400 partitions and 200 stream threads per consumer, with 2 consumers.
> We perform an HA test (on rebalance: shutdown of one of the consumers/brokers), 
> and we see this happening
> Error:
> 2017-04-05 11:36:09.352 WARN  StreamThread:1184 StreamThread-66 - Could not 
> create task 0_115. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_115] Failed to lock 
> the state directory: /opt/rocksdb/pulse10/0_115
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)





[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075053#comment-16075053
 ] 

Damian Guy commented on KAFKA-5545:
---

As you suggested, it would seem that the first KafkaStreams instance hasn't 
closed successfully, as some threads are stuck. You should probably try calling 
{{boolean KafkaStreams.close(timeout, timeUnit)}} and check the return value. 
If the result is {{false}} then you should probably terminate the application 
and restart.
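
For illustration, a minimal sketch of that pattern (the exit/supervisor handling 
is an assumption, not part of the suggestion above):

{code}
// Bound the shutdown; close(timeout, unit) returns false if the stream
// threads did not terminate within the timeout.
boolean closed = streams.close(30, TimeUnit.SECONDS);
if (!closed) {
    // Threads are stuck; a clean in-process restart is not possible,
    // so terminate and let a supervisor restart the application.
    System.exit(1);
}
{code}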

> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi
> I have one Kafka broker and one Kafka Streams application.
> Initially the Kafka Streams application connects and starts processing data. 
> Then I restart the broker. When the broker restarts, a new IP will be assigned.
> In the Kafka Streams application I have a 5-minute interval thread which checks 
> whether the broker IP has changed and, if so, we clean up the stream, rebuild 
> the topology (tried with reusing the topology) and start the stream again. I 
> end up with the following exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  

[jira] [Commented] (KAFKA-4609) KTable/KTable join followed by groupBy and aggregate/count can result in incorrect results

2017-07-05 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16075005#comment-16075005
 ] 

Damian Guy commented on KAFKA-4609:
---

This was partially fixed by 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+state+stores+and+improved+semantics
If you use one of the join/leftJoin/outerJoin methods that take either a 
{{StateStoreSupplier}} or a {{queryableName}} as a parameter, then it works. 
However, for the basic join/leftJoin/outerJoin methods it doesn't work. In order 
to make them work properly we would need to add another parameter to these join 
methods, {{joinSerde}}, so that we can construct the state store etc.

This would require a KIP. However, as we are currently discussing DSL changes to 
remove overloads, I'd recommend we hold off until we know which direction we are 
going.

> KTable/KTable join followed by groupBy and aggregate/count can result in 
> incorrect results
> --
>
> Key: KAFKA-4609
> URL: https://issues.apache.org/jira/browse/KAFKA-4609
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>  Labels: architecture
>
> When caching is enabled, KTable/KTable joins can result in duplicate values 
> being emitted. This will occur if there were updates to the same key in both 
> tables. Each table is flushed independently, and each table will trigger the 
> join, so you get two results for the same key. 
> If we subsequently perform a groupBy and then aggregate operation we will now 
> process these duplicates resulting in incorrect aggregated values. For 
> example count will be double the value it should be.





[jira] [Assigned] (KAFKA-4249) Document how to customize GC logging options for broker

2017-07-05 Thread Evgeny Veretennikov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evgeny Veretennikov reassigned KAFKA-4249:
--

Assignee: Tom Bentley

> Document how to customize GC logging options for broker
> ---
>
> Key: KAFKA-4249
> URL: https://issues.apache.org/jira/browse/KAFKA-4249
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 0.10.0.1
>Reporter: Jim Hoagland
>Assignee: Tom Bentley
>
> We wanted to enable GC logging for the Kafka broker and saw that you can set 
> GC_LOG_ENABLED=true.  However, this didn't do what we wanted.  For example, 
> the GC log will be overwritten every time the broker gets restarted.  It 
> wasn't clear how to customize this (no documentation of it that I can find), 
> so I did some research by looking at the source code, did some testing, and 
> found that KAFKA_GC_LOG_OPTS could be set with alternate JVM options prior to 
> starting the broker.  I posted my solution to StackOverflow:
>   
> http://stackoverflow.com/questions/39854424/how-to-enable-gc-logging-for-apache-kafka-brokers-while-preventing-log-file-ove
> (feel free to critique)
> That solution is now public, but it seems like the Kafka documentation should 
> say how to do this.





[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class

2017-07-05 Thread Neil Avery (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068546#comment-16068546
 ] 

Neil Avery edited comment on KAFKA-5515 at 7/5/17 1:04 PM:
---

*Investigation:*
A look at the usage shows SimpleDateFormat (SDF) is used for *parsing* segment 
file names during initialisation, and for *formatting* during runtime. I presume 
the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items.

Commons-lang3 FastDateFormat is available in the project but not as a 
dependency of this particular module. The FDF micro-bench starts at 400ms/million 
and then gets down to 350ms (not very convincing).

Calendar usage drains performance, and there is a degree of caching inside both 
of the impls.

Looking at this in a different way, "Segments" is a time-series slicing/bucketing 
function to group/allocate/look up segments etc.

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are of equal size, i.e. not using a calendar - it gives an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment ids - 
there would be inconsistencies and likely breakage. _Note: Code can be viewed in 
the commit log at the bottom_

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc. (at the cost of segment-filename 
readability). I'm not sure what operational upgrade paths would be needed in 
order to make the change seamless.

I think we need more specifics around the problem itself - are there real-world 
stats showing that performance is really a problem? If so, switch to 
unix-time flooring-to-minute where the upgrade-path/operational costs work.

*Current WIP commits can be seen here:*
https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637

*Note:* Segment granularity currently defaults to 1 minute and as a result works 
with the SDF formatter. If finer granularity is required, i.e. 10s, 20s, 5m etc., 
then a unix-long (numeric) derivative would make it possible to roll to the 
appropriate boundary through a configured floor function.
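
For illustration, a hedged sketch of the flooring idea (a sketch under the 
assumptions above, not the WIP code; names are illustrative):

{code}
// Derive a segment id by integer division on unix time instead of
// formatting/parsing a calendar date; the bucket boundary is a plain floor.
static long segmentId(final long timestampMs, final long segmentIntervalMs) {
    return timestampMs / segmentIntervalMs;
}

// Numeric segment name: ordering is preserved and calendar cost is gone,
// at the price of filename readability.
static String segmentName(final String storeName, final long timestampMs, final long segmentIntervalMs) {
    return storeName + "-" + segmentId(timestampMs, segmentIntervalMs) * segmentIntervalMs;
}
{code}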



was (Author: neil.avery):
*Investigation:*
A look at the usage shows SimpleDateFormat (SDF) is used for *parsing* segment 
file names during initialisation, and for *formatting* during runtime. I presume 
the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items.

Commons-lang3 FastDateFormat is available in the project but not as a 
dependency of this particular module. The FDF micro-bench starts at 400ms/million 
and then gets down to 350ms (not very convincing).

Calendar usage drains performance, and there is a degree of caching inside both 
of the impls.

Looking at this in a different way, "Segments" is a time-series slicing/bucketing 
function to group/allocate/look up segments etc.

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are of equal size, i.e. not using a calendar - it gives an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment ids - 
there would be inconsistencies and likely breakage. _Note: Code can be viewed in 
the commit log at the bottom_

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc. (at the cost of segment-filename 
readability). I'm not sure what operational upgrade paths would be needed in 
order to make the change seamless.

I think we need more specifics around the problem itself - are there real-world 
stats showing that performance is really a problem? If so, switch to 
unix-time flooring-to-minute where the upgrade-path/operational costs work.

Current WIP commits can be seen here:
https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637

Note: Segment granularity currently defaults to 1 minute and as a result works 
with the SDF formatter. If finer granularity is required, i.e. 10s, 20s, 5m etc., 
then a unix-long (numeric) derivative would make it possible to roll to the 
appropriate boundary through a configured floor function.


> Consider removing date formatting from Segments class
> -
>
> Key: KAFKA-5515
> URL: 

[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class

2017-07-05 Thread Neil Avery (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068546#comment-16068546
 ] 

Neil Avery edited comment on KAFKA-5515 at 7/5/17 1:04 PM:
---

*Investigation:*
A look at the usage shows SimpleDateFormat (SDF) is used for *parsing* segment 
file names during initialisation, and for *formatting* during runtime. I presume 
the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items.

Commons-lang3 FastDateFormat is available in the project but not as a 
dependency of this particular module. The FDF micro-bench starts at 400ms/million 
and then gets down to 350ms (not very convincing).

Calendar usage drains performance, and there is a degree of caching inside both 
of the impls.

Looking at this in a different way, "Segments" is a time-series slicing/bucketing 
function to group/allocate/look up segments etc.

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are of equal size, i.e. not using a calendar - it gives an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment ids - 
there would be inconsistencies and likely breakage. _Note: Code can be viewed in 
the commit log at the bottom_

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc. (at the cost of segment-filename 
readability). I'm not sure what operational upgrade paths would be needed in 
order to make the change seamless.

I think we need more specifics around the problem itself - are there real-world 
stats showing that performance is really a problem? If so, switch to 
unix-time flooring-to-minute where the upgrade-path/operational costs work.

Current WIP commits can be seen here:
https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637

Note: Segment granularity currently defaults to 1 minute and as a result works 
with the SDF formatter. If finer granularity is required, i.e. 10s, 20s, 5m etc., 
then a unix-long (numeric) derivative would make it possible to roll to the 
appropriate boundary through a configured floor function.



was (Author: neil.avery):
*Investigation:*
A look at the usage shows SimpleDateFormat (SDF) is used for *parsing* segment 
file names during initialisation, and for *formatting* during runtime. I presume 
the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items.

Commons-lang3 FastDateFormat is available in the project but not as a 
dependency of this particular module. The FDF micro-bench starts at 400ms/million 
and then gets down to 350ms (not very convincing).

Calendar usage drains performance, and there is a degree of caching inside both 
of the impls.

Looking at this in a different way, "Segments" is a time-series slicing/bucketing 
function to group/allocate/look up segments etc.

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are of equal size, i.e. not using a calendar - it gives an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment ids - 
there would be inconsistencies and likely breakage. _Note: Code can be viewed in 
the commit log at the bottom_

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc. (at the cost of segment-filename 
readability). I'm not sure what operational upgrade paths would be needed in 
order to make the change seamless.

I think we need more specifics around the problem itself - are there real-world 
stats showing that performance is really a problem? If so, switch to 
unix-time flooring-to-minute where the upgrade-path/operational costs work.

Current WIP commits can be seen here:
https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637




> Consider removing date formatting from Segments class
> -
>
> Key: KAFKA-5515
> URL: https://issues.apache.org/jira/browse/KAFKA-5515
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Neil Avery
>  Labels: performance
>
> Currently the {{Segments}} class uses a date when calculating the segment id 

[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId

2017-07-05 Thread Evgeny Veretennikov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16074714#comment-16074714
 ] 

Evgeny Veretennikov commented on KAFKA-5503:


[~hachikuji], part of the call stack for the blocking idempotent producer is:

{noformat}
Selector.poll(long): L321
NetworkClient.poll(long, long): L433
NetworkClientUtils.sendAndReceive(KafkaClient, ClientRequest, Time): L89
Sender.sendAndAwaitInitProducerIdRequest(): L405
Sender.maybeWaitForProducerId(): L419
Sender.run(long): L204
{noformat}

So the selector blocks the thread. When we invoke the {{initiateClose()}} method, 
part of the call stack is:

{noformat}
Selector.wakeup(): L240
NetworkClient.wakeup(): L498
Sender.wakeup(): L675
Sender.initiateClose(): L390
{noformat}

So it seems that the blocking selector will actually be woken up, and thus the 
sender thread won't stay blocked after a concurrent {{initiateClose()}} call.

So, is this issue still relevant? Am I missing something?

> Idempotent producer ignores shutdown while fetching ProducerId
> --
>
> Key: KAFKA-5503
> URL: https://issues.apache.org/jira/browse/KAFKA-5503
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Affects Versions: 0.11.0.0
>Reporter: Jason Gustafson
>Assignee: Evgeny Veretennikov
> Fix For: 0.11.0.1
>
>
> When using the idempotent producer, we initially block the sender thread 
> while we attempt to get the ProducerId. During this time, a concurrent call 
> to close() will be ignored.





[jira] [Assigned] (KAFKA-5255) Auto generate request/response classes

2017-07-05 Thread Tom Bentley (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley reassigned KAFKA-5255:
--

Assignee: Tom Bentley

> Auto generate request/response classes
> --
>
> Key: KAFKA-5255
> URL: https://issues.apache.org/jira/browse/KAFKA-5255
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Ismael Juma
>Assignee: Tom Bentley
> Fix For: 0.11.1.0
>
>
> We should automatically generate the request/response classes from the 
> protocol definition. This is a major source of boilerplate, development 
> effort and inconsistency at the moment. If we auto-generate the classes, we 
> may also be able to avoid the intermediate `Struct` representation.





[jira] [Commented] (KAFKA-5556) KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed

2017-07-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16074573#comment-16074573
 ] 

ASF GitHub Bot commented on KAFKA-5556:
---

GitHub user umesh9794 opened a pull request:

https://github.com/apache/kafka/pull/3489

KAFKA-5556 : KafkaConsumer.commitSync throws IllegalStateException: A…

This PR makes the `commitOffsetsSync` method check whether the future has 
completed after the client's poll. 
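
For reference, a minimal, self-contained sketch of the guard (CompletableFuture 
stands in for the internal RequestFuture; none of this is the actual 
ConsumerCoordinator code): only inspect the future's failure state once 
{{isDone()}} is true, retrying while time remains otherwise.

{code}
import java.util.concurrent.CompletableFuture;

public class CommitSyncSketch {

    static boolean commitOffsetsSync(CompletableFuture<Void> future,
                                     long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            // In the real code, client.poll(future, remaining) runs here
            // and may return before the future completes (e.g. on wakeup).
            if (!future.isDone()) {
                continue; // retry: do not touch the future's exception yet
            }
            if (future.isCompletedExceptionally()) {
                future.join(); // rethrows the actual failure
            }
            return true; // commit succeeded
        }
        return false; // timed out
    }

    public static void main(String[] args) {
        CompletableFuture<Void> commit = new CompletableFuture<>();
        new Thread(() -> commit.complete(null)).start();
        System.out.println("committed: " + commitOffsetsSync(commit, 1000));
    }
}
{code}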

Tests: all existing tests pass, in particular 
`testCommitOffsetSyncCallbackWithNonRetriableException`. Not sure whether we 
need to add any dedicated tests for this minor change. 

Awaiting your review comments. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/umesh9794/kafka KAFKA-5556

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3489.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3489


commit 1d44ca726026aea7bb030d5eecb5d4a197b5b0b9
Author: umesh chaudhary 
Date:   2017-07-05T10:45:59Z

KAFKA-5556 : KafkaConsumer.commitSync throws IllegalStateException: Attempt 
to retrieve exception from future which hasn't failed




> KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve 
> exception from future which hasn't failed
> 
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
>Assignee: Umesh Chaudhary
>Priority: Critical
> Fix For: 0.10.2.2, 0.11.0.1
>
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}





[jira] [Resolved] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN

2017-07-05 Thread zhu fangbo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhu fangbo resolved KAFKA-5558.
---
Resolution: Fixed

This is not a bug.

> can not connect to the unsecure port after config SASL/PLAIN
> 
>
> Key: KAFKA-5558
> URL: https://issues.apache.org/jira/browse/KAFKA-5558
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: zhu fangbo
>
> Dear All, 
> I followed "Modifying SASL mechanism in a Running Cluster" to set up a 
> cluster with one broker using SASL/PLAIN to authorize clients. Here are the 
> configurations:
> server config
> server.properties:
> listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
> authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
> super.users=User:admin
> kafka_server_jaas.conf:
> KafkaServer {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="admin"
> user_admin="admin"
> user_alice="alice";
> };
> My producer, configured with security.protocol=SASL_PLAINTEXT and a correct 
> jaas.conf, works well when I connect to the secure port (9094), but when I 
> use the default security.protocol and connect to the unsecure port (9093), 
> the producer cannot fetch metadata:
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node -1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 0 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node 1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 2 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> Why can the unsecure port not be connected to after configuring SASL authorization?





[jira] [Commented] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN

2017-07-05 Thread zhu fangbo (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16074567#comment-16074567
 ] 

zhu fangbo commented on KAFKA-5558:
---

Rajini Sivaram,
 Thanks for your advice, it really helps. I think I should read the code of 
KafkaServer carefully.

> can not connect to the unsecure port after config SASL/PLAIN
> 
>
> Key: KAFKA-5558
> URL: https://issues.apache.org/jira/browse/KAFKA-5558
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: zhu fangbo
>
> Dear All, 
> I followed "Modifying SASL mechanism in a Running Cluster" to set up a 
> cluster with one broker using SASL/PLAIN to authorize clients. Here are the 
> configurations:
> server config
> server.properties:
> listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
> authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
> super.users=User:admin
> kafka_server_jaas.conf:
> KafkaServer {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="admin"
> user_admin="admin"
> user_alice="alice";
> };
> My producer, configured with security.protocol=SASL_PLAINTEXT and a correct 
> jaas.conf, works well when I connect to the secure port (9094), but when I 
> use the default security.protocol and connect to the unsecure port (9093), 
> the producer cannot fetch metadata:
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node -1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 0 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node 1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 2 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> Why can the unsecure port not be connected to after configuring SASL authorization?





[jira] [Commented] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN

2017-07-05 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16074537#comment-16074537
 ] 

Rajini Sivaram commented on KAFKA-5558:
---

The error indicates that the PLAINTEXT producer has connected successfully to a 
broker, so this is likely to be an authorization issue. Have you granted the 
PLAINTEXT user access to the topic? The user name is ANONYMOUS.
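
For example, something along these lines should grant it (an illustrative 
kafka-acls invocation; adjust the authorizer properties and topic to your 
cluster):

{noformat}
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:ANONYMOUS \
  --producer --topic test
{noformat}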

> can not connect to the unsecure port after config SASL/PLAIN
> 
>
> Key: KAFKA-5558
> URL: https://issues.apache.org/jira/browse/KAFKA-5558
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: zhu fangbo
>
> Dear All, 
> I followed "Modifying SASL mechanism in a Running Cluster" to set up a 
> cluster with one broker using SASL/PLAIN to authorize clients. Here are the 
> configurations:
> server config
> server.properties:
> listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
> authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
> super.users=User:admin
> kafka_server_jaas.conf:
> KafkaServer {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="admin"
> user_admin="admin"
> user_alice="alice";
> };
> My producer, configured with security.protocol=SASL_PLAINTEXT and a correct 
> jaas.conf, works well when I connect to the secure port (9094), but when I 
> use the default security.protocol and connect to the unsecure port (9093), 
> the producer cannot fetch metadata:
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node -1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 0 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node 1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 2 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> Why can the unsecure port not be connected to after configuring SASL authorization?





[jira] [Updated] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN

2017-07-05 Thread zhu fangbo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhu fangbo updated KAFKA-5558:
--
Issue Type: Bug  (was: New Feature)

> can not connect to the unsecure port after config SASL/PLAIN
> 
>
> Key: KAFKA-5558
> URL: https://issues.apache.org/jira/browse/KAFKA-5558
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: zhu fangbo
>
> Dear All, 
> I followed "Modifying SASL mechanism in a Running Cluster" to set up a 
> cluster with one broker using SASL/PLAIN to authorize clients. Here are the 
> configurations:
> server config
> server.properties:
> listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
> authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
> super.users=User:admin
> kafka_server_jaas.conf:
> KafkaServer {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="admin"
> user_admin="admin"
> user_alice="alice";
> };
> My producer, configured with security.protocol=SASL_PLAINTEXT and a correct 
> jaas.conf, works well when I connect to the secure port (9094), but when I 
> use the default security.protocol and connect to the unsecure port (9093), 
> the producer cannot fetch metadata:
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node -1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 0 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node 1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 2 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> Why can the unsecure port not be connected to after configuring SASL authorization?





[jira] [Created] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN

2017-07-05 Thread zhu fangbo (JIRA)
zhu fangbo created KAFKA-5558:
-

 Summary: can not connect to the unsecure port after config 
SASL/PLAIN
 Key: KAFKA-5558
 URL: https://issues.apache.org/jira/browse/KAFKA-5558
 Project: Kafka
  Issue Type: New Feature
  Components: clients
Affects Versions: 0.10.1.1
Reporter: zhu fangbo


Dear All, 
I followed "Modifying SASL mechanism in a Running Cluster" to set up a cluster 
with one broker using SASL/PLAIN to authorize clients. Here are the 
configurations:
server config
server.properties:
listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
kafka_server_jaas.conf:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_alice="alice";
};
My producer, configured with security.protocol=SASL_PLAINTEXT and a correct 
jaas.conf, works well when I connect to the secure port (9094), but when I use 
the default security.protocol and connect to the unsecure port (9093), the 
producer cannot fetch metadata:
DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
Sending metadata request {topics=[test]} to node -1
WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] Error 
while fetching metadata with correlation id 0 : 
{test=UNKNOWN_TOPIC_OR_PARTITION}
DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
Sending metadata request {topics=[test]} to node 1
WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] Error 
while fetching metadata with correlation id 2 : 
{test=UNKNOWN_TOPIC_OR_PARTITION}
Why can the unsecure port not be connected to after configuring SASL authorization?





[jira] [Updated] (KAFKA-5556) KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed

2017-07-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5556:
---
Fix Version/s: (was: 0.10.2.1)
   0.10.2.2

> KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve 
> exception from future which hasn't failed
> 
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
>Assignee: Umesh Chaudhary
>Priority: Critical
> Fix For: 0.10.2.2, 0.11.0.1
>
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}





[jira] [Updated] (KAFKA-5556) KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed

2017-07-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5556:
---
Summary: KafkaConsumer.commitSync throws IllegalStateException: Attempt to 
retrieve exception from future which hasn't failed  (was: KafkaConsumer throws: 
java.lang.IllegalStateException: > Attempt to retrieve exception from future 
which hasn't failed)

> KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve 
> exception from future which hasn't failed
> 
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
>Priority: Critical
> Fix For: 0.10.2.2, 0.11.0.1
>
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}





[jira] [Assigned] (KAFKA-5556) KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed

2017-07-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-5556:
--

Assignee: Umesh Chaudhary

> KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve 
> exception from future which hasn't failed
> 
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
>Assignee: Umesh Chaudhary
>Priority: Critical
> Fix For: 0.10.2.2, 0.11.0.1
>
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}





[jira] [Updated] (KAFKA-5556) KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve exception from future which hasn't failed

2017-07-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5556:
---
Priority: Critical  (was: Major)

> KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve 
> exception from future which hasn't failed
> --
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
>Priority: Critical
> Fix For: 0.10.2.1, 0.11.0.1
>
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}





[jira] [Updated] (KAFKA-5556) KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve exception from future which hasn't failed

2017-07-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5556:
---
Fix Version/s: 0.10.2.1
   0.11.0.1

> KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve 
> exception from future which hasn't failed
> --
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
> Fix For: 0.10.2.1, 0.11.0.1
>
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}





[jira] [Created] (KAFKA-5557) Using a logPrefix inside the StreamPartitionAssignor

2017-07-05 Thread Paolo Patierno (JIRA)
Paolo Patierno created KAFKA-5557:
-

 Summary: Using a logPrefix inside the StreamPartitionAssignor
 Key: KAFKA-5557
 URL: https://issues.apache.org/jira/browse/KAFKA-5557
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Paolo Patierno
Assignee: Paolo Patierno
Priority: Trivial


Hi,
the "stream-thread [%s]" is replicated more times in all the logging messages 
inside the StreamPartitionAssignor. Using a logPrefix like for the StreamThread 
class could be better.
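
A minimal sketch of the proposed pattern (class and method names below are 
illustrative, not the actual StreamPartitionAssignor code): compute the prefix 
once and reuse it in every log call, as StreamThread already does.

{code}
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrefixedAssignor {

    private static final Logger log =
            LoggerFactory.getLogger(PrefixedAssignor.class);

    private String logPrefix;

    public void configure(Map<String, ?> configs) {
        // Built once; in the real class the thread name would come from
        // the configuration rather than the current thread.
        logPrefix = String.format("stream-thread [%s]",
                Thread.currentThread().getName());
    }

    public void assign() {
        // Every message reuses the cached prefix instead of re-formatting it.
        log.info("{} computing a new partition assignment", logPrefix);
    }
}
{code}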





[jira] [Commented] (KAFKA-5556) KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve exception from future which hasn't failed

2017-07-05 Thread Umesh Chaudhary (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16074389#comment-16074389
 ] 

Umesh Chaudhary commented on KAFKA-5556:


[~damianguy], I can work on this. 

> KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve 
> exception from future which hasn't failed
> --
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}


