[jira] [Updated] (KAFKA-7118) Make KafkaConsumer compatible with multiple threads

2018-07-02 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-7118:
--
Description: It was discovered that there is a performance constraint 
relating to {{KafkaConsumer}}. When several threads modify the same 
consumer, a {{ConcurrentModificationException}} can result. This issue was 
first reported against Spark; for testing details, see SPARK-23636 and 
SPARK-19185. It would be preferable if multiple threads were allowed to 
modify one {{KafkaConsumer}}.  (was: It was discovered that there is a 
performance constraint relating to {{KafkaConsumer#close()}}. When several 
threads modify the same consumer, a {{ConcurrentModificationException}} can 
result. This issue was first reported against Spark; for testing details, 
see SPARK-23636 and SPARK-19185. It would be preferable if multiple threads 
were allowed to modify one {{KafkaConsumer}}.)

> Make KafkaConsumer compatible with multiple threads
> ---
>
> Key: KAFKA-7118
> URL: https://issues.apache.org/jira/browse/KAFKA-7118
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Priority: Major
>  Labels: client, java, kip, needs-kip
>
> It was discovered that there is a performance constraint relating to 
> {{KafkaConsumer}}. When several threads modify the same consumer, a 
> {{ConcurrentModificationException}} can result. This issue was first 
> reported against Spark; for testing details, see SPARK-23636 and 
> SPARK-19185. It would be preferable if multiple threads were allowed to 
> modify one {{KafkaConsumer}}.
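
For context, {{KafkaConsumer}} is documented as not thread-safe; {{wakeup()}} 
is the only method that may be called from another thread. A minimal sketch 
of the documented single-writer pattern (the {{consumer}} and {{closed}} 
variables are illustrative):
{code}
// One thread owns the consumer; other threads only signal it.
final AtomicBoolean closed = new AtomicBoolean(false);
// polling thread
try {
    while (!closed.get()) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        // process records ...
    }
} catch (WakeupException e) {
    if (!closed.get()) throw e; // ignore only if we are shutting down
} finally {
    consumer.close();
}
// from another thread:
closed.set(true);
consumer.wakeup(); // the only KafkaConsumer method safe to call concurrently
{code}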



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6303) Potential lack of synchronization in NioEchoServer#AcceptorThread

2018-07-02 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16333251#comment-16333251
 ] 

Ted Yu edited comment on KAFKA-6303 at 7/3/18 5:38 AM:
---

+1 from me


was (Author: yuzhih...@gmail.com):
+1

> Potential lack of synchronization in NioEchoServer#AcceptorThread
> -
>
> Key: KAFKA-6303
> URL: https://issues.apache.org/jira/browse/KAFKA-6303
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Ted Yu
>Assignee: siva santhalingam
>Priority: Minor
>
> In the run() method:
> {code}
> SocketChannel socketChannel = 
> ((ServerSocketChannel) key.channel()).accept();
> socketChannel.configureBlocking(false);
> newChannels.add(socketChannel);
> {code}
> Modifications to newChannels should be protected by a synchronized block.
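
A minimal sketch of the suggested fix, assuming {{newChannels}} is a 
collection shared between the acceptor thread and its consumer (names taken 
from the snippet above):
{code}
SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
socketChannel.configureBlocking(false);
// guard the shared collection so add() and the reader never interleave
synchronized (newChannels) {
    newChannels.add(socketChannel);
}
{code}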



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6687) Allow to read a topic multiple times

2018-07-02 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530850#comment-16530850
 ] 

Richard Yu commented on KAFKA-6687:
---

Oh, in that case, I can see what you mean. Thanks for the clarifications. :) 
I couldn't really tell when the DSL was being referred to and when the 
runtime was involved.

 

> Allow to read a topic multiple times
> 
>
> Key: KAFKA-6687
> URL: https://issues.apache.org/jira/browse/KAFKA-6687
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Users often want to read a topic multiple times. However, this is not 
> possible because there is a single consumer, and thus a topic can only be 
> consumed once. Users get the exception
> {quote}Exception in thread “main” 
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic 
> source has already been registered by another source.
> {quote}
> if they use a topic name in multiple `stream()`, `table()`, or 
> `globalTable()` calls.
> However, with KAFKA-6034 in place, we could allow adding a topic multiple 
> times and rewrite the topology internally to only read the topic once. This 
> would simplify application code, as users would not need to put workarounds 
> in place to get the same behavior.
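
For illustration, a hedged sketch of DSL code that hits this restriction 
today, together with the workaround users currently write (topic names and 
types are made up):
{code}
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> s1 = builder.stream("input");
KStream<String, String> s2 = builder.stream("input"); // throws TopologyException today
// current workaround: read the topic once and reuse the resulting KStream
KStream<String, String> shared  = builder.stream("input");
KStream<String, String> nonNull = shared.filter((k, v) -> v != null);
KStream<String, String> upper   = shared.mapValues(v -> v.toUpperCase());
{code}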



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7101) Session Window store should set topic policy `compact,cleanup`

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530827#comment-16530827
 ] 

ASF GitHub Bot commented on KAFKA-7101:
---

guozhangwang closed pull request #5298: KAFKA-7101: Consider session store for 
windowed store default configs
URL: https://github.com/apache/kafka/pull/5298
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index ed51754d978..c644f9bbf38 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,8 +139,10 @@ public StateStore build() {
 }
 
 long retentionPeriod() {
-if (isWindowStore()) {
+if (builder instanceof WindowStoreBuilder) {
 return ((WindowStoreBuilder) builder).retentionPeriod();
+} else if (builder instanceof SessionStoreBuilder) {
+return ((SessionStoreBuilder) builder).retentionPeriod();
 } else {
 throw new IllegalStateException("retentionPeriod is not 
supported when not a window store");
 }
@@ -159,7 +162,7 @@ private String name() {
 }
 
 private boolean isWindowStore() {
-return builder instanceof WindowStoreBuilder;
+return builder instanceof WindowStoreBuilder || builder instanceof 
SessionStoreBuilder;
 }
 
 // Apparently Java strips the generics from this method because we're 
using the raw type for builder,
@@ -226,7 +229,7 @@ private SourceNodeFactory(final String name,
   final Deserializer keyDeserializer,
   final Deserializer valDeserializer) {
 super(name, NO_PREDECESSORS);
-this.topics = topics != null ? Arrays.asList(topics) : new 
ArrayList();
+this.topics = topics != null ? Arrays.asList(topics) : new 
ArrayList<>();
 this.pattern = pattern;
 this.keyDeserializer = keyDeserializer;
 this.valDeserializer = valDeserializer;
@@ -316,7 +319,7 @@ public ProcessorNode build() {
 final String topic = ((StaticTopicNameExtractor) 
topicExtractor).topicName;
 if (internalTopicNames.contains(topic)) {
 // prefix the internal topic name with the application id
-return new SinkNode<>(name, new 
StaticTopicNameExtractor(decorateTopic(topic)), keySerializer, 
valSerializer, partitioner);
+return new SinkNode<>(name, new 
StaticTopicNameExtractor<>(decorateTopic(topic)), keySerializer, valSerializer, 
partitioner);
 } else {
 return new SinkNode<>(name, topicExtractor, keySerializer, 
valSerializer, partitioner);
 }
@@ -415,7 +418,7 @@ public final void addSource(final Topology.AutoOffsetReset 
offsetReset,
 throw new TopologyException("Sink " + name + " must have at least 
one parent");
 }
 
-addSink(name, new StaticTopicNameExtractor(topic), 
keySerializer, valSerializer, partitioner, predecessorNames);
+addSink(name, new StaticTopicNameExtractor<>(topic), keySerializer, 
valSerializer, partitioner, predecessorNames);
 nodeToSinkTopic.put(name, topic);
 }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
index 04b0ceb1ecf..69540899ada 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
@@ -35,4 +35,11 @@
  * @return segmentInterval in milliseconds
  */
 long segmentIntervalMs();
+
+/**
+ * The time period for which the {@link SessionStore} will retain historic 
data.
+ *
+ * @return retentionPeriod
+ */
+long retentionPeriod();
 }
diff --git 

[jira] [Resolved] (KAFKA-7101) Session Window store should set topic policy `compact,cleanup`

2018-07-02 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7101.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

> Session Window store should set topic policy `compact,cleanup`
> --
>
> Key: KAFKA-7101
> URL: https://issues.apache.org/jira/browse/KAFKA-7101
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.1.0
>
>
> With 
> [KIP-71|https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist]
>  (0.10.1.0), the topic config `compact,delete` was introduced to apply to 
> windowed store changelog topics in Kafka Streams. Later (0.10.2.0), session 
> windows were added in 
> [KIP-94|https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows].
>  However, session windows do not use `compact,delete` at the moment. This 
> results in the same issue window stores faced before KIP-71. Thus, we should 
> enable `compact,delete` for session window changelog topics, too.
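
For reference, a hedged illustration of the changelog topic settings that 
KIP-71 enables for windowed stores and that session window changelogs should 
get as well (broker-side topic configs; the retention value is whatever the 
store's retention period is):
{code}
cleanup.policy=compact,delete
retention.ms=<window retention period>
{code}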



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6687) Allow to read a topic multiple times

2018-07-02 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530823#comment-16530823
 ] 

Matthias J. Sax commented on KAFKA-6687:


Btw: there is one special case: reading a topic twice at the runtime level 
might make sense if the topic is read as a KTable and a GlobalKTable at the 
same time, because in this case we use two different consumers. Atm, this 
scenario is not supported either.

> Allow to read a topic multiple times
> 
>
> Key: KAFKA-6687
> URL: https://issues.apache.org/jira/browse/KAFKA-6687
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Users often want to read a topic multiple times. However, this is not 
> possible because there is a single consumer, and thus a topic can only be 
> consumed once. Users get the exception
> {quote}Exception in thread “main” 
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic 
> source has already been registered by another source.
> {quote}
> if they use a topic name in multiple `stream()`, `table()`, or 
> `globalTable()` calls.
> However, with KAFKA-6034 in place, we could allow adding a topic multiple 
> times and rewrite the topology internally to only read the topic once. This 
> would simplify application code, as users would not need to put workarounds 
> in place to get the same behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6687) Allow to read a topic multiple times

2018-07-02 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530822#comment-16530822
 ] 

Matthias J. Sax commented on KAFKA-6687:


You have to distinguish between the DSL level and the runtime level. At the 
runtime level, it is not possible (and also does not make sense) to read a 
topic twice. Atm, this restriction is reflected in the DSL. The idea is to 
remove this restriction from the DSL and rewrite the topology internally. 
Ie, it's about syntactic sugar at the DSL level. Atm, users might need to 
write "ugly" code to get the same result.

Note that this ticket is blocked until we get the first version of the new 
topology optimizer in place. This is WIP by [~bbejeck].

Does this make sense?

> Allow to read a topic multiple times
> 
>
> Key: KAFKA-6687
> URL: https://issues.apache.org/jira/browse/KAFKA-6687
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Users often want to read a topic multiple times. However, this is not 
> possible because there is a single consumer, and thus a topic can only be 
> consumed once. Users get the exception
> {quote}Exception in thread “main” 
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic 
> source has already been registered by another source.
> {quote}
> if they use a topic name in multiple `stream()`, `table()`, or 
> `globalTable()` calls.
> However, with KAFKA-6034 in place, we could allow adding a topic multiple 
> times and rewrite the topology internally to only read the topic once. This 
> would simplify application code, as users would not need to put workarounds 
> in place to get the same behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5037) Infinite loop if all input topics are unknown at startup

2018-07-02 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530724#comment-16530724
 ] 

Ted Yu commented on KAFKA-5037:
---

Originally I threw an exception in the if block for the following line in 
{{assign}}:
{code}
if (numPartitionsCandidate == null) {
{code}
In patch v2, I construct the assignment map. See if this is close to what you 
propose.

> Infinite loop if all input topics are unknown at startup
> 
>
> Key: KAFKA-5037
> URL: https://issues.apache.org/jira/browse/KAFKA-5037
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: newbie++, user-experience
> Attachments: 5037.v2.txt
>
>
> See discussion: https://github.com/apache/kafka/pull/2815
> We will need to rewrite parts of {{StreamPartitionsAssignor}} and to add 
> many more tests for all kinds of corner cases, including pattern 
> subscriptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5037) Infinite loop if all input topics are unknown at startup

2018-07-02 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5037:
--
Attachment: (was: 5037.v1.txt)

> Infinite loop if all input topics are unknown at startup
> 
>
> Key: KAFKA-5037
> URL: https://issues.apache.org/jira/browse/KAFKA-5037
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: newbie++, user-experience
> Attachments: 5037.v2.txt
>
>
> See discussion: https://github.com/apache/kafka/pull/2815
> We will need to rewrite parts of {{StreamPartitionsAssignor}} and to add 
> many more tests for all kinds of corner cases, including pattern 
> subscriptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5037) Infinite loop if all input topics are unknown at startup

2018-07-02 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5037:
--
Attachment: 5037.v2.txt

> Infinite loop if all input topics are unknown at startup
> 
>
> Key: KAFKA-5037
> URL: https://issues.apache.org/jira/browse/KAFKA-5037
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: newbie++, user-experience
> Attachments: 5037.v2.txt
>
>
> See discussion: https://github.com/apache/kafka/pull/2815
> We will need to rewrite parts of {{StreamPartitionsAssignor}} and to add 
> many more tests for all kinds of corner cases, including pattern 
> subscriptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6687) Allow to read a topic multiple times

2018-07-02 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530668#comment-16530668
 ] 

Richard Yu commented on KAFKA-6687:
---

It appears that the summary and the description are somewhat contradictory: 
while the summary states that we should "allow a topic to be read multiple 
times", the description states that "we should allow adding a topic 
multiple times and rewrite the topology internally so that it can be read 
*only* once." Which one is it?

> Allow to read a topic multiple times
> 
>
> Key: KAFKA-6687
> URL: https://issues.apache.org/jira/browse/KAFKA-6687
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Users often want to read a topic multiple times. However, this is not 
> possible because there is a single consumer, and thus a topic can only be 
> consumed once. Users get the exception
> {quote}Exception in thread “main” 
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic 
> source has already been registered by another source.
> {quote}
> if they use a topic name in multiple `stream()`, `table()`, or 
> `globalTable()` calls.
> However, with KAFKA-6034 in place, we could allow adding a topic multiple 
> times and rewrite the topology internally to only read the topic once. This 
> would simplify application code, as users would not need to put workarounds 
> in place to get the same behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5037) Infinite loop if all input topics are unknown at startup

2018-07-02 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5037:
--
Attachment: 5037.v1.txt

> Infinite loop if all input topics are unknown at startup
> 
>
> Key: KAFKA-5037
> URL: https://issues.apache.org/jira/browse/KAFKA-5037
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: newbie++, user-experience
> Attachments: 5037.v1.txt
>
>
> See discussion: https://github.com/apache/kafka/pull/2815
> We will need to rewrite parts of {{StreamPartitionsAssignor}} and to add 
> many more tests for all kinds of corner cases, including pattern 
> subscriptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-4999) Add convenience overload for seek* methods

2018-07-02 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu reassigned KAFKA-4999:
-

Assignee: (was: Richard Yu)

> Add convenience overload for seek* methods
> --
>
> Key: KAFKA-4999
> URL: https://issues.apache.org/jira/browse/KAFKA-4999
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Timo Meijer
>Priority: Major
>  Labels: Quickfix, needs-kip
>
> The most common use case when using the seek* methods is to work on the 
> currently assigned partitions. This behavior is supported by passing an 
> empty list, but that is not very intuitive.
> Adding a parameterless overload of each seek* method with the same 
> behavior, i.e., operating on the currently assigned partitions, would 
> improve the API and the user experience.
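
A hedged sketch of the current call pattern versus the proposed convenience 
overload (the parameterless variant is hypothetical, pending a KIP):
{code}
// today: an empty collection means "all currently assigned partitions"
consumer.seekToBeginning(Collections.emptyList());
// proposed (hypothetical): same behavior, easier to discover
// consumer.seekToBeginning();
{code}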



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL

2018-07-02 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu reassigned KAFKA-6049:
-

Assignee: (was: Richard Yu)

> Kafka Streams: Add Cogroup in the DSL
> -
>
> Key: KAFKA-6049
> URL: https://issues.apache.org/jira/browse/KAFKA-6049
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: api, kip, user-experience
>
> When multiple streams aggregate together to form a single larger object 
> (e.g. a shopping website may have a cart stream, a wish list stream, and a 
> purchases stream; together they make up a Customer), it is very difficult to 
> accommodate this in the Kafka Streams DSL: it generally requires you to group 
> and aggregate all of the streams into KTables and then make multiple outer 
> join calls to end up with a KTable holding your desired object. This creates 
> a state store for each stream and a long chain of ValueJoiners that each new 
> record must go through to reach the final object.
> Creating a cogroup method backed by a single state store will:
> * Reduce the number of gets from state stores. With the multiple joins, when 
> a new value comes into any of the streams, a chain reaction happens where the 
> join processors keep calling ValueGetters until we have accessed all state 
> stores.
> * Slightly increase performance. As described above, all ValueGetters are 
> called, which also causes all ValueJoiners to be called, forcing a 
> recalculation of the current joined value of all other streams and impacting 
> performance.
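
A hedged sketch of the multi-join workaround described above; {{Cart}}, 
{{WishList}}, {{Customer}} and the input streams are hypothetical:
{code}
// each aggregate() below creates its own state store
KTable<String, Cart> carts =
    cartStream.groupByKey().aggregate(Cart::new, (id, event, cart) -> cart.update(event));
KTable<String, WishList> wishLists =
    wishStream.groupByKey().aggregate(WishList::new, (id, event, wl) -> wl.update(event));
// then a chain of outer joins, each with its own ValueJoiner
KTable<String, Customer> customers =
    carts.outerJoin(wishLists, (cart, wishList) -> new Customer(cart, wishList));
{code}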



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7128) Lagging high watermark can lead to committed data loss after ISR expansion

2018-07-02 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-7128:
--

Assignee: Jason Gustafson

> Lagging high watermark can lead to committed data loss after ISR expansion
> --
>
> Key: KAFKA-7128
> URL: https://issues.apache.org/jira/browse/KAFKA-7128
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Some model checking exposed a weakness in the ISR expansion logic. We know 
> that the high watermark can go backwards after a leader failover, but we may 
> not have known that this can lead to the loss of committed data. 
> Say we have three replicas: r1, r2, and r3. Initially, the ISR consists of 
> (r1, r2) and the leader is r1. r3 is a new replica which has not begun 
> fetching. The data up to offset 10 has been committed to the ISR. Here is the 
> initial state:
> ISR: (r1, r2)
> Leader: r1
> r1: [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=0]
> Replica 1 then initiates shutdown (or fails) and leaves the ISR, which makes 
> r2 the new leader. The high watermark is still lagging r1.
> ISR: (r2)
> Leader: r2
> r1 (offline): [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=0]
> Replica 3 then catches up to the high watermark on r2 and joins the ISR. 
> Perhaps its high watermark is lagging behind r2's, but this is unimportant.
> ISR: (r2, r3)
> Leader: r2
> r1 (offline): [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=5]
> Now r2 fails and r3 is elected leader and is the only member of the ISR. The 
> committed data from offsets 5 to 10 has been lost.
> ISR: (r3)
> Leader: r3
> r1 (offline): [hw=10, leo=10]
> r2 (offline): [hw=5, leo=10]
> r3: [hw=0, leo=5]
> The bug is the fact that we allowed r3 into the ISR after the local high 
> watermark had been reached. Since the follower does not know the true high 
> watermark for the previous leader's epoch, it should not allow a replica to 
> join the ISR until it has caught up to an offset within its own epoch. 
> Note this is related to 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7128) Lagging high watermark can lead to committed data loss after ISR expansion

2018-07-02 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu reassigned KAFKA-7128:
-

Assignee: (was: Richard Yu)

> Lagging high watermark can lead to committed data loss after ISR expansion
> --
>
> Key: KAFKA-7128
> URL: https://issues.apache.org/jira/browse/KAFKA-7128
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
>
> Some model checking exposed a weakness in the ISR expansion logic. We know 
> that the high watermark can go backwards after a leader failover, but we may 
> not have known that this can lead to the loss of committed data. 
> Say we have three replicas: r1, r2, and r3. Initially, the ISR consists of 
> (r1, r2) and the leader is r1. r3 is a new replica which has not begun 
> fetching. The data up to offset 10 has been committed to the ISR. Here is the 
> initial state:
> ISR: (r1, r2)
> Leader: r1
> r1: [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=0]
> Replica 1 then initiates shutdown (or fails) and leaves the ISR, which makes 
> r2 the new leader. The high watermark is still lagging r1.
> ISR: (r2)
> Leader: r2
> r1 (offline): [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=0]
> Replica 3 then catches up to the high watermark on r2 and joins the ISR. 
> Perhaps its high watermark is lagging behind r2's, but this is unimportant.
> ISR: (r2, r3)
> Leader: r2
> r1 (offline): [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=5]
> Now r2 fails and r3 is elected leader and is the only member of the ISR. The 
> committed data from offsets 5 to 10 has been lost.
> ISR: (r3)
> Leader: r3
> r1 (offline): [hw=10, leo=10]
> r2 (offline): [hw=5, leo=10]
> r3: [hw=0, leo=5]
> The bug is the fact that we allowed r3 into the ISR after the local high 
> watermark had been reached. Since the follower does not know the true high 
> watermark for the previous leader's epoch, it should not allow a replica to 
> join the ISR until it has caught up to an offset within its own epoch. 
> Note this is related to 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7128) Lagging high watermark can lead to committed data loss after ISR expansion

2018-07-02 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu reassigned KAFKA-7128:
-

Assignee: Richard Yu  (was: Jason Gustafson)

> Lagging high watermark can lead to committed data loss after ISR expansion
> --
>
> Key: KAFKA-7128
> URL: https://issues.apache.org/jira/browse/KAFKA-7128
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Richard Yu
>Priority: Major
>
> Some model checking exposed a weakness in the ISR expansion logic. We know 
> that the high watermark can go backwards after a leader failover, but we may 
> not have known that this can lead to the loss of committed data. 
> Say we have three replicas: r1, r2, and r3. Initially, the ISR consists of 
> (r1, r2) and the leader is r1. r3 is a new replica which has not begun 
> fetching. The data up to offset 10 has been committed to the ISR. Here is the 
> initial state:
> ISR: (r1, r2)
> Leader: r1
> r1: [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=0]
> Replica 1 then initiates shutdown (or fails) and leaves the ISR, which makes 
> r2 the new leader. The high watermark is still lagging r1.
> ISR: (r2)
> Leader: r2
> r1 (offline): [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=0]
> Replica 3 then catches up to the high watermark on r2 and joins the ISR. 
> Perhaps its high watermark is lagging behind r2's, but this is unimportant.
> ISR: (r2, r3)
> Leader: r2
> r1 (offline): [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=5]
> Now r2 fails and r3 is elected leader and is the only member of the ISR. The 
> committed data from offsets 5 to 10 has been lost.
> ISR: (r3)
> Leader: r3
> r1 (offline): [hw=10, leo=10]
> r2 (offline): [hw=5, leo=10]
> r3: [hw=0, leo=5]
> The bug is the fact that we allowed r3 into the ISR after the local high 
> watermark had been reached. Since the follower does not know the true high 
> watermark for the previous leader's epoch, it should not allow a replica to 
> join the ISR until it has caught up to an offset within its own epoch. 
> Note this is related to 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5037) Infinite loop if all input topics are unknown at startup

2018-07-02 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530617#comment-16530617
 ] 

Guozhang Wang commented on KAFKA-5037:
--

I'm adding the tag `newbie++` for anyone wanting to pick it up. Note that the 
proposed solution targets all three JIRAs: 5037, 6587, and 6437.

> Infinite loop if all input topics are unknown at startup
> 
>
> Key: KAFKA-5037
> URL: https://issues.apache.org/jira/browse/KAFKA-5037
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: newbie++, user-experience
>
> See discussion: https://github.com/apache/kafka/pull/2815
> We will need to rewrite parts of {{StreamPartitionsAssignor}} and to add 
> many more tests for all kinds of corner cases, including pattern 
> subscriptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5037) Infinite loop if all input topics are unknown at startup

2018-07-02 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5037:
-
Labels: newbie++ user-experience  (was: user-experience)

> Infinite loop if all input topics are unknown at startup
> 
>
> Key: KAFKA-5037
> URL: https://issues.apache.org/jira/browse/KAFKA-5037
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: newbie++, user-experience
>
> See discussion: https://github.com/apache/kafka/pull/2815
> We will need to rewrite parts of {{StreamPartitionsAssignor}} and to add 
> many more tests for all kinds of corner cases, including pattern 
> subscriptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7080) WindowStoreBuilder incorrectly initializes CachingWindowStore

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530555#comment-16530555
 ] 

ASF GitHub Bot commented on KAFKA-7080:
---

guozhangwang closed pull request #5257: KAFKA-7080: replace numSegments with 
segmentInterval
URL: https://github.com/apache/kafka/pull/5257
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index aa3dec17239..fc1fb9f5d13 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -66,11 +66,11 @@
 public final class SessionWindows {
 
 private final long gapMs;
-private long maintainDurationMs;
+private final long maintainDurationMs;
 
-private SessionWindows(final long gapMs) {
+private SessionWindows(final long gapMs, final long maintainDurationMs) {
 this.gapMs = gapMs;
-maintainDurationMs = Windows.DEFAULT_MAINTAIN_DURATION_MS;
+this.maintainDurationMs = maintainDurationMs;
 }
 
 /**
@@ -85,7 +85,8 @@ public static SessionWindows with(final long inactivityGapMs) 
{
 if (inactivityGapMs <= 0) {
 throw new IllegalArgumentException("Gap time (inactivityGapMs) 
cannot be zero or negative.");
 }
-return new SessionWindows(inactivityGapMs);
+final long oneDayMs = 24 * 60 * 60_000L;
+return new SessionWindows(inactivityGapMs, oneDayMs);
 }
 
 /**
@@ -99,9 +100,8 @@ public SessionWindows until(final long durationMs) throws 
IllegalArgumentExcepti
 if (durationMs < gapMs) {
 throw new IllegalArgumentException("Window retention time 
(durationMs) cannot be smaller than window gap.");
 }
-maintainDurationMs = durationMs;
 
-return this;
+return new SessionWindows(gapMs, durationMs);
 }
 
 /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 09fdfce948d..53ead1e9b73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -36,18 +36,10 @@
  */
 public abstract class Windows {
 
-private static final int DEFAULT_NUM_SEGMENTS = 3;
+private long maintainDurationMs = 24 * 60 * 60 * 1000L; // default: one day
+@Deprecated public int segments = 3;
 
-static final long DEFAULT_MAINTAIN_DURATION_MS = 24 * 60 * 60 * 1000L; // 
one day
-
-private long maintainDurationMs;
-
-public int segments;
-
-protected Windows() {
-segments = DEFAULT_NUM_SEGMENTS;
-maintainDurationMs = DEFAULT_MAINTAIN_DURATION_MS;
-}
+protected Windows() {}
 
 /**
  * Set the window maintain duration (retention time) in milliseconds.
@@ -76,6 +68,19 @@ public long maintainMs() {
 return maintainDurationMs;
 }
 
+/**
+ * Return the segment interval in milliseconds.
+ *
+ * @return the segment interval
+ */
+@SuppressWarnings("deprecation") // The deprecation is on the public 
visibility of segments. We intend to make the field private later.
+public long segmentInterval() {
+// Pinned arbitrarily to a minimum of 60 seconds. Profiling may 
indicate a different value is more efficient.
+final long minimumSegmentInterval = 60_000L;
+// Scaled to the (possibly overridden) retention period
+return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval);
+}
+
 /**
  * Set the number of segments to be used for rolling the window store.
  * This function is not exposed to users but can be called by developers 
that extend this class.
@@ -83,7 +88,9 @@ public long maintainMs() {
  * @param segments the number of segments to be used
  * @return itself
  * @throws IllegalArgumentException if specified segments is small than 2
+ * @deprecated since 2.1 Override segmentInterval() instead.
  */
+@Deprecated
 protected Windows segments(final int segments) throws 
IllegalArgumentException {
 if (segments < 2) {
 throw new IllegalArgumentException("Number of segments must be at 
least 2.");
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bc56a3d0350..acfdf35b96a 100644
--- 

[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null

2018-07-02 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530530#comment-16530530
 ] 

Guozhang Wang commented on KAFKA-7125:
--

[~Ahto] I have a PR ready to lift the assumption linked above; you are free 
to try it out. BUT keep in mind that adding a sink node into the sub-topology 
for the global store update would not usually work, since every instance of 
your streams application has an independent global store update task, which 
means that multiple of those tasks may be writing to the same topic. Assume 
you have N instances: each of the N instances will read from the global 
source topic, do the "UnneededCruftSupplier" processing, and then write to 
the sink topic "fst-routes-to-services", which means that 
"fst-routes-to-services" will see each record duplicated N times.

So this brings me to another potential bug that we should fix: for a global 
store's sub-topology, we should disallow 1) adding a local state store to it, 
2) adding a sink node to it, and 3) claiming any of its processor nodes as a 
parent of the processor nodes of another normal processing sub-topology. We 
should consider doing this check as well. [~NIzhikov] If you are interested 
in fixing this issue, please feel free to take a look at my PR and then start 
working on the one I mentioned.

> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> --
>
> Key: KAFKA-7125
> URL: https://issues.apache.org/jira/browse/KAFKA-7125
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Jouni
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: beginner, newbie
>
> After adding a processor and a sink to the topology after a global store 
> and then calling StreamsBuilder.build().describe() again (for debugging 
> purposes and to check I got the topology right), I got the following 
> exception and stack trace:
> {{Caused by: java.util.NoSuchElementException: null}}
>  {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
> ~[na:1.8.0_171]}}
>  {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
> ~[na:1.8.0_171]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
> ~[kafka-streams-1.1.0.jar:na]}}
> Snippet of code that caused this:
> {{    GlobalKTable jsonRoutesToServices}}
>  {{    = builder.globalTable("routes-to-services",}}
>  {{    Consumed.with(Serdes.String(), 
> jsonServiceListSerde),}}
>  {{    Materialized. KeyValueStore>as("routes-to-services"));}}
> {{    TopologyDescription td = builder.build().describe();}}
>  {{    String parent = null;}}
>  {{    // We get an iterator to a TreeSet sorted by processing order, and 
> just want the last one.}}
>  {{    for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
>  {{    parent = store.processor().name();}}
>      }
>  {{    TopologyDescription tdtd = builder.build().describe();}}
>  {{    builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new 
> UnneededCruftSupplier(), parent);}}
>  {{    builder.build().addSink("FST-ROUTES-TO-SERVICES", 
> "fst-routes-to-services", Serdes.String().serializer(), 
> fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
>  {{    TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet, 
> calling describe again before adding anything works fine.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client

2018-07-02 Thread Yishun Guan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530469#comment-16530469
 ] 

Yishun Guan commented on KAFKA-6788:


Does it have a retry mechanism now? I am looking at KAFKA-6789 
(https://issues.apache.org/jira/browse/KAFKA-6789), which talks about a 
possible implementation of a retry mechanism.

> Grouping consumer requests per consumer coordinator in admin client
> ---
>
> Key: KAFKA-6788
> URL: https://issues.apache.org/jira/browse/KAFKA-6788
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Assignee: Yishun Guan
>Priority: Major
>  Labels: newbie++
>
> In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we 
> will first try to get the coordinator for each requested group id, and then 
> send the corresponding request for that group id. However, different group 
> ids could be hosted on the same coordinator, and these requests do support 
> multiple group ids being sent within the same request. So we can consider 
> optimizing this by grouping the requests per coordinator destination.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client

2018-07-02 Thread Colin P. McCabe (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530449#comment-16530449
 ] 

Colin P. McCabe commented on KAFKA-6788:


This is an optimization which we'd like to do at some point, and which hasn't 
been done yet.  Basically, the optimization is that if you have a bunch of 
groups in the batch request which all share a common group coordinator, we'd 
like to send one RPC to that group coordinator rather than several.

I think this would be a hard optimization to do correctly because of the error 
handling issues.  If you get an error for some, but not all, elements of the 
batch, you want to retry just those elements.

The current closed PR looks like it got some wires crossed.  It seems to be 
replacing DescribeGroups with ListGroups, which isn't what we want here.  
Listing all the groups is not very efficient.
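
A hedged sketch of the intended batching (helper names are hypothetical; 
per-group error handling is the hard part, as noted above):
{code}
// resolve each group's coordinator, then send one DescribeGroups RPC per node
Map<Node, List<String>> groupsByCoordinator = new HashMap<>();
for (String groupId : requestedGroupIds) {
    Node coordinator = findCoordinator(groupId); // hypothetical lookup
    groupsByCoordinator.computeIfAbsent(coordinator, n -> new ArrayList<>())
                       .add(groupId);
}
for (Map.Entry<Node, List<String>> entry : groupsByCoordinator.entrySet()) {
    // hypothetical send; on partial failure, retry only the failed group ids
    sendDescribeGroups(entry.getKey(), entry.getValue());
}
{code}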

> Grouping consumer requests per consumer coordinator in admin client
> ---
>
> Key: KAFKA-6788
> URL: https://issues.apache.org/jira/browse/KAFKA-6788
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Assignee: Yishun Guan
>Priority: Major
>  Labels: newbie++
>
> In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we 
> will first try to get the coordinator for each requested group id, and then 
> send the corresponding request for that group id. However, different group 
> ids could be hosted on the same coordinator, and these requests do support 
> multiple group ids being sent within the same request. So we can consider 
> optimizing this by grouping the requests per coordinator destination.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null

2018-07-02 Thread Jouni (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530386#comment-16530386
 ] 

Jouni commented on KAFKA-7125:
--

Thanks Guozhang. I will try it when the PR is ready, although I must first set 
up my development environment so that I can actually compile Kafka... and make 
sure that it also works with the Spring framework. It's late night where I 
live, and I need some sleep after several hours of coding and a "few" beers, 
so don't expect a quick answer.

I have also found a workaround for my case. It involves a few custom 
partitioners on the producer side so that correlated data ends up in the right 
place. I used a GlobalKTable just because my joins need a KeyValueMapper: the 
lookup key must be constructed from data in another incoming stream. Instead 
of using a GlobalKTable, I can just subscribe to a stream, do my 
not-streams-related work in a dummy aggregation resulting in a table, and, for 
the joins, write my own TransformerSupplier/Transformer implementations and 
get the data from a state store. Oh, I wish there were stream-to-table joins 
with a KeyValueMapper!

I haven't written the code for my workaround yet, so I'll test your PR first 
when it's ready. I'll probably end up implementing my workaround later anyway; 
it should be faster, and performance will become very important in the future.

> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> --
>
> Key: KAFKA-7125
> URL: https://issues.apache.org/jira/browse/KAFKA-7125
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Jouni
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: beginner, newbie
>
> After adding a processor and a sink to the topology after a global store 
> and then calling StreamsBuilder.build().describe() again (for debugging 
> purposes and to check I got the topology right), I got the following 
> exception and stack trace:
> {{Caused by: java.util.NoSuchElementException: null}}
>  {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
> ~[na:1.8.0_171]}}
>  {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
> ~[na:1.8.0_171]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
> ~[kafka-streams-1.1.0.jar:na]}}
> Snippet of code that caused this:
> {{    GlobalKTable jsonRoutesToServices}}
>  {{    = builder.globalTable("routes-to-services",}}
>  {{    Consumed.with(Serdes.String(), 
> jsonServiceListSerde),}}
>  {{    Materialized. KeyValueStore>as("routes-to-services"));}}
> {{    TopologyDescription td = builder.build().describe();}}
>  {{    String parent = null;}}
>  {{    // We get an iterator to a TreeSet sorted by processing order, and 
> just want the last one.}}
>  {{    for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
>  {{    parent = store.processor().name();}}
>      }
>  {{    TopologyDescription tdtd = builder.build().describe();}}
>  {{    builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new 
> UnneededCruftSupplier(), parent);}}
>  {{    builder.build().addSink("FST-ROUTES-TO-SERVICES", 
> "fst-routes-to-services", Serdes.String().serializer(), 
> fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
>  {{    TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet, 
> calling describe again before adding anything works fine.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7128) Lagging high watermark can lead to committed data loss after ISR expansion

2018-07-02 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-7128:
--

 Summary: Lagging high watermark can lead to committed data loss 
after ISR expansion
 Key: KAFKA-7128
 URL: https://issues.apache.org/jira/browse/KAFKA-7128
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Some model checking exposed a weakness in the ISR expansion logic. We know that 
the high watermark can go backwards after a leader failover, but we may not 
have known that this can lead to the loss of committed data. 

Say we have three replicas: r1, r2, and r3. Initially, the ISR consists of (r1, 
r2) and the leader is r1. r3 is a new replica which has not begun fetching. The 
data up to offset 10 has been committed to the ISR. Here is the initial state:

ISR: (r1, r2)
Leader: r1
r1: [hw=10, leo=10]
r2: [hw=5, leo=10]
r3: [hw=0, leo=0]

Replica 1 then initiates shutdown (or fails) and leaves the ISR, which makes r2 
the new leader. The high watermark is still lagging r1.

ISR: (r2)
Leader: r2
r1 (offline): [hw=10, leo=10]
r2: [hw=5, leo=10]
r3: [hw=0, leo=0]

Replica 3 then catches up to the high watermark on r2 and joins the ISR. Perhaps 
its high watermark is lagging behind r2's, but this is unimportant.

ISR: (r2, r3)
Leader: r2
r1 (offline): [hw=10, leo=10]
r2: [hw=5, leo=10]
r3: [hw=0, leo=5]

Now r2 fails and r3 is elected leader and is the only member of the ISR. The 
committed data from offsets 5 to 10 has been lost.

ISR: (r3)
Leader: r3
r1 (offline): [hw=10, leo=10]
r2 (offline): [hw=5, leo=10]
r3: [hw=0, leo=5]

The bug is the fact that we allowed r3 into the ISR after the local high 
watermark had been reached. Since the follower does not know the true high 
watermark for the previous leader's epoch, it should not allow a replica to 
join the ISR until it has caught up to an offset within its own epoch. 

Note this is related to 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change
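
A hedged sketch of the proposed check (names and types are illustrative, not 
the actual broker code):
{code}
// Admit a follower into the ISR only once it has caught up to an offset
// within the current leader epoch, not merely the local high watermark,
// which may lag the true committed offset after a leader change.
boolean canJoinIsr(long followerLogEndOffset,
                   long localHighWatermark,
                   long leaderEpochStartOffset) {
    return followerLogEndOffset >= Math.max(localHighWatermark, leaderEpochStartOffset);
}
{code}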



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client

2018-07-02 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530220#comment-16530220
 ] 

Guozhang Wang commented on KAFKA-6788:
--

[~cmccabe] Would like to have you chime in on this one: what's the current 
status of this request, and how would we like to fix it?

> Grouping consumer requests per consumer coordinator in admin client
> ---
>
> Key: KAFKA-6788
> URL: https://issues.apache.org/jira/browse/KAFKA-6788
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Assignee: Yishun Guan
>Priority: Major
>  Labels: newbie++
>
> In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we 
> will first try to get the coordinator for each requested group id, and then 
> send the corresponding request for that group id. However, different group 
> ids could be hosted on the same coordinator, and these requests do support 
> multiple group ids being sent within the same request. So we can consider 
> optimizing this by grouping the requests per coordinator destination.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null

2018-07-02 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530209#comment-16530209
 ] 

Guozhang Wang commented on KAFKA-7125:
--

[~Ahto] Thanks for your updated explanation. And yes, your finding is correct: 
today we assume that for a global store, there will only be two processor 
nodes associated with it: the source and update processors. This is an 
unintended limitation we added into the topology building process, and I will 
provide a PR trying to lift this limitation. Please feel free to try it out 
and let me know if it resolves your problem. If yes, I will try to merge it 
into trunk so that the next minor release will have this fix.

> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> --
>
> Key: KAFKA-7125
> URL: https://issues.apache.org/jira/browse/KAFKA-7125
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Jouni
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: beginner, newbie
>
> After adding a processor and a sink to the topology after a global store 
> and then calling StreamsBuilder.build().describe() again (for debugging 
> purposes and to check I got the topology right), I got the following 
> exception and stack trace:
> {{Caused by: java.util.NoSuchElementException: null}}
>  {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
> ~[na:1.8.0_171]}}
>  {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
> ~[na:1.8.0_171]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
> ~[kafka-streams-1.1.0.jar:na]}}
> Snippet of code that caused this:
> {{    GlobalKTable jsonRoutesToServices}}
>  {{    = builder.globalTable("routes-to-services",}}
>  {{    Consumed.with(Serdes.String(), 
> jsonServiceListSerde),}}
>  {{    Materialized. KeyValueStore>as("routes-to-services"));}}
> {{    TopologyDescription td = builder.build().describe();}}
>  {{    String parent = null;}}
>  {{    // We get an iterator to a TreeSet sorted by processing order, and 
> just want the last one.}}
>  {{    for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
>  {{    parent = store.processor().name();}}
>      }
>  {{    TopologyDescription tdtd = builder.build().describe();}}
>  {{    builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new 
> UnneededCruftSupplier(), parent);}}
>  {{    builder.build().addSink("FST-ROUTES-TO-SERVICES", 
> "fst-routes-to-services", Serdes.String().serializer(), 
> fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
>  {{    TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet, 
> calling describe again before adding anything works fine.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6583) Metadata should include number of state stores for task

2018-07-02 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530202#comment-16530202
 ] 

Matthias J. Sax commented on KAFKA-6583:


If we close it as a duplicate, we can resolve it immediately. But you are 
right, it should not say "fixed" but "duplicate". Corrected this.

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Assignee: Richard Yu
>Priority: Critical
>  Labels: needs-kip
>
> Currently, there is a need for clients to be more evenly balanced: stateful 
> tasks should be distributed in such a manner that they are spread equally. 
> However, for such awareness to be implemented during task assignment, the 
> present rebalance protocol metadata would also need to contain the number of 
> state stores in a particular task. This would allow us to "weight" tasks 
> during assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6583) Metadata should include number of state stores for task

2018-07-02 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6583.

Resolution: Duplicate

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Assignee: Richard Yu
>Priority: Critical
>  Labels: needs-kip
>
> Currently, there is a need for clients to be more evenly balanced: stateful 
> tasks should be distributed in such a manner that they are spread equally. 
> However, for such awareness to be implemented during task assignment, the 
> present rebalance protocol metadata would also need to contain the number of 
> state stores in a particular task. This would allow us to "weight" tasks 
> during assignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-6583) Metadata should include number of state stores for task

2018-07-02 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-6583:


> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Assignee: Richard Yu
>Priority: Critical
>  Labels: needs-kip
>
> Currently, to balance clients more evenly, stateful tasks should be 
> distributed so that they are spread equally. However, for task assignment to 
> be aware of this, the present rebalance protocol metadata would also need to 
> contain the number of state stores in each task. This would allow us to 
> "weight" tasks during assignment. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6583) Metadata should include number of state stores for task

2018-07-02 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530087#comment-16530087
 ] 

Ted Yu commented on KAFKA-6583:
---

Shouldn't this be marked Resolved when KAFKA-4696 (Open as of now) is 
integrated?

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Assignee: Richard Yu
>Priority: Critical
>  Labels: needs-kip
>
> Currently, to balance clients more evenly, stateful tasks should be 
> distributed so that they are spread equally. However, for task assignment to 
> be aware of this, the present rebalance protocol metadata would also need to 
> contain the number of state stores in each task. This would allow us to 
> "weight" tasks during assignment. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7108) "Exactly-once" stream breaks production exception handler contract

2018-07-02 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530086#comment-16530086
 ] 

Matthias J. Sax commented on KAFKA-7108:


I just double-checked the code. We actually allow overriding retries (and thus 
we do not log anything). IIRC, for exactly-once, setting retries to MAX_VALUE 
is recommended but not enforced by the producer, and thus Kafka Streams does 
not enforce it either. The question is, why do you set it to 240 and not use 
the default of MAX_VALUE? Also note, you can set different configs for Streams 
itself and for the internal producer/consumer/admin clients if you want (cf. 
https://kafka.apache.org/11/documentation/streams/developer-guide/config-streams.html#id17)
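For reference, a minimal sketch of scoping a setting to the internal producer 
only, via the producer prefix (the surrounding properties are illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    // Applies only to the internally created producer, not to Streams itself
    // or to the internal consumer/admin clients:
    props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);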

> "Exactly-once" stream breaks production exception handler contract
> --
>
> Key: KAFKA-7108
> URL: https://issues.apache.org/jira/browse/KAFKA-7108
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Anna O
>Priority: Major
>  Labels: exactly-once
>
> I have a stream configured with "default.production.exception.handler" that 
> is supposed to log the error and continue. When I set "processing.guarantee" 
> to "exactly_once" it appeared that retryable NotEnoughReplicasException that 
> passed the production exception handler was rethrown by the 
> TransactionManager wrapped with KafkaException and terminated the stream 
> thread:
> _org.apache.kafka.common.KafkaException: Cannot execute transactional method 
> because we are in an error stateat 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
>  ~[kafka-clients-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:250)
>  ~[kafka-clients-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:617)
>  ~[kafka-clients-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:357)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
>  [kafka-streams-1.1.0.jar:?]_
>  _Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: 
> Messages are rejected since there are fewer in-sync replicas than required._
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6583) Metadata should include number of state stores for task

2018-07-02 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6583.

Resolution: Fixed

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Assignee: Richard Yu
>Priority: Critical
>  Labels: needs-kip
>
> Currently, to balance clients more evenly, stateful tasks should be 
> distributed so that they are spread equally. However, for task assignment to 
> be aware of this, the present rebalance protocol metadata would also need to 
> contain the number of state stores in each task. This would allow us to 
> "weight" tasks during assignment. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client

2018-07-02 Thread Cyrus Vafadari (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530064#comment-16530064
 ] 

Cyrus Vafadari commented on KAFKA-6788:
---

[~guozhang] I abandoned it a while ago when it didn't get attention, so I 
haven't actively worked on it in a while. I think if the code is moving towards 
using futures/promises, it might be better to start from scratch. If you like 
this strategy, though, I'll fix the merge conflicts and reopen the PR.

> Grouping consumer requests per consumer coordinator in admin client
> ---
>
> Key: KAFKA-6788
> URL: https://issues.apache.org/jira/browse/KAFKA-6788
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Assignee: Yishun Guan
>Priority: Major
>  Labels: newbie++
>
> In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we 
> will first try to get the coordinator for each requested group id, and then 
> send the corresponding request for that group id. However, different group 
> ids could be hosted on the same coordinator, and these requests do support 
> multiple group ids being sent within the same request. So we can consider 
> optimizing this by grouping the requests per coordinator destination.
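A hedged sketch of that grouping step (findCoordinator and sendDescribeGroups 
are stand-ins for the real admin-client internals, not actual method names):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.Node;

    // Resolve each group's coordinator first, then batch group ids that share
    // a coordinator into a single request per broker.
    final Map<Node, List<String>> byCoordinator = new HashMap<>();
    for (final String groupId : groupIds) {
        final Node coordinator = findCoordinator(groupId); // hypothetical lookup
        byCoordinator.computeIfAbsent(coordinator, n -> new ArrayList<>()).add(groupId);
    }
    for (final Map.Entry<Node, List<String>> entry : byCoordinator.entrySet()) {
        sendDescribeGroups(entry.getKey(), entry.getValue()); // one request per coordinator
    }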



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6859) Follower should not send OffsetForLeaderEpoch for undefined leader epochs

2018-07-02 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6859:
---
Fix Version/s: 2.1.0

> Follower should not send OffsetForLeaderEpoch for undefined leader epochs
> -
>
> Key: KAFKA-6859
> URL: https://issues.apache.org/jira/browse/KAFKA-6859
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Anna Povzner
>Assignee: Stanislav Kozlovski
>Priority: Major
> Fix For: 2.1.0
>
>
> This is more of an optimization than a correctness fix.
> Currently, if the follower is on inter-broker protocol version 0.11 or 
> higher but on an older message format, it does not track leader epochs. 
> However, it will still send an OffsetForLeaderEpoch request to the leader 
> with an undefined epoch, which is guaranteed to return an undefined offset, 
> so the follower truncates to the high watermark. Another example is a 
> bootstrapping follower that does not have any leader epochs recorded.
> It is cleaner and more efficient to not send OffsetForLeaderEpoch requests 
> from a follower with undefined leader epochs, since we already know the 
> answer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6859) Follower should not send OffsetForLeaderEpoch for undefined leader epochs

2018-07-02 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-6859:
--

Assignee: Stanislav Kozlovski

> Follower should not send OffsetForLeaderEpoch for undefined leader epochs
> -
>
> Key: KAFKA-6859
> URL: https://issues.apache.org/jira/browse/KAFKA-6859
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Anna Povzner
>Assignee: Stanislav Kozlovski
>Priority: Major
> Fix For: 2.1.0
>
>
> This is more of an optimization than a correctness fix.
> Currently, if the follower is on inter-broker protocol version 0.11 or 
> higher but on an older message format, it does not track leader epochs. 
> However, it will still send an OffsetForLeaderEpoch request to the leader 
> with an undefined epoch, which is guaranteed to return an undefined offset, 
> so the follower truncates to the high watermark. Another example is a 
> bootstrapping follower that does not have any leader epochs recorded.
> It is cleaner and more efficient to not send OffsetForLeaderEpoch requests 
> from a follower with undefined leader epochs, since we already know the 
> answer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6859) Follower should not send OffsetForLeaderEpoch for undefined leader epochs

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529868#comment-16529868
 ] 

ASF GitHub Bot commented on KAFKA-6859:
---

stanislavkozlovski opened a new pull request #5320: KAFKA-6859: Do not send 
LeaderEpochRequest for undefined leader epochs
URL: https://github.com/apache/kafka/pull/5320
 
 
   If a broker or topic has a message format < 0.11, it does not track leader 
epochs. LeaderEpochRequests for such brokers or topics will always return 
undefined, making the follower truncate to the high watermark. Since there is 
no point in using the network in such cases, don't send the request.
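   In Java terms, the guard amounts to something like the following (the real 
change lives in the Scala replica-fetcher code; UNDEFINED_EPOCH and the method 
names are assumed for illustration):

    static final int UNDEFINED_EPOCH = -1; // sentinel for "no epoch tracked"

    long truncationOffset(final int latestEpoch, final long highWatermark) {
        if (latestEpoch == UNDEFINED_EPOCH) {
            // The leader is guaranteed to answer "undefined" anyway, so skip the
            // OffsetForLeaderEpoch round trip and truncate to the high watermark.
            return highWatermark;
        }
        return fetchOffsetForEpoch(latestEpoch); // network request only when it can help
    }

    long fetchOffsetForEpoch(final int epoch) {
        // placeholder for the actual OffsetForLeaderEpoch request to the leader
        throw new UnsupportedOperationException("illustration only");
    }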
   
   ### Notes
   * I am not sure whether we should even check 
`brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2`, since log 
message format is the only thing we care about and as far as I understand it is 
always defined
   * I am not fond of the subclassing in the tests for mocking the method (and 
also making it protected) but this was the cleanest solution I could come up 
with. Other ideas are welcome
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Follower should not send OffsetForLeaderEpoch for undefined leader epochs
> -
>
> Key: KAFKA-6859
> URL: https://issues.apache.org/jira/browse/KAFKA-6859
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Anna Povzner
>Priority: Major
>
> This is more of an optimization than a correctness fix.
> Currently, if the follower is on inter-broker protocol version 0.11 or 
> higher but on an older message format, it does not track leader epochs. 
> However, it will still send an OffsetForLeaderEpoch request to the leader 
> with an undefined epoch, which is guaranteed to return an undefined offset, 
> so the follower truncates to the high watermark. Another example is a 
> bootstrapping follower that does not have any leader epochs recorded.
> It is cleaner and more efficient to not send OffsetForLeaderEpoch requests 
> from a follower with undefined leader epochs, since we already know the 
> answer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6268) Tools should not swallow exceptions like resolving network names

2018-07-02 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529818#comment-16529818
 ] 

Rajini Sivaram commented on KAFKA-6268:
---

[~astubbs] Diagnostics for security failures were improved under KIP-152 
(KAFKA-4764 and KAFKA-5920) in 1.0.0, and we should have log entries for those 
at error level. [~enether] Can you verify that we have log entries at warn or 
above if we can't parse the socket response? Thanks.

> Tools should not swallow exceptions like resolving network names
> 
>
> Key: KAFKA-6268
> URL: https://issues.apache.org/jira/browse/KAFKA-6268
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1
>Reporter: Antony Stubbs
>Assignee: Stanislav Kozlovski
>Priority: Major
>
> The cli consumer client shows nothing when it can't resolve a domain. This 
> and other errors like it should be shown to the user by default. You have to 
> turn on DEBUG level logging in the tools log4j to find there is an error.
> {{[2017-11-23 16:40:56,401] DEBUG Error connecting to node 
> as-broker-1-eu-west-1b-public:9092 (id: 1 rack: null) 
> (org.apache.kafka.clients.NetworkClient)
> java.io.IOException: Can't resolve address: as-broker-1-eu-west-1b-public:9092
>   at org.apache.kafka.common.network.Selector.connect(Selector.java:195)
>   at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:764)
>   at 
> org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:60)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:908)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:819)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:431)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:199)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:223)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>   at kafka.consumer.NewShinyConsumer.(BaseConsumer.scala:64)
>   at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:72)
>   at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
>   at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: java.nio.channels.UnresolvedAddressException
>   at sun.nio.ch.Net.checkAddress(Net.java:101)
>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
>   at org.apache.kafka.common.network.Selector.connect(Selector.java:192)
>   ... 18 more
> }}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6268) Tools should not swallow exceptions like resolving network names

2018-07-02 Thread Antony Stubbs (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529801#comment-16529801
 ] 

Antony Stubbs commented on KAFKA-6268:
--

[~rsivaram] does this fix also cover security-related errors like bad keystore 
passwords, failure to obtain a Kerberos ticket, and can't-parse-socket-response 
(connected to the wrong socket) type errors as well?

> Tools should not swallow exceptions like resolving network names
> 
>
> Key: KAFKA-6268
> URL: https://issues.apache.org/jira/browse/KAFKA-6268
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1
>Reporter: Antony Stubbs
>Assignee: Stanislav Kozlovski
>Priority: Major
>
> The cli consumer client shows nothing when it can't resolve a domain. This 
> and other errors like it should be shown to the user by default. You have to 
> turn on DEBUG level logging in the tools log4j to find there is an error.
> {{[2017-11-23 16:40:56,401] DEBUG Error connecting to node 
> as-broker-1-eu-west-1b-public:9092 (id: 1 rack: null) 
> (org.apache.kafka.clients.NetworkClient)
> java.io.IOException: Can't resolve address: as-broker-1-eu-west-1b-public:9092
>   at org.apache.kafka.common.network.Selector.connect(Selector.java:195)
>   at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:764)
>   at 
> org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:60)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:908)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:819)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:431)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:199)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:223)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>   at kafka.consumer.NewShinyConsumer.(BaseConsumer.scala:64)
>   at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:72)
>   at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
>   at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: java.nio.channels.UnresolvedAddressException
>   at sun.nio.ch.Net.checkAddress(Net.java:101)
>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
>   at org.apache.kafka.common.network.Selector.connect(Selector.java:192)
>   ... 18 more
> }}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7108) "Exactly-once" stream breaks production exception handler contract

2018-07-02 Thread Anna O (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529726#comment-16529726
 ] 

Anna O commented on KAFKA-7108:
---

[~mjsax] - here is the StreamsConfig:



StreamsConfig values: 
 application.id = ...
 application.server = 
 bootstrap.servers = [...]
 buffered.records.per.partition = 1000
 cache.max.bytes.buffering = 10485760
 client.id = 
 commit.interval.ms = 100
 connections.max.idle.ms = 54
 default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
 default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
 default.production.exception.handler = class com...LogAndContinueProductionExceptionHandler
 default.timestamp.extractor = class com...UmsEventTimestampExtractor
 default.value.serde = class com...JsonSerde
 key.serde = null
 metadata.max.age.ms = 30
 metric.reporters = []
 metrics.num.samples = 2
 metrics.recording.level = INFO
 metrics.sample.window.ms = 3
 num.standby.replicas = 0
 num.stream.threads = 8
 partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
 poll.ms = 100
 processing.guarantee = exactly_once
 receive.buffer.bytes = 32768
 reconnect.backoff.max.ms = 1000
 reconnect.backoff.ms = 50
 replication.factor = 3
 request.timeout.ms = 4
 retries = 240
 retry.backoff.ms = 500
 rocksdb.config.setter = null
 security.protocol = PLAINTEXT
 send.buffer.bytes = 131072
 state.cleanup.delay.ms = 60
 state.dir = /tmp/kafka-streams
 timestamp.extractor = null
 value.serde = null
 windowstore.changelog.additional.retention.ms = 8640
 zookeeper.connect =

This is how we override the retries in the KafkaStreams config in the code:

config.put(StreamsConfig.RETRIES_CONFIG, 240);

 

To your remark "If yes, you should see a WARN log that the overwrite is 
ignored..." - there is no such log...

 

> "Exactly-once" stream breaks production exception handler contract
> --
>
> Key: KAFKA-7108
> URL: https://issues.apache.org/jira/browse/KAFKA-7108
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Anna O
>Priority: Major
>  Labels: exactly-once
>
> I have a stream configured with "default.production.exception.handler" that 
> is supposed to log the error and continue. When I set "processing.guarantee" 
> to "exactly_once" it appeared that retryable NotEnoughReplicasException that 
> passed the production exception handler was rethrown by the 
> TransactionManager wrapped with KafkaException and terminated the stream 
> thread:
> _org.apache.kafka.common.KafkaException: Cannot execute transactional method 
> because we are in an error stateat 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
>  ~[kafka-clients-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:250)
>  ~[kafka-clients-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:617)
>  ~[kafka-clients-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:357)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> 

[jira] [Commented] (KAFKA-7120) When Connect throws CONFLICT error for REST requests, it will help to see more details

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529611#comment-16529611
 ] 

ASF GitHub Bot commented on KAFKA-7120:
---

lambdaliu opened a new pull request #5317: KAFKA-7120: Add information to 
indicate which connector resource request cannot be completed
URL: https://github.com/apache/kafka/pull/5317
 
 
   Right now, we throw:
   throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
   "Cannot complete request because of a conflicting operation (e.g. worker 
rebalance)");
   There's no information about WHICH request can't be completed. It will help 
to know.
   This PR adds the path, method, and body of the request to the message field 
of ConnectRestException to indicate which request can't be completed.
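   For illustration, the enriched message could be built roughly like this 
(method, path, and body are assumed to be in scope at the call site; this is a 
sketch, not the exact patch):

    final String detail = String.format(
        "Cannot complete request because of a conflicting operation (e.g. worker rebalance). "
            + "Request: %s %s body=%s", method, path, body);
    throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), detail);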


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> When Connect throws CONFLICT error for REST requests, it will help to see 
> more details
> --
>
> Key: KAFKA-7120
> URL: https://issues.apache.org/jira/browse/KAFKA-7120
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Gwen Shapira
>Assignee: lambdaliu
>Priority: Critical
>
> Right now, we throw:
> throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
>  "Cannot complete request because of a conflicting operation (e.g. worker 
> rebalance)");
> There's no information about WHICH request can't be completed. It will help 
> to know.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null

2018-07-02 Thread Jouni (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529547#comment-16529547
 ] 

Jouni commented on KAFKA-7125:
--

Had a look at the Kafka sources, and especially the comment written in 
InternalTopologyBuilder.describeGlobalStore:

// we found a GlobalStore node group; those contain exactly two node: {sourceNode,processorNode}

My code breaks that assumption. Probable explanation: isGlobalSource returns 
true only on the last node, the iterator has moved to the end, and the call to

final String processorNode = nodes.iterator().next(); // get remaining processorNode

fails miserably.
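
The failure mode is easy to reproduce in isolation: once a HashSet iterator is 
exhausted, any further next() throws exactly the NoSuchElementException shown 
in the stack trace. A minimal standalone sketch:

    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Set;

    public class ExhaustedIteratorDemo {
        public static void main(final String[] args) {
            final Set<String> nodes = new HashSet<>();
            nodes.add("sourceNode");            // the "exactly two nodes" assumption is broken
            final Iterator<String> it = nodes.iterator();
            it.next();                          // consumes the only element
            it.next();                          // throws java.util.NoSuchElementException
        }
    }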

Maybe I'll have to find another way to do what I need to do but am not supposed 
to...

> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> --
>
> Key: KAFKA-7125
> URL: https://issues.apache.org/jira/browse/KAFKA-7125
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Jouni
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: beginner, newbie
>
> After adding a processor and a sink to the topology after a globalstore and 
> then calling StreamsBuilder.build().describe() again (for debugging purposes 
> and to check I got the topology right), I had the following exception and 
> stacktrace:
> {{Caused by: java.util.NoSuchElementException: null}}
>  {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
> ~[na:1.8.0_171]}}
>  {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
> ~[na:1.8.0_171]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
> ~[kafka-streams-1.1.0.jar:na]}}
> Snippet of code that caused this:
> {{    GlobalKTable jsonRoutesToServices}}
> {{        = builder.globalTable("routes-to-services",}}
> {{              Consumed.with(Serdes.String(), jsonServiceListSerde),}}
> {{              Materialized. KeyValueStore>as("routes-to-services"));}}
> {{    TopologyDescription td = builder.build().describe();}}
> {{    String parent = null;}}
> {{    // We get an iterator to a TreeSet sorted by processing order, and just want the last one.}}
> {{    for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
> {{        parent = store.processor().name();}}
>     }
> {{    TopologyDescription tdtd = builder.build().describe();}}
> {{    builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new UnneededCruftSupplier(), parent);}}
> {{    builder.build().addSink("FST-ROUTES-TO-SERVICES", "fst-routes-to-services", Serdes.String().serializer(), fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
> {{    TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet; calling describe again before adding anything works fine.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6268) Tools should not swallow exceptions like resolving network names

2018-07-02 Thread Antony Stubbs (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Antony Stubbs updated KAFKA-6268:
-
Summary: Tools should not swallow exceptions like resolving network names  
(was: Tools should now swallow exceptions like resolving network names)

> Tools should not swallow exceptions like resolving network names
> 
>
> Key: KAFKA-6268
> URL: https://issues.apache.org/jira/browse/KAFKA-6268
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1
>Reporter: Antony Stubbs
>Assignee: Stanislav Kozlovski
>Priority: Major
>
> The cli consumer client shows nothing when it can't resolve a domain. This 
> and other errors like it should be shown to the user by default. You have to 
> turn on DEBUG level logging in the tools log4j to find there is an error.
> {{[2017-11-23 16:40:56,401] DEBUG Error connecting to node 
> as-broker-1-eu-west-1b-public:9092 (id: 1 rack: null) 
> (org.apache.kafka.clients.NetworkClient)
> java.io.IOException: Can't resolve address: as-broker-1-eu-west-1b-public:9092
>   at org.apache.kafka.common.network.Selector.connect(Selector.java:195)
>   at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:764)
>   at 
> org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:60)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:908)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:819)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:431)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:199)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:223)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>   at kafka.consumer.NewShinyConsumer.(BaseConsumer.scala:64)
>   at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:72)
>   at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
>   at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: java.nio.channels.UnresolvedAddressException
>   at sun.nio.ch.Net.checkAddress(Net.java:101)
>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
>   at org.apache.kafka.common.network.Selector.connect(Selector.java:192)
>   ... 18 more
> }}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null

2018-07-02 Thread Jouni (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529506#comment-16529506
 ] 

Jouni commented on KAFKA-7125:
--

Sorry, I forgot one relevant call just before getting the TopologyDescription... 
bug report updated. And there's a good reason for attaching a downstream 
processor to the state store's processor: I need to catch the update Change<> 
events, do some totally non-streams-related work on them, and forward the 
results downstream again. But my main point is that calling builder.describe() 
again shouldn't throw an exception. Something in handling the internal state of 
Topology clearly isn't right.
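
A rough sketch of the kind of processor being described, against the 1.1-era 
Processor API (Change is an internal Streams class, and the side work here is 
a stand-in):

    import org.apache.kafka.streams.kstream.internals.Change;
    import org.apache.kafka.streams.processor.AbstractProcessor;

    class ChangeForwardingProcessor extends AbstractProcessor<String, Change<Object>> {
        @Override
        public void process(final String key, final Change<Object> change) {
            doUnrelatedWork(change.newValue);        // e.g. update non-Streams state
            context().forward(key, change.newValue); // pass the update downstream
        }

        private void doUnrelatedWork(final Object newValue) {
            // placeholder for the "totally non-streams-related work"
        }
    }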

> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> --
>
> Key: KAFKA-7125
> URL: https://issues.apache.org/jira/browse/KAFKA-7125
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Jouni
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: beginner, newbie
>
> After adding a processor and a sink to the topology after a globalstore and 
> then calling StreamsBuilder.build().describe() again (for debugging purposes 
> and to check I got the topology right), I had the following exception and 
> stacktrace:
> {{Caused by: java.util.NoSuchElementException: null}}
>  {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
> ~[na:1.8.0_171]}}
>  {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
> ~[na:1.8.0_171]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
> ~[kafka-streams-1.1.0.jar:na]}}
> Snippet of code that caused this:
> {{    GlobalKTable jsonRoutesToServices}}
> {{        = builder.globalTable("routes-to-services",}}
> {{              Consumed.with(Serdes.String(), jsonServiceListSerde),}}
> {{              Materialized. KeyValueStore>as("routes-to-services"));}}
> {{    TopologyDescription td = builder.build().describe();}}
> {{    String parent = null;}}
> {{    // We get an iterator to a TreeSet sorted by processing order, and just want the last one.}}
> {{    for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
> {{        parent = store.processor().name();}}
>     }
> {{    TopologyDescription tdtd = builder.build().describe();}}
> {{    builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new UnneededCruftSupplier(), parent);}}
> {{    builder.build().addSink("FST-ROUTES-TO-SERVICES", "fst-routes-to-services", Serdes.String().serializer(), fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
> {{    TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet; calling describe again before adding anything works fine.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null

2018-07-02 Thread Jouni (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jouni updated KAFKA-7125:
-
Description: 
After adding a processor and a sink to the topology after a globalstore and then 
calling StreamsBuilder.build().describe() again (for debugging purposes and to 
check I got the topology right), I had the following exception and stacktrace:

{{Caused by: java.util.NoSuchElementException: null}}
 {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
~[na:1.8.0_171]}}
 {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
~[na:1.8.0_171]}}
 {{    at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
 ~[kafka-streams-1.1.0.jar:na]}}
 {{    at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
 ~[kafka-streams-1.1.0.jar:na]}}
 {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
~[kafka-streams-1.1.0.jar:na]}}

Snippet of code that caused this:

{{    GlobalKTable jsonRoutesToServices}}
{{        = builder.globalTable("routes-to-services",}}
{{              Consumed.with(Serdes.String(), jsonServiceListSerde),}}
{{              Materialized.>as("routes-to-services"));}}

{{    TopologyDescription td = builder.build().describe();}}
{{    String parent = null;}}
{{    // We get an iterator to a TreeSet sorted by processing order, and just want the last one.}}
{{    for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
{{        parent = store.processor().name();}}
    }
{{    TopologyDescription tdtd = builder.build().describe();}}
{{    builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new UnneededCruftSupplier(), parent);}}
{{    builder.build().addSink("FST-ROUTES-TO-SERVICES", "fst-routes-to-services", Serdes.String().serializer(), fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
{{    TopologyDescription tdtdtd = builder.build().describe();}}

Note that the exception is thrown on the last line of the code snippet; calling 
describe again before adding anything works fine.

 

  was:
After adding a processor and a sink to the topology after a globalstore and then 
calling StreamsBuilder.build().describe() again (for debugging purposes and to 
check I got the topology right), I had the following exception and stacktrace:

{{Caused by: java.util.NoSuchElementException: null}}
 {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
~[na:1.8.0_171]}}
 {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
~[na:1.8.0_171]}}
 {{    at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
 ~[kafka-streams-1.1.0.jar:na]}}
 {{    at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
 ~[kafka-streams-1.1.0.jar:na]}}
 {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
~[kafka-streams-1.1.0.jar:na]}}

Snippet of code that caused this:

{{    GlobalKTable jsonRoutesToServices}}
{{        = builder.globalTable("routes-to-services",}}
{{              Consumed.with(Serdes.String(), jsonServiceListSerde),}}
{{              Materialized.>as("routes-to-services"));}}
{{    TopologyDescription td = builder.build().describe();}}
{{    String parent = null;}}
{{    // We get an iterator to a TreeSet sorted by processing order, and just want the last one.}}
{{    for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
{{        parent = store.processor().name();}}
    }
{{    TopologyDescription tdtd = builder.build().describe();}}
{{    builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new UnneededCruftSupplier(), parent);}}
{{    builder.build().addSink("FST-ROUTES-TO-SERVICES", "fst-routes-to-services", Serdes.String().serializer(), fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
{{    TopologyDescription tdtdtd = builder.build().describe();}}

Note that the exception is thrown on the last line of the code snippet; calling 
describe again before adding anything works fine.

 


> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> --
>
> Key: KAFKA-7125
> URL: https://issues.apache.org/jira/browse/KAFKA-7125
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Jouni
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: beginner, 

[jira] [Updated] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null

2018-07-02 Thread Jouni (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jouni updated KAFKA-7125:
-
Description: 
After adding a processor and a sink to the topology after a globalstore and then 
calling StreamsBuilder.build().describe() again (for debugging purposes and to 
check I got the topology right), I had the following exception and stacktrace:

{{Caused by: java.util.NoSuchElementException: null}}
 {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
~[na:1.8.0_171]}}
 {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
~[na:1.8.0_171]}}
 {{    at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
 ~[kafka-streams-1.1.0.jar:na]}}
 {{    at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
 ~[kafka-streams-1.1.0.jar:na]}}
 {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
~[kafka-streams-1.1.0.jar:na]}}

Snippet of code that caused this:

{{    GlobalKTable jsonRoutesToServices}}
{{        = builder.globalTable("routes-to-services",}}
{{              Consumed.with(Serdes.String(), jsonServiceListSerde),}}
{{              Materialized.>as("routes-to-services"));}}
{{    TopologyDescription td = builder.build().describe();}}
{{    String parent = null;}}
{{    // We get an iterator to a TreeSet sorted by processing order, and just want the last one.}}
{{    for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
{{        parent = store.processor().name();}}
    }
{{    TopologyDescription tdtd = builder.build().describe();}}
{{    builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new UnneededCruftSupplier(), parent);}}
{{    builder.build().addSink("FST-ROUTES-TO-SERVICES", "fst-routes-to-services", Serdes.String().serializer(), fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
{{    TopologyDescription tdtdtd = builder.build().describe();}}

Note that the exception is thrown on the last line of the code snippet; calling 
describe again before adding anything works fine.

 

  was:
After adding a processor and a sink to the topology after a globalstore and then 
calling StreamsBuilder.build().describe() again (for debugging purposes and to 
check I got the topology right), I had the following exception and stacktrace:

{{Caused by: java.util.NoSuchElementException: null}}
 {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
~[na:1.8.0_171]}}
 {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
~[na:1.8.0_171]}}
 {{    at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
 ~[kafka-streams-1.1.0.jar:na]}}
 {{    at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
 ~[kafka-streams-1.1.0.jar:na]}}
 {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
~[kafka-streams-1.1.0.jar:na]}}

Snippet of code that caused this:

{{    TopologyDescription td = builder.build().describe();}}
{{    String parent = null;}}
{{    // We get an iterator to a TreeSet sorted by processing order, and just want the last one.}}
{{    for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
{{        parent = store.processor().name();}}
    }
{{    TopologyDescription tdtd = builder.build().describe();}}
{{    builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new UnneededCruftSupplier(), parent);}}
{{    builder.build().addSink("FST-ROUTES-TO-SERVICES", "fst-routes-to-services", Serdes.String().serializer(), fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
{{    TopologyDescription tdtdtd = builder.build().describe();}}

Note that the exception is thrown on the last line of the code snippet; calling 
describe again before adding anything works fine.

 


> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> --
>
> Key: KAFKA-7125
> URL: https://issues.apache.org/jira/browse/KAFKA-7125
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Jouni
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: beginner, newbie
>
> After adding a processor and a sink to the topology after a globalstore and 
> then calling StreamsBuilder.build().describe() again (for debugging purposes 
> and to check I got the topology right), I had the following exception and 
> stacktrace:
> {{Caused 

[jira] [Commented] (KAFKA-7118) Make KafkaConsumer compatible with multiple threads

2018-07-02 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529467#comment-16529467
 ] 

Richard Yu commented on KAFKA-7118:
---

I am not too sure if this is applicable. This issue might not be necessary.

> Make KafkaConsumer compatible with multiple threads
> ---
>
> Key: KAFKA-7118
> URL: https://issues.apache.org/jira/browse/KAFKA-7118
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Priority: Major
>  Labels: client, java, kip, needs-kip
>
> It was discovered that there is a performance constraint relating to 
> {{KafkaConsumer#close()}}. When several cores are modifying the same 
> consumer, a {{ConcurrentModification}} exception could result. This issue was 
> first posted on Spark. For testing details, see SPARK-23636 and SPARK-19185. 
> It would be preferable if multiple threads are allowed to modify one 
> {{KafkaConsumer}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7127) Add asynchronous support for methods in KafkaConsumer

2018-07-02 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-7127:
--
Description: Currently, in KafkaConsumer, various methods block due to a 
remote callback. It would be nice if we also added asynchronous versions of 
these methods, like what was done with {{commitAsync()}}, which takes an 
{{OffsetCommitCallback}} as an input argument.  (was: Currently, in 
KafkaConsumer, various methods block due to a remote callback. It would be 
nice if we also added asynchronous versions of these methods, like what was 
done with {{poll()}} in KIP-266 and {{commitAsync()}}, which takes an 
{{OffsetCommitCallback}} as an input argument.)
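
For reference, the existing callback pattern the description points at 
(standard consumer API; the error handling shown is illustrative):

    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            System.err.println("Async commit failed for " + offsets + ": " + exception);
        }
    });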

> Add asynchronous support for methods in KafkaConsumer
> -
>
> Key: KAFKA-7127
> URL: https://issues.apache.org/jira/browse/KAFKA-7127
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Reporter: Richard Yu
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, in KafkaConsumer, various methods block due to a remote callback. 
> It would be nice if we also added asynchronous versions of these methods, like 
> what was done with {{commitAsync()}}, which takes an {{OffsetCommitCallback}} 
> as an input argument.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7127) Add asynchronous support for methods in KafkaConsumer

2018-07-02 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529463#comment-16529463
 ] 

Richard Yu commented on KAFKA-7127:
---

cc [~hachikuji]

> Add asynchronous support for methods in KafkaConsumer
> -
>
> Key: KAFKA-7127
> URL: https://issues.apache.org/jira/browse/KAFKA-7127
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Reporter: Richard Yu
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, in KafkaConsumer, various methods block due to a remote callback. 
> It would be nice if we also added asynchronous versions of these methods, like 
> what was done with {{poll()}} in KIP-266 and {{commitAsync()}}, which takes an 
> {{OffsetCommitCallback}} as an input argument.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7127) Add asynchronous support for methods in KafkaConsumer

2018-07-02 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-7127:
-

 Summary: Add asynchronous support for methods in KafkaConsumer
 Key: KAFKA-7127
 URL: https://issues.apache.org/jira/browse/KAFKA-7127
 Project: Kafka
  Issue Type: Wish
  Components: clients
Reporter: Richard Yu


Currently, in KafkaConsumer, various methods block due to a remote callback. 
It would be nice if we also added asynchronous versions of these methods, like 
what was done with {{poll()}} in KIP-266 and {{commitAsync()}}, which takes an 
{{OffsetCommitCallback}} as an input argument.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)