[jira] [Updated] (KAFKA-7118) Make KafkaConsumer compatible with multiple threads
[ https://issues.apache.org/jira/browse/KAFKA-7118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-7118:
--
Description: It was discovered that there is a concurrency constraint relating to {{KafkaConsumer}}: when several threads modify the same consumer, a {{ConcurrentModificationException}} can result. This issue was first posted on Spark; for testing details, see SPARK-23636 and SPARK-19185. It would be preferable if multiple threads were allowed to modify one {{KafkaConsumer}}.
(was: It was discovered that there is a concurrency constraint relating to {{KafkaConsumer#close()}}: when several threads modify the same consumer, a {{ConcurrentModificationException}} can result. This issue was first posted on Spark; for testing details, see SPARK-23636 and SPARK-19185. It would be preferable if multiple threads were allowed to modify one {{KafkaConsumer}}.)

> Make KafkaConsumer compatible with multiple threads
> ---
>
> Key: KAFKA-7118
> URL: https://issues.apache.org/jira/browse/KAFKA-7118
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Richard Yu
> Priority: Major
> Labels: client, java, kip, needs-kip
>
> It was discovered that there is a concurrency constraint relating to {{KafkaConsumer}}: when several threads modify the same consumer, a {{ConcurrentModificationException}} can result. This issue was first posted on Spark; for testing details, see SPARK-23636 and SPARK-19185. It would be preferable if multiple threads were allowed to modify one {{KafkaConsumer}}.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
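For context: `KafkaConsumer` is documented as not thread-safe and fails fast with a `ConcurrentModificationException` rather than serializing concurrent calls. Below is a minimal sketch of that kind of fail-fast ownership check. `SingleThreadGuard` and `GuardedConsumer` are hypothetical names and this is not Kafka's code. The first thread to enter claims the guard with a compare-and-set, re-entrant calls from the same thread are counted, and any other thread is rejected.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical fail-fast guard (not Kafka's code): only one thread at a time
// may be inside a guarded call; any other thread gets an exception.
final class SingleThreadGuard {
    private static final long NO_OWNER = -1L;

    // Id of the thread currently inside a guarded call, or NO_OWNER.
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    // Re-entrancy depth; only modified by the owning thread.
    private int depth = 0;

    void acquire() {
        long me = Thread.currentThread().getId();
        // Proceed if this thread already owns the guard or can claim it.
        if (owner.get() != me && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                "object is not safe for multi-threaded access");
        }
        depth++;
    }

    void release() {
        // The outermost release hands ownership back.
        if (--depth == 0) {
            owner.set(NO_OWNER);
        }
    }
}

// Toy stand-in for a consumer whose public methods use the guard.
final class GuardedConsumer {
    private final SingleThreadGuard guard = new SingleThreadGuard();
    private int polls = 0;

    int poll() {
        guard.acquire();
        try {
            return ++polls;
        } finally {
            guard.release();
        }
    }

    // Runs `body` while this thread holds the guard, e.g. during a long poll().
    void whileHolding(Runnable body) {
        guard.acquire();
        try {
            body.run();
        } finally {
            guard.release();
        }
    }
}
```

Allowing several threads to share one consumer would mean replacing this check with real locking. The consumer's own documentation instead suggests either one consumer per thread or handing fetched records to separate worker threads.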
[jira] [Comment Edited] (KAFKA-6303) Potential lack of synchronization in NioEchoServer#AcceptorThread
[ https://issues.apache.org/jira/browse/KAFKA-6303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16333251#comment-16333251 ] Ted Yu edited comment on KAFKA-6303 at 7/3/18 5:38 AM:
---
+1 from me

was (Author: yuzhih...@gmail.com):
+1

> Potential lack of synchronization in NioEchoServer#AcceptorThread
> -
>
> Key: KAFKA-6303
> URL: https://issues.apache.org/jira/browse/KAFKA-6303
> Project: Kafka
> Issue Type: Bug
> Components: network
> Reporter: Ted Yu
> Assignee: siva santhalingam
> Priority: Minor
>
> In the run() method:
> {code}
> SocketChannel socketChannel =
>     ((ServerSocketChannel) key.channel()).accept();
> socketChannel.configureBlocking(false);
> newChannels.add(socketChannel);
> {code}
> Modification to newChannels should be protected by a synchronized block.
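The fix requested here is to guard every access to `newChannels` with the same monitor. Below is a minimal sketch that assumes the list is filled by the acceptor thread and drained by a different thread. `ChannelHandoff`, `add()` and `drain()` are hypothetical names. The element type is generic so the sketch runs without opening sockets; in `NioEchoServer` it would be `SocketChannel`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hand-off list shared by an acceptor thread (producer) and a
// processing thread (consumer). Every access holds the same monitor.
final class ChannelHandoff<T> {
    private final List<T> newChannels = new ArrayList<>();

    // Acceptor side: called after accept() and configureBlocking(false).
    void add(T channel) {
        synchronized (newChannels) {
            newChannels.add(channel);
        }
    }

    // Processing side: atomically takes everything added so far.
    List<T> drain() {
        synchronized (newChannels) {
            List<T> pending = new ArrayList<>(newChannels);
            newChannels.clear();
            return pending;
        }
    }
}
```

A `java.util.concurrent.ConcurrentLinkedQueue` would be a lock-free alternative for the same hand-off.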
[jira] [Commented] (KAFKA-6687) Allow to read a topic multiple times
[ https://issues.apache.org/jira/browse/KAFKA-6687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530850#comment-16530850 ] Richard Yu commented on KAFKA-6687:
---
Oh, in that case, I see what you mean. Thanks for the clarifications. :) I couldn't really tell when the DSL was being referred to and when the runtime was involved.

> Allow to read a topic multiple times
>
> Key: KAFKA-6687
> URL: https://issues.apache.org/jira/browse/KAFKA-6687
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Priority: Major
>
> Users often want to read a topic multiple times. However, this is not possible because there is a single consumer, and thus a topic can only be consumed once. Users get the following exception if they use a topic name in multiple `stream()`, `table()`, or `globalTable()` calls:
> {quote}Exception in thread “main” org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic source has already been registered by another source.
> {quote}
> However, with KAFKA-6034 in place, we could allow adding a topic multiple times and rewrite the topology internally to read the topic only once. This would simplify application code, as users would not need to put workarounds in place to get the same behavior.
[jira] [Commented] (KAFKA-7101) Session Window store should set topic policy `compact,cleanup`
[ https://issues.apache.org/jira/browse/KAFKA-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530827#comment-16530827 ] ASF GitHub Bot commented on KAFKA-7101: --- guozhangwang closed pull request #5298: KAFKA-7101: Consider session store for windowed store default configs URL: https://github.com/apache/kafka/pull/5298 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index ed51754d978..c644f9bbf38 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.internals.SessionStoreBuilder; import org.apache.kafka.streams.state.internals.WindowStoreBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -138,8 +139,10 @@ public StateStore build() { } long retentionPeriod() { -if (isWindowStore()) { +if (builder instanceof WindowStoreBuilder) { return ((WindowStoreBuilder) builder).retentionPeriod(); +} else if (builder instanceof SessionStoreBuilder) { +return ((SessionStoreBuilder) builder).retentionPeriod(); } else { throw new IllegalStateException("retentionPeriod is not supported when not a window store"); } @@ -159,7 +162,7 @@ private String name() { } private boolean isWindowStore() { -return builder instanceof WindowStoreBuilder; +return 
builder instanceof WindowStoreBuilder || builder instanceof SessionStoreBuilder; } // Apparently Java strips the generics from this method because we're using the raw type for builder, @@ -226,7 +229,7 @@ private SourceNodeFactory(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer) { super(name, NO_PREDECESSORS); -this.topics = topics != null ? Arrays.asList(topics) : new ArrayList(); +this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<>(); this.pattern = pattern; this.keyDeserializer = keyDeserializer; this.valDeserializer = valDeserializer; @@ -316,7 +319,7 @@ public ProcessorNode build() { final String topic = ((StaticTopicNameExtractor) topicExtractor).topicName; if (internalTopicNames.contains(topic)) { // prefix the internal topic name with the application id -return new SinkNode<>(name, new StaticTopicNameExtractor(decorateTopic(topic)), keySerializer, valSerializer, partitioner); +return new SinkNode<>(name, new StaticTopicNameExtractor<>(decorateTopic(topic)), keySerializer, valSerializer, partitioner); } else { return new SinkNode<>(name, topicExtractor, keySerializer, valSerializer, partitioner); } @@ -415,7 +418,7 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset, throw new TopologyException("Sink " + name + " must have at least one parent"); } -addSink(name, new StaticTopicNameExtractor(topic), keySerializer, valSerializer, partitioner, predecessorNames); +addSink(name, new StaticTopicNameExtractor<>(topic), keySerializer, valSerializer, partitioner, predecessorNames); nodeToSinkTopic.put(name, topic); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java index 04b0ceb1ecf..69540899ada 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java @@ -35,4 +35,11 @@ * @return segmentInterval in milliseconds */ long segmentIntervalMs(); + +/** + * The time period for which the {@link SessionStore} will retain historic data. + * + * @return retentionPeriod + */ +long retentionPeriod(); } diff --git
[jira] [Resolved] (KAFKA-7101) Session Window store should set topic policy `compact,cleanup`
[ https://issues.apache.org/jira/browse/KAFKA-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-7101.
--
Resolution: Fixed
Fix Version/s: 2.1.0

> Session Window store should set topic policy `compact,cleanup`
> --
>
> Key: KAFKA-7101
> URL: https://issues.apache.org/jira/browse/KAFKA-7101
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
> Reporter: Matthias J. Sax
> Assignee: Guozhang Wang
> Priority: Major
> Fix For: 2.1.0
>
> With [KIP-71|https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist] (0.10.1.0), the topic config `compact,delete` was introduced to apply to windowed store changelog topics in Kafka Streams. Later (0.10.2.0), session windows were added in [KIP-94|https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows]. However, session windows do not use `compact,delete` at the moment. This results in the same issue that window stores faced before KIP-71. Thus, we should enable `compact,delete` for session window changelog topics, too.
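The ticket's point is that a changelog backing a session store, like one backing a window store, should combine compaction with time-based deletion. Below is a minimal sketch of deriving such a config. `cleanup.policy` and `retention.ms` are real Kafka topic configs. The `ChangelogConfigSketch` class, the `StoreKind` enum and the extra-retention parameter are hypothetical and do not reproduce the Kafka Streams internals.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: picks changelog topic configs by store kind.
final class ChangelogConfigSketch {
    enum StoreKind { KEY_VALUE, WINDOW, SESSION }

    static Map<String, String> changelogConfig(StoreKind kind,
                                               long storeRetentionMs,
                                               long additionalRetentionMs) {
        Map<String, String> config = new HashMap<>();
        if (kind == StoreKind.KEY_VALUE) {
            // Key-value changelogs keep the latest value per key forever.
            config.put("cleanup.policy", "compact");
        } else {
            // Window and session stores expire old data, so their changelogs
            // combine compaction with time-based deletion (KIP-71).
            config.put("cleanup.policy", "compact,delete");
            config.put("retention.ms",
                       Long.toString(storeRetentionMs + additionalRetentionMs));
        }
        return config;
    }
}
```

Before the fix, session stores effectively fell into the first branch, so their changelogs were compacted but never trimmed by time.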
[jira] [Commented] (KAFKA-6687) Allow to read a topic multiple times
[ https://issues.apache.org/jira/browse/KAFKA-6687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530823#comment-16530823 ] Matthias J. Sax commented on KAFKA-6687:
Btw, there is one special case: reading a topic twice at the runtime level might make sense if the topic is read as a KTable and a GlobalKTable at the same time, because in this case we use two different consumers. At the moment, this scenario is not supported either.
[jira] [Commented] (KAFKA-6687) Allow to read a topic multiple times
[ https://issues.apache.org/jira/browse/KAFKA-6687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530822#comment-16530822 ] Matthias J. Sax commented on KAFKA-6687:
You have to distinguish between the DSL level and the runtime level. At the runtime level, it is not possible (and also does not make sense) to read a topic twice. At the moment, this restriction is reflected in the DSL. The idea is to remove this restriction from the DSL and rewrite the topology internally, i.e., it's about syntactic sugar at the DSL level. At the moment, users might need to write "ugly" code to get the same result.
Note that this ticket is blocked until we get the first version of the new topology optimizer in place. This is WIP by [~bbejeck].
Does this make sense?
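The distinction drawn here is that the DSL could accept several declarations of the same topic while the runtime still subscribes once. Below is a toy sketch of that rewrite. The names (`DedupingTopology`, `addSource`, `deliver`) are hypothetical and are not the Kafka Streams API. Instead of throwing, a repeated registration attaches another downstream handler to the single physical source for that topic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical toy builder: many logical sources per topic, one physical source.
final class DedupingTopology {
    // topic -> downstream handlers declared against that topic
    private final Map<String, List<Consumer<String>>> sources = new LinkedHashMap<>();

    // A repeated topic is merged into the existing source instead of
    // failing with "has already been registered by another source".
    void addSource(String topic, Consumer<String> downstream) {
        sources.computeIfAbsent(topic, t -> new ArrayList<>()).add(downstream);
    }

    // Number of topic subscriptions the runtime actually needs.
    int physicalSourceCount() {
        return sources.size();
    }

    // Simulates the single consumer delivering one record from `topic`:
    // the record is fanned out to every logical source.
    void deliver(String topic, String record) {
        for (Consumer<String> handler
                : sources.getOrDefault(topic, Collections.<Consumer<String>>emptyList())) {
            handler.accept(record);
        }
    }
}
```

The toy ignores serdes, offset-reset policies and timestamp extractors, which a real rewrite would have to reconcile across the merged declarations.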
[jira] [Commented] (KAFKA-5037) Infinite loop if all input topics are unknown at startup
[ https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530724#comment-16530724 ] Ted Yu commented on KAFKA-5037:
---
Originally I threw an exception in the if block for the following line in {{assign}}:
{code}
if (numPartitionsCandidate == null) {
{code}
In patch v2, I construct the assignment map. See if this is close to what you propose.

> Infinite loop if all input topics are unknown at startup
>
> Key: KAFKA-5037
> URL: https://issues.apache.org/jira/browse/KAFKA-5037
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Matthias J. Sax
> Priority: Major
> Labels: newbie++, user-experience
> Attachments: 5037.v2.txt
>
> See discussion: https://github.com/apache/kafka/pull/2815
> We will need some rewriting of {{StreamPartitionsAssignor}} and many more tests for all kinds of corner cases, including pattern subscriptions.
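The failure mode here is a loop that keeps trying to derive partition counts for internal topics while `numPartitionsCandidate` stays `null`, because no upstream count is ever known. Below is a hypothetical sketch of one way to guarantee termination. It is neither the actual assignor code nor the attached patch. The sketch iterates to a fixed point, stops as soon as a full pass makes no progress, and reports failure so the caller can decide what assignment to return. `PartitionCountResolver` is an illustrative name, and taking the maximum of the parents' counts is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical fixed-point resolution of internal-topic partition counts.
final class PartitionCountResolver {

    /**
     * @param upstream    internal topic -> topics it is derived from
     * @param knownCounts partition counts available from cluster metadata
     * @return counts for every internal topic, or empty if some can never be resolved
     */
    static Optional<Map<String, Integer>> resolve(Map<String, List<String>> upstream,
                                                  Map<String, Integer> knownCounts) {
        Map<String, Integer> counts = new HashMap<>(knownCounts);
        boolean progress = true;
        while (progress) {              // terminates: each pass must resolve a topic
            progress = false;
            for (Map.Entry<String, List<String>> e : upstream.entrySet()) {
                if (counts.containsKey(e.getKey())) {
                    continue;
                }
                Integer numPartitionsCandidate = null;
                for (String parent : e.getValue()) {
                    Integer n = counts.get(parent);
                    if (n != null && (numPartitionsCandidate == null || n > numPartitionsCandidate)) {
                        numPartitionsCandidate = n;
                    }
                }
                if (numPartitionsCandidate != null) {
                    counts.put(e.getKey(), numPartitionsCandidate);
                    progress = true;
                }
            }
        }
        // A pass without progress: anything still missing depends only on unknown topics.
        for (String topic : upstream.keySet()) {
            if (!counts.containsKey(topic)) {
                return Optional.empty();
            }
        }
        return Optional.of(counts);
    }
}
```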
[jira] [Updated] (KAFKA-5037) Infinite loop if all input topics are unknown at startup
[ https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-5037:
--
Attachment: (was: 5037.v1.txt)
[jira] [Updated] (KAFKA-5037) Infinite loop if all input topics are unknown at startup
[ https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-5037:
--
Attachment: 5037.v2.txt
[jira] [Commented] (KAFKA-6687) Allow to read a topic multiple times
[ https://issues.apache.org/jira/browse/KAFKA-6687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530668#comment-16530668 ] Richard Yu commented on KAFKA-6687:
---
It appears that the summary and the description are somewhat contradictory: while the summary states that we should "allow a topic to be read multiple times", the description states that "we should allow adding a topic multiple times and rewrite the topology internally so that it can be read *only* once." Which one is it?
[jira] [Updated] (KAFKA-5037) Infinite loop if all input topics are unknown at startup
[ https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-5037:
--
Attachment: 5037.v1.txt
[jira] [Assigned] (KAFKA-4999) Add convenience overload for seek* methods
[ https://issues.apache.org/jira/browse/KAFKA-4999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu reassigned KAFKA-4999:
-
Assignee: (was: Richard Yu)

> Add convenience overload for seek* methods
> --
>
> Key: KAFKA-4999
> URL: https://issues.apache.org/jira/browse/KAFKA-4999
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Timo Meijer
> Priority: Major
> Labels: Quickfix, needs-kip
>
> The most common use case for the seek* methods is to work on the currently assigned partitions. This behavior is supported by passing an empty list, but this is not very intuitive.
> Adding a parameterless overload of each seek* method with the same behavior (using the currently assigned partitions) would improve the API and user experience.
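Below is a minimal sketch of the proposed overload as a default method that delegates to the existing collection-based call with the current assignment. `SeekableConsumer` and `FakeConsumer` are hypothetical stand-ins, and partitions are modeled as strings rather than `TopicPartition`. The real consumer already has `assignment()` and `seekToBeginning(Collection<TopicPartition>)`, but it has no such no-argument overload.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical minimal consumer interface (partitions modeled as strings).
interface SeekableConsumer {
    Set<String> assignment();

    void seekToBeginning(Collection<String> partitions);

    // Proposed convenience overload: act on the currently assigned partitions.
    default void seekToBeginning() {
        seekToBeginning(assignment());
    }
}

// Fake implementation that records which partitions were rewound.
final class FakeConsumer implements SeekableConsumer {
    private final Set<String> assigned = new LinkedHashSet<>();
    final List<String> rewound = new ArrayList<>();

    FakeConsumer(Collection<String> partitions) {
        assigned.addAll(partitions);
    }

    @Override
    public Set<String> assignment() {
        return new LinkedHashSet<>(assigned);
    }

    @Override
    public void seekToBeginning(Collection<String> partitions) {
        rewound.addAll(partitions);
    }
}
```

The same one-line delegation would apply to `seekToEnd`.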
[jira] [Assigned] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL
[ https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu reassigned KAFKA-6049:
-
Assignee: (was: Richard Yu)

> Kafka Streams: Add Cogroup in the DSL
> -
>
> Key: KAFKA-6049
> URL: https://issues.apache.org/jira/browse/KAFKA-6049
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Guozhang Wang
> Priority: Major
> Labels: api, kip, user-experience
>
> When multiple streams aggregate together to form a single larger object (e.g. a shopping website may have a cart stream, a wish list stream, and a purchases stream; together they make up a Customer), it is very difficult to accommodate this in the Kafka Streams DSL: it generally requires you to group and aggregate all of the streams to KTables, then make multiple outer join calls to end up with a KTable with your desired object. This creates a state store for each stream and a long chain of ValueJoiners that each new record must go through to get to the final object.
> Creating a cogroup method that uses a single state store will:
> * Reduce the number of gets from state stores. With multiple joins, when a new value comes into any of the streams, a chain reaction happens where the join processor keeps calling ValueGetters until all state stores have been accessed.
> * Slightly increase performance. As described above, all ValueGetters are called, which also causes all ValueJoiners to be called, forcing a recalculation of the current joined value of all other streams and impacting performance.
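Below is a hypothetical sketch of the cogroup idea outside Kafka Streams. Each input stream contributes its own aggregator, and all of them update one aggregate per key held in one store. Each record therefore costs one lookup instead of a walk through a chain of ValueGetters and ValueJoiners. `CogroupedAggregate` and `Customer` are illustrative names, not a Kafka Streams API, and a plain `HashMap` stands in for the state store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical cogroup: per-stream aggregators updating one shared aggregate.
final class CogroupedAggregate<K, V, A> {
    private final Map<K, A> store = new HashMap<>();   // the single "state store"
    private final Map<String, BiFunction<V, A, A>> aggregators = new HashMap<>();
    private final Supplier<A> initializer;
    private int storeReads = 0;

    CogroupedAggregate(Supplier<A> initializer) {
        this.initializer = initializer;
    }

    // Registers the aggregator used for records arriving on `stream`.
    CogroupedAggregate<K, V, A> cogroup(String stream, BiFunction<V, A, A> aggregator) {
        aggregators.put(stream, aggregator);
        return this;
    }

    // One store read and one write per record, however many streams exist.
    A process(String stream, K key, V value) {
        BiFunction<V, A, A> agg = aggregators.get(stream);
        if (agg == null) {
            throw new IllegalArgumentException("unknown stream " + stream);
        }
        storeReads++;
        A current = store.get(key);
        if (current == null) {
            current = initializer.get();
        }
        A updated = agg.apply(value, current);
        store.put(key, updated);
        return updated;
    }

    int storeReads() {
        return storeReads;
    }
}

// Toy aggregate from the ticket's shopping example.
final class Customer {
    final List<String> cart = new ArrayList<>();
    final List<String> wishList = new ArrayList<>();
    final List<String> purchases = new ArrayList<>();
}
```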
[jira] [Assigned] (KAFKA-7128) Lagging high watermark can lead to committed data loss after ISR expansion
[ https://issues.apache.org/jira/browse/KAFKA-7128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma reassigned KAFKA-7128:
--
Assignee: Jason Gustafson

> Lagging high watermark can lead to committed data loss after ISR expansion
> --
>
> Key: KAFKA-7128
> URL: https://issues.apache.org/jira/browse/KAFKA-7128
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Major
>
> Some model checking exposed a weakness in the ISR expansion logic. We know that the high watermark can go backwards after a leader failover, but we may not have known that this can lead to the loss of committed data.
> Say we have three replicas: r1, r2, and r3. Initially, the ISR consists of (r1, r2) and the leader is r1. r3 is a new replica which has not begun fetching. The data up to offset 10 has been committed to the ISR. Here is the initial state:
> ISR: (r1, r2)
> Leader: r1
> r1: [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=0]
> Replica 1 then initiates shutdown (or fails) and leaves the ISR, which makes r2 the new leader. The high watermark on r2 still lags behind r1.
> ISR: (r2)
> Leader: r2
> r1 (offline): [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=0]
> Replica 3 then catches up to the high watermark on r2 and joins the ISR. Perhaps its own high watermark lags behind r2's, but this is unimportant.
> ISR: (r2, r3)
> Leader: r2
> r1 (offline): [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=5]
> Now r2 fails and r3 is elected leader and is the only member of the ISR. The committed data from offsets 5 to 10 has been lost.
> ISR: (r3)
> Leader: r3
> r1 (offline): [hw=10, leo=10]
> r2 (offline): [hw=5, leo=10]
> r3: [hw=0, leo=5]
> The bug is that we allowed r3 into the ISR once it had reached the new leader's local high watermark. Since the new leader (a follower until the failover) does not know the true high watermark from the previous leader's epoch, it should not allow a replica to join the ISR until that replica has caught up to an offset within the new leader's own epoch.
> Note this is related to https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change
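Below is a minimal model of the two ISR admission rules contrasted above, replayed on the ticket's numbers. `IsrExpansionCheck` and its methods are hypothetical. The sketch also assumes that "an offset within the leader's own epoch" means the leader's log end offset at the moment it was elected, i.e. the first offset of its epoch.

```java
// Hypothetical model of the ISR admission rules discussed in KAFKA-7128.
final class IsrExpansionCheck {

    // Rule the ticket identifies as unsafe: the follower only has to reach
    // the leader's local high watermark, which may lag after a failover.
    static boolean canJoinByHighWatermark(long followerLeo, long leaderHw) {
        return followerLeo >= leaderHw;
    }

    // Rule the ticket proposes: additionally require the follower to reach
    // the first offset of the current leader's epoch (assumed here to be the
    // leader's log end offset when it was elected).
    static boolean canJoinWithinLeaderEpoch(long followerLeo, long leaderHw,
                                            long leaderEpochStartOffset) {
        return followerLeo >= leaderHw && followerLeo >= leaderEpochStartOffset;
    }

    public static void main(String[] args) {
        // Ticket's scenario: r2 is elected with hw=5, leo=10; r3 reaches leo=5.
        long r2Hw = 5, r2EpochStart = 10, r3Leo = 5;
        System.out.println(canJoinByHighWatermark(r3Leo, r2Hw));                 // true
        System.out.println(canJoinWithinLeaderEpoch(r3Leo, r2Hw, r2EpochStart)); // false
    }
}
```

Under the stricter rule r3 stays out of the ISR at leo=5. It therefore cannot become the sole ISR member and truncate away offsets 5 to 10 when r2 fails.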
[jira] [Assigned] (KAFKA-7128) Lagging high watermark can lead to committed data loss after ISR expansion
[ https://issues.apache.org/jira/browse/KAFKA-7128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu reassigned KAFKA-7128:
-
Assignee: (was: Richard Yu)
[jira] [Assigned] (KAFKA-7128) Lagging high watermark can lead to committed data loss after ISR expansion
[ https://issues.apache.org/jira/browse/KAFKA-7128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu reassigned KAFKA-7128:
-
Assignee: Richard Yu (was: Jason Gustafson)
[jira] [Commented] (KAFKA-5037) Infinite loop if all input topics are unknown at startup
[ https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530617#comment-16530617 ] Guozhang Wang commented on KAFKA-5037:
--
I'm adding the tag `newbie++` for anyone wanting to pick it up. Note the proposed solution is targeted to fix all three JIRAs: 5037, 6587, 6437.
[jira] [Updated] (KAFKA-5037) Infinite loop if all input topics are unknown at startup
[ https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-5037:
-
Labels: newbie++ user-experience (was: user-experience)
[jira] [Commented] (KAFKA-7080) WindowStoreBuilder incorrectly initializes CachingWindowStore
[ https://issues.apache.org/jira/browse/KAFKA-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530555#comment-16530555 ] ASF GitHub Bot commented on KAFKA-7080: --- guozhangwang closed pull request #5257: KAFKA-7080: replace numSegments with segmentInterval URL: https://github.com/apache/kafka/pull/5257 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index aa3dec17239..fc1fb9f5d13 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -66,11 +66,11 @@ public final class SessionWindows {
     private final long gapMs;
-    private long maintainDurationMs;
+    private final long maintainDurationMs;
-    private SessionWindows(final long gapMs) {
+    private SessionWindows(final long gapMs, final long maintainDurationMs) {
         this.gapMs = gapMs;
-        maintainDurationMs = Windows.DEFAULT_MAINTAIN_DURATION_MS;
+        this.maintainDurationMs = maintainDurationMs;
     }
     /**
@@ -85,7 +85,8 @@ public static SessionWindows with(final long inactivityGapMs) {
         if (inactivityGapMs <= 0) {
             throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
         }
-        return new SessionWindows(inactivityGapMs);
+        final long oneDayMs = 24 * 60 * 60_000L;
+        return new SessionWindows(inactivityGapMs, oneDayMs);
     }
     /**
@@ -99,9 +100,8 @@ public SessionWindows until(final long durationMs) throws IllegalArgumentExcepti
         if (durationMs < gapMs) {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap.");
         }
-        maintainDurationMs = durationMs;
-        return this;
+        return new SessionWindows(gapMs, durationMs);
     }
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 09fdfce948d..53ead1e9b73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -36,18 +36,10 @@
  */
 public abstract class Windows {
-    private static final int DEFAULT_NUM_SEGMENTS = 3;
+    private long maintainDurationMs = 24 * 60 * 60 * 1000L; // default: one day
+    @Deprecated public int segments = 3;
-    static final long DEFAULT_MAINTAIN_DURATION_MS = 24 * 60 * 60 * 1000L; // one day
-
-    private long maintainDurationMs;
-
-    public int segments;
-
-    protected Windows() {
-        segments = DEFAULT_NUM_SEGMENTS;
-        maintainDurationMs = DEFAULT_MAINTAIN_DURATION_MS;
-    }
+    protected Windows() {}
     /**
      * Set the window maintain duration (retention time) in milliseconds.
@@ -76,6 +68,19 @@ public long maintainMs() {
         return maintainDurationMs;
     }
+    /**
+     * Return the segment interval in milliseconds.
+     *
+     * @return the segment interval
+     */
+    @SuppressWarnings("deprecation") // The deprecation is on the public visibility of segments. We intend to make the field private later.
+    public long segmentInterval() {
+        // Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient.
+        final long minimumSegmentInterval = 60_000L;
+        // Scaled to the (possibly overridden) retention period
+        return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval);
+    }
+
     /**
      * Set the number of segments to be used for rolling the window store.
      * This function is not exposed to users but can be called by developers that extend this class.
@@ -83,7 +88,9 @@ public long maintainMs() {
      * @param segments the number of segments to be used
      * @return itself
      * @throws IllegalArgumentException if specified segments is small than 2
+     * @deprecated since 2.1 Override segmentInterval() instead.
      */
+    @Deprecated
     protected Windows segments(final int segments) throws IllegalArgumentException {
         if (segments < 2) {
             throw new IllegalArgumentException("Number of segments must be at least 2.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bc56a3d0350..acfdf35b96a 100644
---
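The new `segmentInterval()` in the diff derives the segment size from the retention period and the (now deprecated) segment count, with a 60-second floor. Below is a minimal standalone sketch of that arithmetic, assuming the defaults visible in the diff (one-day retention, `segments = 3`). `SegmentIntervalSketch` is a hypothetical helper written for illustration, not part of Kafka Streams.

```java
// Hypothetical standalone helper that mirrors the arithmetic of the new
// Windows#segmentInterval() in the diff above; it is not a Kafka Streams API.
final class SegmentIntervalSketch {
    // Floor taken from the diff: segments never get shorter than 60 seconds.
    static final long MINIMUM_SEGMENT_INTERVAL_MS = 60_000L;

    static long segmentInterval(final long maintainMs, final int segments) {
        if (segments < 2) {
            // Same lower bound that Windows#segments(int) enforces in the diff.
            throw new IllegalArgumentException("Number of segments must be at least 2.");
        }
        // Retention divided by (segments - 1), clamped to the 60-second minimum.
        return Math.max(maintainMs / (segments - 1), MINIMUM_SEGMENT_INTERVAL_MS);
    }

    public static void main(final String[] args) {
        final long oneDayMs = 24 * 60 * 60 * 1000L;
        // Defaults from the diff: one day of retention, 3 segments -> 12-hour segments.
        System.out.println(segmentInterval(oneDayMs, 3)); // 43200000
        // 90 s of retention would give 45 s segments, so the 60 s floor applies.
        System.out.println(segmentInterval(90_000L, 3)); // 60000
    }
}
```

The divisor `segments - 1` and the 60-second floor are taken as-is from the diff; the sketch only makes their combined effect easy to check.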
[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null
[ https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530530#comment-16530530 ] Guozhang Wang commented on KAFKA-7125: -- [~Ahto] I have a PR ready to lift the assumption linked above; you are free to try it out. But keep in mind that adding a sink node to the sub-topology for a global store update would not usually work: every instance of your Streams application has its own, independent global store update task, so multiple such tasks may write to the same topic. Assume you have N instances. Each of the N instances will read from the global source topic, do the "UnneededCruftSupplier" processing, and then write to the sink topic "fst-routes-to-services", which means that every record in "fst-routes-to-services" will be written N times. This brings up another potential bug that we should fix: for a global store's sub-topology, we should disallow 1) adding a local state store to it, 2) adding a sink node to it, and 3) claiming any of its processor nodes as the parent of processor nodes in the other, normal processing sub-topologies. We should consider adding these checks as well. [~NIzhikov] If you are interested in fixing this issue, please feel free to take a look at my PR and then start working on the one I mentioned.
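The three checks proposed above can be sketched against a deliberately simplified topology model. Everything here (`SketchNode`, `GlobalSubtopologyCheck`, the node names `GLOBAL-SOURCE`/`GLOBAL-UPDATER`, and how stores are attached) is hypothetical and does not mirror `InternalTopologyBuilder`; it only illustrates the rules as stated in the comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified topology node; it does NOT mirror Kafka Streams internals.
final class SketchNode {
    enum Kind { SOURCE, PROCESSOR, SINK }

    final String name;
    final Kind kind;
    final boolean inGlobalSubtopology;
    final List<String> stores;      // names of state stores connected to this node
    final List<SketchNode> parents; // upstream nodes

    SketchNode(String name, Kind kind, boolean inGlobalSubtopology,
               List<String> stores, List<SketchNode> parents) {
        this.name = name;
        this.kind = kind;
        this.inGlobalSubtopology = inGlobalSubtopology;
        this.stores = stores;
        this.parents = parents;
    }
}

// Hypothetical checker for the three rules proposed in the comment above.
final class GlobalSubtopologyCheck {
    static List<String> violations(List<SketchNode> nodes, Set<String> globalStores) {
        List<String> problems = new ArrayList<>();
        for (SketchNode node : nodes) {
            if (node.inGlobalSubtopology) {
                // Rule 1: no local (non-global) state stores inside the global sub-topology.
                for (String store : node.stores) {
                    if (!globalStores.contains(store)) {
                        problems.add("local store " + store + " attached to " + node.name);
                    }
                }
                // Rule 2: no sinks, since every instance would write the same records.
                if (node.kind == SketchNode.Kind.SINK) {
                    problems.add("sink " + node.name + " in global sub-topology");
                }
            } else {
                // Rule 3: regular nodes must not have a parent in the global sub-topology.
                for (SketchNode parent : node.parents) {
                    if (parent.inGlobalSubtopology) {
                        problems.add(node.name + " has global parent " + parent.name);
                    }
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> noStores = Collections.emptyList();
        SketchNode source = new SketchNode("GLOBAL-SOURCE", SketchNode.Kind.SOURCE, true,
                noStores, Collections.<SketchNode>emptyList());
        SketchNode updater = new SketchNode("GLOBAL-UPDATER", SketchNode.Kind.PROCESSOR, true,
                Collections.singletonList("routes-to-services"), Collections.singletonList(source));
        SketchNode sink = new SketchNode("FST-ROUTES-TO-SERVICES", SketchNode.Kind.SINK, true,
                noStores, Collections.singletonList(updater));
        System.out.println(violations(Arrays.asList(source, updater, sink),
                Collections.singleton("routes-to-services")));
        // [sink FST-ROUTES-TO-SERVICES in global sub-topology]
    }
}
```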
> Calling StreamsBuilderbuilder.build().describe() causes > java.util.NoSuchElementException: null > -- > > Key: KAFKA-7125 > URL: https://issues.apache.org/jira/browse/KAFKA-7125 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Jouni >Assignee: Nikolay Izhikov >Priority: Minor > Labels: beginner, newbie > > After adding a processor and a sink to the topology after a global store and > then calling StreamsBuilder.build().describe() again (for debugging purposes > and to check I got the topology right), I got the following exception and > stack trace: > {{Caused by: java.util.NoSuchElementException: null}} > {{ at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) > ~[na:1.8.0_171]}} > {{ at java.util.HashMap$KeyIterator.next(HashMap.java:1466) > ~[na:1.8.0_171]}} > {{ at > org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323) > ~[kafka-streams-1.1.0.jar:na]}} > {{ at > org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306) > ~[kafka-streams-1.1.0.jar:na]}} > {{ at org.apache.kafka.streams.Topology.describe(Topology.java:647) > ~[kafka-streams-1.1.0.jar:na]}} > Snippet of code that caused this: > {{ GlobalKTable jsonRoutesToServices}} > {{ = builder.globalTable("routes-to-services",}} > {{ Consumed.with(Serdes.String(), > jsonServiceListSerde),}} > {{ Materialized.
KeyValueStore>as("routes-to-services"));}} > {{ TopologyDescription td = builder.build().describe();}} > {{ String parent = null;}} > {{ // We get an iterator to a TreeSet sorted by processing order, and > just want the last one.}} > {{ for (TopologyDescription.GlobalStore store : td.globalStores()) {}} > {{ parent = store.processor().name();}} > } > {{ TopologyDescription tdtd = builder.build().describe();}} > {{ builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new > UnneededCruftSupplier(), parent);}} > {{ builder.build().addSink("FST-ROUTES-TO-SERVICES", > "fst-routes-to-services", Serdes.String().serializer(), > fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}} > {{ TopologyDescription tdtdtd = builder.build().describe();}} > Note that the exception is thrown on the last line of the code snippet; > calling describe() again before adding anything works fine. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530469#comment-16530469 ] Yishun Guan commented on KAFKA-6788: Does it have a retry mechanism now? I am looking at KAFKA-6789 (https://issues.apache.org/jira/browse/KAFKA-6789), which talks about a possible implementation of a retry mechanism. > Grouping consumer requests per consumer coordinator in admin client > --- > > Key: KAFKA-6788 > URL: https://issues.apache.org/jira/browse/KAFKA-6788 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Guozhang Wang >Assignee: Yishun Guan >Priority: Major > Labels: newbie++ > > In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we > will first try to get the coordinator for each requested group id, and then > send the corresponding request for that group id. However, different group > ids could be hosted on the same coordinator, and these requests do support > sending multiple group ids within the same request. So we can consider optimizing > it by grouping the requests per coordinator destination. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
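The optimization described in the issue amounts to bucketing group ids by coordinator before building requests. Here is a minimal sketch, assuming the coordinator lookups have already completed and are available as a map from group id to broker id. `CoordinatorBatching` and its inputs are hypothetical, not `KafkaAdminClient` internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of per-coordinator batching; not KafkaAdminClient code.
final class CoordinatorBatching {
    // Buckets group ids by the broker id of their coordinator, so one request
    // can carry every group hosted on the same coordinator.
    static Map<Integer, List<String>> batchByCoordinator(Collection<String> groupIds,
                                                         Map<String, Integer> coordinatorOf) {
        Map<Integer, List<String>> batches = new TreeMap<>(); // sorted by broker id for stable output
        for (String groupId : groupIds) {
            Integer brokerId = coordinatorOf.get(groupId);
            if (brokerId == null) {
                throw new IllegalStateException("No coordinator known for group " + groupId);
            }
            batches.computeIfAbsent(brokerId, id -> new ArrayList<>()).add(groupId);
        }
        return batches;
    }

    public static void main(String[] args) {
        Map<String, Integer> coordinatorOf = new TreeMap<>();
        coordinatorOf.put("orders", 1);
        coordinatorOf.put("payments", 2);
        coordinatorOf.put("shipping", 1);
        // Three groups on two coordinators -> two requests instead of three.
        System.out.println(batchByCoordinator(Arrays.asList("orders", "payments", "shipping"), coordinatorOf));
        // {1=[orders, shipping], 2=[payments]}
    }
}
```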
[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530449#comment-16530449 ] Colin P. McCabe commented on KAFKA-6788: This is an optimization which we'd like to do at some point, and which hasn't been done yet. Basically, the optimization is that if you have a bunch of groups in the batch request which all share a common group coordinator, we'd like to send one RPC to that group coordinator rather than several. I think this would be a hard optimization to do correctly because of the error handling issues. If you get an error for some, but not all, elements of the batch, you want to retry just those elements. The current closed PR looks like it got some wires crossed. It seems to be replacing DescribeGroups with ListGroups, which isn't what we want here. Listing all the groups is not very efficient. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
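Colin's point about partial failures can be made concrete: after a batched response, only the groups that came back with a retriable error should be sent again. This is a minimal sketch under stated assumptions. The per-group result is modeled as a hypothetical `Outcome` enum rather than real Kafka error codes, and deciding which errors count as retriable is left to the caller.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of retrying only the failed elements of a batched request; not Kafka code.
final class PartialRetry {
    // Simplified per-group result; real responses carry Kafka error codes instead.
    enum Outcome { OK, RETRIABLE_ERROR, FATAL_ERROR }

    // Returns the group ids to send again, preserving the order of the results map.
    static List<String> toRetry(Map<String, Outcome> results) {
        List<String> retry = new ArrayList<>();
        for (Map.Entry<String, Outcome> entry : results.entrySet()) {
            if (entry.getValue() == Outcome.RETRIABLE_ERROR) {
                retry.add(entry.getKey());
            }
        }
        return retry;
    }

    public static void main(String[] args) {
        Map<String, Outcome> results = new LinkedHashMap<>();
        results.put("orders", Outcome.OK);
        results.put("payments", Outcome.RETRIABLE_ERROR); // retried on its own
        results.put("shipping", Outcome.FATAL_ERROR);     // reported to the caller, not retried
        System.out.println(toRetry(results)); // [payments]
    }
}
```

In a real client the retried ids would presumably need to go back through coordinator lookup and batching, since the coordinator for a failed group may have changed in the meantime.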
[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null
[ https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530386#comment-16530386 ] Jouni commented on KAFKA-7125: -- Thanks Guozhang. I will try it when the PR is ready, although I must first set up my development environment so that I can actually compile Kafka... and make sure that it also works with the Spring framework. It's late at night where I live, and I need some sleep after several hours of coding and a "few" beers, so don't expect a quick answer. I have also found a workaround for my case. It involves a few custom partitioners on the producer side so that correlated data ends up in the right place. I used a GlobalKTable only because my joins need a KeyValueMapper: the lookup key must be constructed from data in another incoming stream. Instead of using a GlobalKTable, I can just subscribe to a stream, do my non-Streams-related work through a dummy aggregation that results in a table, and for the joins, write my own TransformerSupplier/Transformer implementations that get the data from a state store. Oh, I wish there were stream-to-table joins with a KeyValueMapper! I haven't written the code for my workaround yet, so I'll test your PR first, when it's ready. I will probably end up implementing my workaround later anyway; it should be faster, and performance will become very important in the future.
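The core of the workaround above (derive the lookup key from each stream record, then read the table side from a state store inside a custom transformer) can be sketched without Kafka Streams at all. In this sketch the state store is modeled as a plain `Map`, and `KeyMapperJoin`, its type parameters, and the `route|payload` value format are hypothetical illustrations, not the `Transformer` or state-store APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of a stream-to-table join whose lookup key is derived from the
// stream value (the role a KeyValueMapper plays in a GlobalKTable join). A plain Map
// stands in for the local state store; no Kafka Streams classes are used.
final class KeyMapperJoin<V, K, T, R> {
    private final Map<K, T> store;
    private final Function<V, K> keyMapper;
    private final BiFunction<V, T, R> joiner;

    KeyMapperJoin(Map<K, T> store, Function<V, K> keyMapper, BiFunction<V, T, R> joiner) {
        this.store = store;
        this.keyMapper = keyMapper;
        this.joiner = joiner;
    }

    // Inner-join semantics: a stream value whose mapped key is not in the store yields nothing.
    Optional<R> process(V streamValue) {
        T tableValue = store.get(keyMapper.apply(streamValue));
        return tableValue == null ? Optional.empty() : Optional.ofNullable(joiner.apply(streamValue, tableValue));
    }

    public static void main(String[] args) {
        Map<String, String> routesToServices = new HashMap<>();
        routesToServices.put("route-a", "service-1");
        // Lookup key = the part of the stream value before the first '|'.
        KeyMapperJoin<String, String, String, String> join = new KeyMapperJoin<>(
                routesToServices, v -> v.split("\\|")[0], (v, service) -> v + " -> " + service);
        System.out.println(join.process("route-a|ping")); // Optional[route-a|ping -> service-1]
        System.out.println(join.process("route-z|ping")); // Optional.empty
    }
}
```

Because the store in this workaround is local rather than global, the table record for a given lookup key has to live on the same instance as the stream records that map to it, which is what the custom producer-side partitioners mentioned above are meant to ensure.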
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7128) Lagging high watermark can lead to committed data loss after ISR expansion
Jason Gustafson created KAFKA-7128: -- Summary: Lagging high watermark can lead to committed data loss after ISR expansion Key: KAFKA-7128 URL: https://issues.apache.org/jira/browse/KAFKA-7128 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson

Some model checking exposed a weakness in the ISR expansion logic. We know that the high watermark can go backwards after a leader failover, but we may not have known that this can lead to the loss of committed data.

Say we have three replicas: r1, r2, and r3. Initially, the ISR consists of (r1, r2) and the leader is r1. r3 is a new replica which has not begun fetching. The data up to offset 10 has been committed to the ISR. Here is the initial state:

ISR: (r1, r2)
Leader: r1
r1: [hw=10, leo=10]
r2: [hw=5, leo=10]
r3: [hw=0, leo=0]

Replica 1 then initiates shutdown (or fails) and leaves the ISR, which makes r2 the new leader. r2's high watermark is still lagging behind r1's.

ISR: (r2)
Leader: r2
r1 (offline): [hw=10, leo=10]
r2: [hw=5, leo=10]
r3: [hw=0, leo=0]

Replica 3 then catches up to the high watermark on r2 and joins the ISR. Perhaps its own high watermark lags behind r2's, but this is unimportant.

ISR: (r2, r3)
Leader: r2
r1 (offline): [hw=10, leo=10]
r2: [hw=5, leo=10]
r3: [hw=0, leo=5]

Now r2 fails, and r3 is elected leader as the only member of the ISR. The committed data from offsets 5 to 10 has been lost.

ISR: (r3)
Leader: r3
r1 (offline): [hw=10, leo=10]
r2 (offline): [hw=5, leo=10]
r3: [hw=0, leo=5]

The bug is that we allowed r3 into the ISR as soon as it had reached the new leader's local high watermark. Since a follower that becomes leader does not know the true high watermark of the previous leader's epoch, it should not allow a replica to join the ISR until that replica has caught up to an offset within the new leader's own epoch.
Note this is related to https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change -- This message was sent by Atlassian JIRA (v7.6.3#76005)
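The proposed fix can be stated as an ISR admission predicate. The sketch below replays the KAFKA-7128 scenario with a hypothetical `IsrAdmission` helper. The `leaderEpochStartOffset` parameter (taken here as r2's log end offset when its epoch began, 10 in the scenario) is an assumption about how "an offset within its own epoch" would be tracked; this is not the actual broker code.

```java
// Hypothetical model of the ISR-expansion check discussed in KAFKA-7128; not broker code.
final class IsrAdmission {
    // Behaviour described in the issue: the follower only has to reach the
    // new leader's (possibly stale) high watermark.
    static boolean oldRule(long followerLeo, long leaderHw) {
        return followerLeo >= leaderHw;
    }

    // Proposed behaviour: additionally require the follower to reach the offset at
    // which the current leader's epoch started, i.e. everything the leader had on takeover.
    static boolean proposedRule(long followerLeo, long leaderHw, long leaderEpochStartOffset) {
        return followerLeo >= leaderHw && followerLeo >= leaderEpochStartOffset;
    }

    public static void main(String[] args) {
        long r2Hw = 5;          // r2's lagging high watermark after it becomes leader
        long r2EpochStart = 10; // assumed: r2's log end offset when its epoch began
        long r3Leo = 5;         // r3 has caught up only to r2's high watermark

        System.out.println(oldRule(r3Leo, r2Hw));                    // true: r3 joins, offsets 5..10 at risk
        System.out.println(proposedRule(r3Leo, r2Hw, r2EpochStart)); // false: r3 must keep fetching
        System.out.println(proposedRule(10, r2Hw, r2EpochStart));    // true once r3 reaches offset 10
    }
}
```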
[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530220#comment-16530220 ] Guozhang Wang commented on KAFKA-6788: -- [~cmccabe] I'd like you to chime in on this one: what's the current status of this request, and how would we like to fix it? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null
[ https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530209#comment-16530209 ] Guozhang Wang commented on KAFKA-7125: -- [~Ahto] Thanks for your updated explanation. And yes, your finding is correct: today we assume that a global store has only two processor nodes associated with it, namely the source node and the update processor. This is an unintended limitation we introduced into the topology building process, and I will provide a PR that tries to lift it. Please feel free to try it out and let me know if it resolves your problem. If it does, I will try to merge it into trunk so that the next minor release includes this fix. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530202#comment-16530202 ] Matthias J. Sax commented on KAFKA-6583: If we close it as a duplicate, we can resolve it immediately. But you are right: it should not say "fixed" but "duplicate". Corrected. > Metadata should include number of state stores for task > --- > > Key: KAFKA-6583 > URL: https://issues.apache.org/jira/browse/KAFKA-6583 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Richard Yu >Assignee: Richard Yu >Priority: Critical > Labels: needs-kip > > Currently, to balance clients more evenly, stateful tasks > should be distributed so that they are spread equally. > However, implementing such awareness during task assignment would > require the present rebalance protocol metadata to also > contain the number of state stores in each task. This would > allow us to "weight" tasks during assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
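To illustrate what "weighting" tasks by state-store count could mean, here is a greedy sketch that assigns tasks, heaviest first, to whichever client currently has the lowest load. This is a hypothetical illustration, not the Kafka Streams task assignor. The cost model (1 per task plus 1 per state store) is an arbitrary assumption, chosen so that stateless tasks still count toward balance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical greedy assignor that weights tasks by their number of state stores.
// It is an illustration only, not the Kafka Streams task assignor.
final class WeightedTaskAssignment {
    static Map<String, List<String>> assign(Map<String, Integer> storesPerTask, List<String> clients) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        Map<String, Integer> load = new HashMap<>();
        for (String client : clients) {
            assignment.put(client, new ArrayList<>());
            load.put(client, 0);
        }
        List<String> tasks = new ArrayList<>(storesPerTask.keySet());
        // Heaviest tasks first; ties broken by task id so the result is deterministic.
        tasks.sort((a, b) -> {
            int byWeight = Integer.compare(storesPerTask.get(b), storesPerTask.get(a));
            return byWeight != 0 ? byWeight : a.compareTo(b);
        });
        for (String task : tasks) {
            // Least-loaded client wins; ties go to the client listed first.
            String target = clients.get(0);
            for (String client : clients) {
                if (load.get(client) < load.get(target)) {
                    target = client;
                }
            }
            assignment.get(target).add(task);
            // Assumed cost model: 1 for the task itself plus 1 per state store.
            load.put(target, load.get(target) + 1 + storesPerTask.get(task));
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> storesPerTask = new LinkedHashMap<>();
        storesPerTask.put("0_0", 3);
        storesPerTask.put("0_1", 3);
        storesPerTask.put("1_0", 0);
        storesPerTask.put("1_1", 0);
        storesPerTask.put("2_0", 1);
        System.out.println(assign(storesPerTask, Arrays.asList("A", "B")));
        // {A=[0_0, 2_0], B=[0_1, 1_0, 1_1]} -> both clients end up with a load of 6
    }
}
```

Note that client B receives three tasks and A only two, yet their weighted loads are equal; a count-only assignor would not distinguish these cases.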
[jira] [Resolved] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-6583. Resolution: Duplicate -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-6583: -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530087#comment-16530087 ] Ted Yu commented on KAFKA-6583: --- Shouldn't this be marked Resolved when KAFKA-4696 (Open as of now) is integrated? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7108) "Exactly-once" stream breaks production exception handler contract
[ https://issues.apache.org/jira/browse/KAFKA-7108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530086#comment-16530086 ] Matthias J. Sax commented on KAFKA-7108: I just double-checked the code. We actually allow overriding retries (and thus we do not log anything). IIRC, for exactly-once, setting retries to MAX_VALUE is recommended but not enforced by the producer, and thus Kafka Streams does not enforce it either. The question is: why do you set it to 240 instead of using the default of MAX_VALUE? Also note that you can set different configs for Streams itself and for the internal producer/consumer/admin clients if you want (cf. https://kafka.apache.org/11/documentation/streams/developer-guide/config-streams.html#id17) > "Exactly-once" stream breaks production exception handler contract > -- > > Key: KAFKA-7108 > URL: https://issues.apache.org/jira/browse/KAFKA-7108 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Anna O >Priority: Major > Labels: exactly-once > > I have a stream configured with "default.production.exception.handler" that > is supposed to log the error and continue. 
When I set "processing.guarantee" > to "exactly_once", it appeared that a retryable NotEnoughReplicasException that > passed the production exception handler was rethrown by the > TransactionManager, wrapped in a KafkaException, and terminated the stream > thread: > _org.apache.kafka.common.KafkaException: Cannot execute transactional method > because we are in an error state_ > _at > org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) > ~[kafka-clients-1.1.0.jar:?]_ > _at > org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:250) > ~[kafka-clients-1.1.0.jar:?]_ > _at > org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:617) > ~[kafka-clients-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:357) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347) > [kafka-streams-1.1.0.jar:?]_ > 
_at > org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) > [kafka-streams-1.1.0.jar:?]_ > _Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: > Messages are rejected since there are fewer in-sync replicas than required._ > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
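Matthias's suggestion to configure the internal clients separately can be expressed with prefixed keys: Kafka Streams passes properties that start with `producer.` (with the prefix removed) only to its internal producer. The sketch below uses plain `java.util.Properties` and raw key strings so that it compiles without the Streams jar. The application id and bootstrap address are placeholders, and which settings may be overridden this way should be checked against the Streams configuration docs for your version.

```java
import java.util.Properties;

// Sketch using raw config keys so it compiles without the kafka-streams jar.
// Values marked "placeholder" are hypothetical.
final class StreamsRetriesConfigSketch {
    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");    // placeholder
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("processing.guarantee", "exactly_once");
        // Keys with the "producer." prefix are handed (prefix stripped) only to the
        // internal producer; here retries stay at Integer.MAX_VALUE instead of e.g. 240.
        props.put("producer.retries", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Alternatively, simply leaving `retries` unset keeps the MAX_VALUE default the comment refers to.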
[jira] [Resolved] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-6583. Resolution: Fixed -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530064#comment-16530064 ] Cyrus Vafadari commented on KAFKA-6788: --- [~guozhang] I abandoned it a while ago when it didn't get attention, so I haven't actively worked on it in a while. I think if the code is moving towards using futures/promises, it might be better to start from scratch. If you like this strategy, though, I'll fix the merge conflicts and reopen the PR. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6859) Follower should not send OffsetForLeaderEpoch for undefined leader epochs
[ https://issues.apache.org/jira/browse/KAFKA-6859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-6859: --- Fix Version/s: 2.1.0 > Follower should not send OffsetForLeaderEpoch for undefined leader epochs > - > > Key: KAFKA-6859 > URL: https://issues.apache.org/jira/browse/KAFKA-6859 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Anna Povzner >Assignee: Stanislav Kozlovski >Priority: Major > Fix For: 2.1.0 > > > This is more of an optimization than a correctness issue. > Currently, if the follower is on inter-broker protocol version 0.11 or higher > but on an older message format, it does not track leader epochs. However, it will > still send an OffsetForLeaderEpoch request to the leader with an undefined epoch, > which is guaranteed to return an undefined offset, so that the follower > truncates to its high watermark. Another example is a bootstrapping follower that > does not have any leader epochs recorded. > It is cleaner and more efficient for a follower with undefined leader epochs not to send > OffsetForLeaderEpoch requests, since we already know the answer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
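The optimization amounts to splitting the follower's partitions before building the request. Partitions with no known leader epoch are truncated to the high watermark locally, and only the remaining partitions are sent to the leader. This is a minimal sketch that assumes an undefined epoch is represented as `-1` (our understanding of Kafka's undefined-epoch sentinel). `LeaderEpochRequestFilter` and its nested types are hypothetical, not the broker's fetcher classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the follower-side filter; not the broker's fetcher code.
final class LeaderEpochRequestFilter {
    // Assumed sentinel for "no leader epoch recorded".
    static final int UNDEFINED_EPOCH = -1;

    static final class PartitionState {
        final String partition;
        final int latestEpoch;
        final long highWatermark;

        PartitionState(String partition, int latestEpoch, long highWatermark) {
            this.partition = partition;
            this.latestEpoch = latestEpoch;
            this.highWatermark = highWatermark;
        }
    }

    static final class Plan {
        final List<PartitionState> sendOffsetForLeaderEpoch = new ArrayList<>();
        final List<PartitionState> truncateToHighWatermark = new ArrayList<>();
    }

    static Plan plan(List<PartitionState> partitions) {
        Plan plan = new Plan();
        for (PartitionState p : partitions) {
            if (p.latestEpoch == UNDEFINED_EPOCH) {
                // The leader could only answer with an undefined offset, so skip the
                // round trip and truncate to the local high watermark directly.
                plan.truncateToHighWatermark.add(p);
            } else {
                plan.sendOffsetForLeaderEpoch.add(p);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Plan result = plan(Arrays.asList(
                new PartitionState("events-0", 3, 100L),                // tracks leader epochs
                new PartitionState("legacy-0", UNDEFINED_EPOCH, 42L))); // old message format
        System.out.println(result.sendOffsetForLeaderEpoch.size() + " request(s), "
                + result.truncateToHighWatermark.size() + " local truncation(s)");
        // 1 request(s), 1 local truncation(s)
    }
}
```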
[jira] [Assigned] (KAFKA-6859) Follower should not send OffsetForLeaderEpoch for undefined leader epochs
[ https://issues.apache.org/jira/browse/KAFKA-6859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma reassigned KAFKA-6859: -- Assignee: Stanislav Kozlovski
[jira] [Commented] (KAFKA-6859) Follower should not send OffsetForLeaderEpoch for undefined leader epochs
[ https://issues.apache.org/jira/browse/KAFKA-6859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529868#comment-16529868 ] ASF GitHub Bot commented on KAFKA-6859: --- stanislavkozlovski opened a new pull request #5320: KAFKA-6859: Do not send LeaderEpochRequest for undefined leader epochs URL: https://github.com/apache/kafka/pull/5320 If a broker or topic has a message format older than 0.11, it does not track leader epochs. LeaderEpochRequests for such partitions will always return an undefined offset, making the follower truncate to the high watermark. Since there is no point in using the network in such cases, don't send a request. ### Notes * I am not sure whether we should even check `brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2`, since the log message format is the only thing we care about and, as far as I understand, it is always defined * I am not fond of the subclassing in the tests for mocking the method (and also making it protected), but this was the cleanest solution I could come up with. Other ideas are welcome ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
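A sketch of the check described in the PR, under stated assumptions: partitions are plain `String` names, the follower's latest recorded epoch per partition is given as a map, and `-1` is taken to mean "no epoch recorded" (we believe this matches Kafka's `UNDEFINED_EPOCH` value, but treat it as an assumption). The class `EpochRequestFilter` and the partition names are hypothetical; this is not the fetcher-thread code from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not the replica fetcher code from the PR.
final class EpochRequestFilter {
    // Assumed sentinel meaning "no leader epoch recorded for this partition".
    static final int UNDEFINED_EPOCH = -1;

    // Partitions with a known epoch: these still need an OffsetForLeaderEpoch request.
    final Map<String, Integer> toRequest = new LinkedHashMap<>();
    // Partitions without an epoch: the leader could only answer "undefined offset",
    // so the follower truncates to its high watermark without a network round trip.
    final List<String> truncateToHighWatermark = new ArrayList<>();

    static EpochRequestFilter split(Map<String, Integer> latestEpochs) {
        EpochRequestFilter result = new EpochRequestFilter();
        for (Map.Entry<String, Integer> entry : latestEpochs.entrySet()) {
            if (entry.getValue() == UNDEFINED_EPOCH) {
                result.truncateToHighWatermark.add(entry.getKey());
            } else {
                result.toRequest.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> latestEpochs = new LinkedHashMap<>();
        latestEpochs.put("orders-0", 5);               // message format >= 0.11: epoch tracked
        latestEpochs.put("legacy-0", UNDEFINED_EPOCH); // older message format: no epochs
        EpochRequestFilter split = split(latestEpochs);
        System.out.println("send OffsetForLeaderEpoch for: " + split.toRequest);
        System.out.println("truncate to high watermark: " + split.truncateToHighWatermark);
    }
}
```

Only the partitions left in `toRequest` would go into the OffsetForLeaderEpoch request; the rest skip a round trip whose answer is already known.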
[jira] [Commented] (KAFKA-6268) Tools should not swallow exceptions like resolving network names
[ https://issues.apache.org/jira/browse/KAFKA-6268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529818#comment-16529818 ] Rajini Sivaram commented on KAFKA-6268: --- [~astubbs] Diagnostics for security failures were improved under KIP-152 (KAFKA-4764 and KAFKA-5920) in 1.0.0, and we should have log entries for those at error level. [~enether] Can you verify that we have log entries at warn level or above if we can't parse a socket response? Thanks. > Tools should not swallow exceptions like resolving network names > > > Key: KAFKA-6268 > URL: https://issues.apache.org/jira/browse/KAFKA-6268 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.11.0.1 >Reporter: Antony Stubbs >Assignee: Stanislav Kozlovski >Priority: Major > > The CLI consumer client shows nothing when it can't resolve a domain. This > and other errors like it should be shown to the user by default. You have to > turn on DEBUG level logging in the tools' log4j configuration to find out that there is an error.
> {{[2017-11-23 16:40:56,401] DEBUG Error connecting to node > as-broker-1-eu-west-1b-public:9092 (id: 1 rack: null) > (org.apache.kafka.clients.NetworkClient) > java.io.IOException: Can't resolve address: as-broker-1-eu-west-1b-public:9092 > at org.apache.kafka.common.network.Selector.connect(Selector.java:195) > at > org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:764) > at > org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:60) > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:908) > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:819) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:431) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:199) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:223) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) > at kafka.consumer.NewShinyConsumer.(BaseConsumer.scala:64) > at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:72) > at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53) > at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) > 
Caused by: java.nio.channels.UnresolvedAddressException > at sun.nio.ch.Net.checkAddress(Net.java:101) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) > at org.apache.kafka.common.network.Selector.connect(Selector.java:192) > ... 18 more > }} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
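Until the tools report this at a visible log level, a wrapper could fail fast on unresolvable bootstrap hosts. The sketch below is hypothetical (it is not part of Kafka's tools, and the name `BootstrapCheck` is invented); it relies only on the JDK: the `InetSocketAddress(String, int)` constructor attempts to resolve the host name, and `isUnresolved()` reports whether that failed.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check for a CLI wrapper; not part of Kafka's tools.
final class BootstrapCheck {

    // Returns the entries of a comma-separated "host:port" list that are malformed
    // or whose host name cannot be resolved.
    static List<String> unresolvable(String bootstrapServers) {
        List<String> bad = new ArrayList<>();
        for (String raw : bootstrapServers.split(",")) {
            String entry = raw.trim();
            int colon = entry.lastIndexOf(':');
            if (colon <= 0) {
                bad.add(entry); // missing host or port separator
                continue;
            }
            int port;
            try {
                port = Integer.parseInt(entry.substring(colon + 1));
            } catch (NumberFormatException e) {
                bad.add(entry); // empty or non-numeric port
                continue;
            }
            if (port < 0 || port > 65535) {
                bad.add(entry); // InetSocketAddress would reject this port
                continue;
            }
            // This constructor attempts a DNS lookup; isUnresolved() is true if it failed.
            if (new InetSocketAddress(entry.substring(0, colon), port).isUnresolved()) {
                bad.add(entry);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        List<String> bad = unresolvable(servers);
        if (!bad.isEmpty()) {
            // Report at the default level instead of leaving it to DEBUG logging.
            System.err.println("ERROR: cannot resolve bootstrap server(s): " + bad);
            System.exit(1);
        }
        System.out.println("All bootstrap servers resolved: " + servers);
    }
}
```

A check like this only covers name resolution at startup; it does not replace surfacing later connection errors such as the one in the stack trace above.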
[jira] [Commented] (KAFKA-6268) Tools should not swallow exceptions like resolving network names
[ https://issues.apache.org/jira/browse/KAFKA-6268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529801#comment-16529801 ] Antony Stubbs commented on KAFKA-6268: -- [~rsivaram] Does this fix also cover security-related errors such as bad keystore passwords, failure to obtain a Kerberos ticket, or being unable to parse a socket response (connected to the wrong socket)?
[jira] [Commented] (KAFKA-7108) "Exactly-once" stream breaks production exception handler contract
[ https://issues.apache.org/jira/browse/KAFKA-7108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529726#comment-16529726 ] Anna O commented on KAFKA-7108: --- [~mjsax] - here is the StreamsConfig:
StreamsConfig values:
application.id = ...
application.server =
bootstrap.servers = [...]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 100
connections.max.idle.ms = 54
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.production.exception.handler = class com...LogAndContinueProductionExceptionHandler
default.timestamp.extractor = class com...UmsEventTimestampExtractor
default.value.serde = class com...JsonSerde
key.serde = null
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
num.standby.replicas = 0
num.stream.threads = 8
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = exactly_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 3
request.timeout.ms = 4
retries = 240
retry.backoff.ms = 500
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 60
state.dir = /tmp/kafka-streams
timestamp.extractor = null
value.serde = null
windowstore.changelog.additional.retention.ms = 8640
zookeeper.connect =
This is how we override the retries in the KafkaStreams config in the code: config.put(StreamsConfig.RETRIES_CONFIG, 240); To your remark "If yes, you should see a WARN log that the overwrite is ignored..." - there is no such log...
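For anyone trying to reproduce the report, the settings that look relevant can be collected into a plain `java.util.Properties` object, using the same keys the StreamsConfig dump prints (`StreamsConfig.RETRIES_CONFIG` resolves to `retries`). This is a sketch only: `my-streams-app`, `localhost:9092`, and `com.example.LogAndContinueProductionExceptionHandler` are placeholders because the real values are elided in the comment, and the class `Kafka7108Config` is hypothetical.

```java
import java.util.Properties;

// Hypothetical reproduction config for KAFKA-7108, using plain string keys.
final class Kafka7108Config {

    static Properties reportedConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");     // placeholder: elided in the report
        props.put("bootstrap.servers", "localhost:9092"); // placeholder: elided in the report
        props.put("processing.guarantee", "exactly_once");
        // Placeholder class name: the report only shows "com...LogAndContinueProductionExceptionHandler".
        props.put("default.production.exception.handler",
                "com.example.LogAndContinueProductionExceptionHandler");
        props.put("num.stream.threads", 8);
        props.put("commit.interval.ms", 100);
        // Equivalent to config.put(StreamsConfig.RETRIES_CONFIG, 240) in the comment above.
        props.put("retries", 240);
        return props;
    }

    public static void main(String[] args) {
        reportedConfig().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Because `retries` is stored as an `Integer`, read it back with `get` rather than `getProperty`, which only returns `String` values.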
> "Exactly-once" stream breaks production exception handler contract > -- > > Key: KAFKA-7108 > URL: https://issues.apache.org/jira/browse/KAFKA-7108 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Anna O >Priority: Major > Labels: exactly-once > > I have a stream configured with "default.production.exception.handler" that > is supposed to log the error and continue. When I set "processing.guarantee" > to "exactly_once", it appeared that a retryable NotEnoughReplicasException that > passed the production exception handler was rethrown by the > TransactionManager, wrapped in a KafkaException, and terminated the stream > thread: > org.apache.kafka.common.KafkaException: Cannot execute transactional method > because we are in an error state at > org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) > ~[kafka-clients-1.1.0.jar:?] > at > org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:250) > ~[kafka-clients-1.1.0.jar:?] > at > org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:617) > ~[kafka-clients-1.1.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:357) > ~[kafka-streams-1.1.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53) > ~[kafka-streams-1.1.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316) > ~[kafka-streams-1.1.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > ~[kafka-streams-1.1.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) > ~[kafka-streams-1.1.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297) > ~[kafka-streams-1.1.0.jar:?] 
> at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) > ~[kafka-streams-1.1.0.jar:?] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357) > [kafka-streams-1.1.0.jar:?] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347) > [kafka-streams-1.1.0.jar:?] > at > org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403) > [kafka-streams-1.1.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994) > [kafka-streams-1.1.0.jar:?] > at >
[jira] [Commented] (KAFKA-7120) When Connect throws CONFLICT error for REST requests, it will help to see more details
[ https://issues.apache.org/jira/browse/KAFKA-7120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529611#comment-16529611 ] ASF GitHub Bot commented on KAFKA-7120: --- lambdaliu opened a new pull request #5317: KAFKA-7120: Add information to indicate which connector resource request cannot be completed URL: https://github.com/apache/kafka/pull/5317 Right now, we throw: throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), "Cannot complete request because of a conflicting operation (e.g. worker rebalance)"); There's no information about WHICH request can't be completed. It will help to know. This PR adds the path, method, and body of the request to the message field of ConnectRestException to indicate which request can't be completed. > When Connect throws CONFLICT error for REST requests, it will help to see > more details > -- > > Key: KAFKA-7120 > URL: https://issues.apache.org/jira/browse/KAFKA-7120 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Gwen Shapira >Assignee: lambdaliu >Priority: Critical > > Right now, we throw: > throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), > "Cannot complete request because of a conflicting operation (e.g. worker > rebalance)"); > There's no information about WHICH request can't be completed. It will help > to know.
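The idea in PR #5317 (naming the request in the CONFLICT message) can be illustrated with a small string-building helper. The class `ConflictMessage`, its `describe` method, and the connector name `my-sink` are hypothetical and are not the code from the PR; they only show one possible message shape built from the HTTP method, path, and optional body.

```java
// Hypothetical helper; not the code from PR #5317.
final class ConflictMessage {

    static final String BASE =
            "Cannot complete request because of a conflicting operation (e.g. worker rebalance)";

    // Appends the HTTP method, path and (if present) body of the request that hit the conflict.
    static String describe(String method, String path, String body) {
        StringBuilder message = new StringBuilder(BASE)
                .append(": ").append(method).append(' ').append(path);
        if (body != null && !body.isEmpty()) {
            message.append(" with body ").append(body);
        }
        return message.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe("PUT", "/connectors/my-sink/config", "{\"tasks.max\": \"2\"}"));
        System.out.println(describe("GET", "/connectors/my-sink/status", null));
    }
}
```

One thing worth weighing: connector configuration bodies can contain credentials, so echoing the full request body into an error message may expose them; a variant that includes only the method and path avoids that.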
[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null
[ https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529547#comment-16529547 ] Jouni commented on KAFKA-7125: -- I had a look at the Kafka sources, especially the comment in InternalTopologyBuilder.describeGlobalStore: // we found a GlobalStore node group; those contain exactly two node: {sourceNode,processorNode} My code breaks that assumption. Probable explanation: isGlobalSource returns true only for the last node, so the iterator has already moved to the end, and the call to final String processorNode = nodes.iterator().next(); // get remaining processorNode fails miserably. Maybe I'll have to find another way to do what I need to do, but am not supposed to... > Calling StreamsBuilderbuilder.build().describe() causes > java.util.NoSuchElementException: null > -- > > Key: KAFKA-7125 > URL: https://issues.apache.org/jira/browse/KAFKA-7125 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Jouni >Assignee: Nikolay Izhikov >Priority: Minor > Labels: beginner, newbie > > After adding a processor and a sink to the topology after a global store and > then calling StreamsBuilder.build().describe() again (for debugging purposes > and to check I got the topology right), I got the following exception and > stack trace: > {{Caused by: java.util.NoSuchElementException: null}} > {{ at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) > ~[na:1.8.0_171]}} > {{ at java.util.HashMap$KeyIterator.next(HashMap.java:1466) > ~[na:1.8.0_171]}} > {{ at > org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323) > ~[kafka-streams-1.1.0.jar:na]}} > {{ at > org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306) > ~[kafka-streams-1.1.0.jar:na]}} > {{ at org.apache.kafka.streams.Topology.describe(Topology.java:647) > ~[kafka-streams-1.1.0.jar:na]}} > Snippet of code that caused 
this: > {{ GlobalKTable jsonRoutesToServices}} > {{ = builder.globalTable("routes-to-services",}} > {{ Consumed.with(Serdes.String(), > jsonServiceListSerde),}} > {{ Materialized. KeyValueStore>as("routes-to-services"));}} > {{ TopologyDescription td = builder.build().describe();}} > {{ String parent = null;}} > {{ // We get an iterator to a TreeSet sorted by processing order, and > just want the last one.}} > {{ for (TopologyDescription.GlobalStore store : td.globalStores()) {}} > {{ parent = store.processor().name();}} > } > {{ TopologyDescription tdtd = builder.build().describe();}} > {{ builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new > UnneededCruftSupplier(), parent);}} > {{ builder.build().addSink("FST-ROUTES-TO-SERVICES", > "fst-routes-to-services", Serdes.String().serializer(), > fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}} > {{ TopologyDescription tdtdtd = builder.build().describe();}} > Note that the exception is thrown on the last line of the code snippet; > calling describe again before adding anything works fine.
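The assumption Jouni quotes, that a global-store node group is exactly {sourceNode, processorNode}, can be made explicit instead of relying on `iterator().next()`. The sketch below is a simplified model over a `Set<String>` of node names, with a caller-supplied predicate standing in for `isGlobalSource`; `GlobalStoreGroup`, `processorNode`, and the node names `global-source` and `global-processor` are hypothetical. It is not `InternalTopologyBuilder` code and does not claim to reproduce the exact failure path; it only shows how a group of unexpected shape (for example, if the forwarder and sink added in the report end up in the same group) could be rejected with a descriptive error rather than a bare `NoSuchElementException`.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Simplified, hypothetical model; not InternalTopologyBuilder code.
final class GlobalStoreGroup {

    // Returns the processor node of a node group expected to be exactly
    // {sourceNode, processorNode}; any other shape fails with a descriptive message.
    static String processorNode(Set<String> nodes, Predicate<String> isGlobalSource) {
        String source = null;
        String processor = null;
        for (String node : nodes) {
            if (isGlobalSource.test(node) && source == null) {
                source = node;
            } else if (!isGlobalSource.test(node) && processor == null) {
                processor = node;
            } else {
                throw unexpectedShape(nodes); // a second source or a second non-source node
            }
        }
        if (source == null || processor == null) {
            throw unexpectedShape(nodes); // fewer than two nodes of the expected kinds
        }
        return processor;
    }

    private static IllegalStateException unexpectedShape(Set<String> nodes) {
        return new IllegalStateException(
                "expected a global store group of exactly {sourceNode, processorNode}, got " + nodes);
    }

    public static void main(String[] args) {
        Predicate<String> isGlobalSource = "global-source"::equals;

        Set<String> pair = new LinkedHashSet<>(List.of("global-source", "global-processor"));
        System.out.println(processorNode(pair, isGlobalSource)); // prints global-processor

        // A group with extra nodes, e.g. if the forwarder and sink from the report join it.
        Set<String> extended = new LinkedHashSet<>(List.of("global-source", "global-processor",
                "ROUTES-TO-SERVICES-FORWARDER", "FST-ROUTES-TO-SERVICES"));
        try {
            processorNode(extended, isGlobalSource);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```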
[jira] [Updated] (KAFKA-6268) Tools should not swallow exceptions like resolving network names
[ https://issues.apache.org/jira/browse/KAFKA-6268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Antony Stubbs updated KAFKA-6268: - Summary: Tools should not swallow exceptions like resolving network names (was: Tools should now swallow exceptions like resolving network names)
[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null
[ https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529506#comment-16529506 ] Jouni commented on KAFKA-7125: -- Sorry, I forgot one relevant call just before getting the TopologyDescription... bug report updated. And there's a good reason for attaching a downstream processor to the state store's processor: I need to catch the update's Change<> event, do some work on it that is totally unrelated to Streams, and forward the results downstream again. But my main point is that calling builder.build().describe() again shouldn't throw an exception. Something in handling the internal state of Topology clearly isn't quite right.
[jira] [Updated] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null
[ https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jouni updated KAFKA-7125: - Description: After adding a a processor and a sink to topology after a globalstore and then calling StreamBuilder.build().describe() again (for debugging purposes and to check I got the topology right), had the following exception and stacktrace: {{Caused by: java.util.NoSuchElementException: null}} {{ at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) ~[na:1.8.0_171]}} {{ at java.util.HashMap$KeyIterator.next(HashMap.java:1466) ~[na:1.8.0_171]}} {{ at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323) ~[kafka-streams-1.1.0.jar:na]}} {{ at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306) ~[kafka-streams-1.1.0.jar:na]}} {{ at org.apache.kafka.streams.Topology.describe(Topology.java:647) ~[kafka-streams-1.1.0.jar:na]}} Snipped of code that caused this: {{ GlobalKTable jsonRoutesToServices}} {{ = builder.globalTable("routes-to-services",}} {{ Consumed.with(Serdes.String(), jsonServiceListSerde),}} {{ Materialized.>as("routes-to-services"));}} {{ TopologyDescription td = builder.build().describe();}} {{ String parent = null;}} {{ // We get an iterator to a TreeSet sorted by processing order, and just want the last one.}} {{ for (TopologyDescription.GlobalStore store : td.globalStores()) {}} {{ parent = store.processor().name();}} } {{ TopologyDescription tdtd = builder.build().describe();}} {{ builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new UnneededCruftSupplier(), parent);}} {{ builder.build().addSink("FST-ROUTES-TO-SERVICES", "fst-routes-to-services", Serdes.String().serializer(), fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}} {{ TopologyDescription tdtdtd = builder.build().describe();}} Note that the exception is thrown on the last line 
of the code snippet, calling describe again before adding anything works fine. was: After adding a a processor and a sink to topology after a globalstore and then calling StreamBuilder.build().describe() again (for debugging purposes and to check I got the topology right), had the following exception and stacktrace: {{Caused by: java.util.NoSuchElementException: null}} {{ at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) ~[na:1.8.0_171]}} {{ at java.util.HashMap$KeyIterator.next(HashMap.java:1466) ~[na:1.8.0_171]}} {{ at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323) ~[kafka-streams-1.1.0.jar:na]}} {{ at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306) ~[kafka-streams-1.1.0.jar:na]}} {{ at org.apache.kafka.streams.Topology.describe(Topology.java:647) ~[kafka-streams-1.1.0.jar:na]}} Snipped of code that caused this: {{ GlobalKTable jsonRoutesToServices}} {{ = builder.globalTable("routes-to-services",}} {{ Consumed.with(Serdes.String(), jsonServiceListSerde),}} {{ Materialized.>as("routes-to-services")); TopologyDescription td = builder.build().describe();}} {{ String parent = null;}} {{ // We get an iterator to a TreeSet sorted by processing order, and just want the last one.}} {{ for (TopologyDescription.GlobalStore store : td.globalStores()) {}} {{ parent = store.processor().name();}} } {{ TopologyDescription tdtd = builder.build().describe();}} {{ builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new UnneededCruftSupplier(), parent);}} {{ builder.build().addSink("FST-ROUTES-TO-SERVICES", "fst-routes-to-services", Serdes.String().serializer(), fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}} {{ TopologyDescription tdtdtd = builder.build().describe();}} Note that the exception is thrown on the last line of the code snippet, calling describe again before adding anything works fine. 
[jira] [Updated] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null
[ https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jouni updated KAFKA-7125: - Description: After adding a processor and a sink to the topology downstream of a global store and then calling StreamsBuilder.build().describe() again (for debugging purposes and to check I got the topology right), I got the following exception and stack trace: {{Caused by: java.util.NoSuchElementException: null}} {{ at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) ~[na:1.8.0_171]}} {{ at java.util.HashMap$KeyIterator.next(HashMap.java:1466) ~[na:1.8.0_171]}} {{ at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323) ~[kafka-streams-1.1.0.jar:na]}} {{ at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306) ~[kafka-streams-1.1.0.jar:na]}} {{ at org.apache.kafka.streams.Topology.describe(Topology.java:647) ~[kafka-streams-1.1.0.jar:na]}} Snippet of code that caused this: {{ GlobalKTable jsonRoutesToServices}} {{ = builder.globalTable("routes-to-services",}} {{ Consumed.with(Serdes.String(), jsonServiceListSerde),}} {{ Materialized.>as("routes-to-services")); TopologyDescription td = builder.build().describe();}} {{ String parent = null;}} {{ // We get an iterator to a TreeSet sorted by processing order, and just want the last one.}} {{ for (TopologyDescription.GlobalStore store : td.globalStores()) {}} {{ parent = store.processor().name();}} } {{ TopologyDescription tdtd = builder.build().describe();}} {{ builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new UnneededCruftSupplier(), parent);}} {{ builder.build().addSink("FST-ROUTES-TO-SERVICES", "fst-routes-to-services", Serdes.String().serializer(), fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}} {{ TopologyDescription tdtdtd = builder.build().describe();}} Note that the exception is thrown on the last line of the code snippet; calling describe() again before adding anything works fine. was: After adding a processor and a sink to the topology downstream of a global store and then calling StreamsBuilder.build().describe() again (for debugging purposes and to check I got the topology right), I got the following exception and stack trace: {{Caused by: java.util.NoSuchElementException: null}} {{ at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) ~[na:1.8.0_171]}} {{ at java.util.HashMap$KeyIterator.next(HashMap.java:1466) ~[na:1.8.0_171]}} {{ at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323) ~[kafka-streams-1.1.0.jar:na]}} {{ at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306) ~[kafka-streams-1.1.0.jar:na]}} {{ at org.apache.kafka.streams.Topology.describe(Topology.java:647) ~[kafka-streams-1.1.0.jar:na]}} Snippet of code that caused this: {{ TopologyDescription td = builder.build().describe();}} {{ String parent = null;}} {{ // We get an iterator to a TreeSet sorted by processing order, and just want the last one.}} {{ for (TopologyDescription.GlobalStore store : td.globalStores()) {}} {{ parent = store.processor().name();}} } {{ TopologyDescription tdtd = builder.build().describe();}} {{ builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new UnneededCruftSupplier(), parent);}} {{ builder.build().addSink("FST-ROUTES-TO-SERVICES", "fst-routes-to-services", Serdes.String().serializer(), fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}} {{ TopologyDescription tdtdtd = builder.build().describe();}} Note that the exception is thrown on the last line of the code snippet; calling describe() again before adding anything works fine. 
> Calling StreamsBuilderbuilder.build().describe() causes > java.util.NoSuchElementException: null > -- > > Key: KAFKA-7125 > URL: https://issues.apache.org/jira/browse/KAFKA-7125 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Jouni >Assignee: Nikolay Izhikov >Priority: Minor > Labels: beginner, newbie > > After adding a processor and a sink to the topology downstream of a global store and > then calling StreamsBuilder.build().describe() again (for debugging purposes > and to check I got the topology right), I got the following exception and > stack trace: > {{Caused
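The top two frames of the stack trace show {{HashMap$KeyIterator.next()}} failing inside {{InternalTopologyBuilder.describeGlobalStore}}, which is what a JDK iterator does when {{next()}} is called with no elements left. The sketch below is not the Kafka Streams source and makes no claim about why the node set was exhausted; it is a minimal, JDK-only illustration of that failure mode and of the usual guard ({{hasNext()}} before {{next()}}). The class and method names are hypothetical.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;

// Illustration only, NOT Kafka Streams code: reproduces the kind of failure
// seen in the stack trace, where next() is called on an exhausted iterator.
final class IteratorPitfall {

    // Unsafe: assumes the set is non-empty. On an empty HashSet, next()
    // throws java.util.NoSuchElementException.
    static String firstUnchecked(Set<String> nodes) {
        return nodes.iterator().next();
    }

    // Safe: checks hasNext() before advancing and reports absence explicitly.
    static Optional<String> firstChecked(Set<String> nodes) {
        Iterator<String> it = nodes.iterator();
        return it.hasNext() ? Optional.of(it.next()) : Optional.empty();
    }

    // Demo helper: returns true if the unchecked lookup throws for this input.
    static boolean uncheckedThrows(Set<String> nodes) {
        try {
            firstUnchecked(nodes);
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }
}
```

Returning an {{Optional}} makes the "no element" case part of the method's contract, so a caller has to decide what an empty group means instead of hitting an exception deep inside a describe call.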
[jira] [Commented] (KAFKA-7118) Make KafkaConsumer compatible with multiple threads
[ https://issues.apache.org/jira/browse/KAFKA-7118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529467#comment-16529467 ] Richard Yu commented on KAFKA-7118: --- I am not sure this is applicable; this issue might not be necessary. > Make KafkaConsumer compatible with multiple threads > --- > > Key: KAFKA-7118 > URL: https://issues.apache.org/jira/browse/KAFKA-7118 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Richard Yu >Priority: Major > Labels: client, java, kip, needs-kip > > It was discovered that there is a performance constraint relating to > {{KafkaConsumer#close()}}. When several threads are modifying the same > consumer, a {{ConcurrentModificationException}} could result. This issue was > first posted on Spark. For testing details, see SPARK-23636 and SPARK-19185. > It would be preferable if multiple threads were allowed to modify one > {{KafkaConsumer}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
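The {{KafkaConsumer}} javadoc states that the consumer is not thread-safe and that unsynchronized access results in a {{ConcurrentModificationException}}: a second thread is rejected rather than made to wait. The following is a simplified, hypothetical single-owner guard, written with the JDK only and modeled loosely on that fail-fast behaviour; the class, fields, and message text are illustrative and are not the client's internals.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-owner guard: the first thread to call acquire() owns
// the object until its matching release(); any other thread that calls
// acquire() in the meantime fails fast instead of blocking.
final class SingleOwnerGuard {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private final AtomicInteger depth = new AtomicInteger(0); // reentrancy count

    void acquire() {
        long me = Thread.currentThread().getId();
        // Re-entry by the owner is allowed; otherwise try to claim a free guard.
        if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                "object is not safe for multi-threaded access");
        }
        depth.incrementAndGet();
    }

    void release() {
        // The outermost release() frees the guard for other threads.
        if (depth.decrementAndGet() == 0) {
            owner.set(NO_OWNER);
        }
    }

    // Demo helper: attempts acquire()/release() from a new thread and
    // reports whether the attempt succeeded.
    static boolean canAcquireFromAnotherThread(SingleOwnerGuard guard) {
        AtomicBoolean ok = new AtomicBoolean(false);
        Thread t = new Thread(() -> {
            try {
                guard.acquire();
                guard.release();
                ok.set(true);
            } catch (ConcurrentModificationException e) {
                ok.set(false);
            }
        });
        t.start();
        try {
            t.join(); // join() makes the worker's write to ok visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ok.get();
    }
}
```

Because a guard like this rejects rather than waits, the usual ways to use several threads are to give each thread its own consumer, or to keep one consumer on a single polling thread and hand records off to worker threads. {{wakeup()}} is the one consumer method documented as safe to call from another thread.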
[jira] [Updated] (KAFKA-7127) Add asynchronous support for methods in KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-7127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-7127: -- Description: Currently, in KafkaConsumer, various methods block while waiting on a remote call. It would be nice if we also added asynchronous versions of these methods, as was done with {{commitAsync()}}, which takes an {{OffsetCommitCallback}} as an input argument. (was: Currently, in KafkaConsumer, various methods block while waiting on a remote call. It would be nice if we also added asynchronous versions of these methods, as was done with {{poll()}} in KIP-266 and {{commitAsync()}}, which takes an {{OffsetCommitCallback}} as an input argument.) > Add asynchronous support for methods in KafkaConsumer > - > > Key: KAFKA-7127 > URL: https://issues.apache.org/jira/browse/KAFKA-7127 > Project: Kafka > Issue Type: Wish > Components: clients >Reporter: Richard Yu >Priority: Major > Labels: needs-discussion, needs-kip > > Currently, in KafkaConsumer, various methods block while waiting on a remote call. > It would be nice if we also added asynchronous versions of these methods, as > was done with {{commitAsync()}}, which takes an {{OffsetCommitCallback}} > as an input argument.
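Until such methods exist, one way to get asynchronous behaviour without changing the client is to wrap the blocking calls in {{CompletableFuture}}s that all run on a single dedicated thread, which also respects the single-thread restriction discussed in KAFKA-7118. This is a hedged sketch under that assumption: {{AsyncFacade}}, {{call}} and {{callWithCallback}} are hypothetical names, and the {{Supplier}} stands in for whichever blocking {{KafkaConsumer}} method you want to wrap. The callback variant only loosely mirrors the shape of {{commitAsync(OffsetCommitCallback)}}.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical facade: runs blocking calls on ONE dedicated thread and hands
// results back as futures. A single thread is used because the wrapped client
// (for example a KafkaConsumer) is assumed not to be thread-safe.
final class AsyncFacade implements AutoCloseable {
    private final ExecutorService clientThread = Executors.newSingleThreadExecutor();

    // Future style: the caller decides when (or whether) to wait.
    <T> CompletableFuture<T> call(Supplier<T> blockingCall) {
        return CompletableFuture.supplyAsync(blockingCall, clientThread);
    }

    // Callback style: the callback receives (result, null) on success or
    // (null, error) on failure, where error is a CompletionException wrapping
    // the original exception. The returned stage completes after the callback.
    <T> CompletableFuture<T> callWithCallback(Supplier<T> blockingCall,
                                              BiConsumer<? super T, ? super Throwable> callback) {
        return call(blockingCall).whenComplete(callback);
    }

    // Stops accepting new calls; calls already submitted still run.
    @Override
    public void close() {
        clientThread.shutdown();
    }
}
```

For this to be safe, every call on the wrapped client, including {{poll()}}, has to go through the same facade. The trade-off of a single thread is that a long-blocking call delays everything queued behind it.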
[jira] [Commented] (KAFKA-7127) Add asynchronous support for methods in KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-7127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529463#comment-16529463 ] Richard Yu commented on KAFKA-7127: --- cc [~hachikuji] > Add asynchronous support for methods in KafkaConsumer > - > > Key: KAFKA-7127 > URL: https://issues.apache.org/jira/browse/KAFKA-7127 > Project: Kafka > Issue Type: Wish > Components: clients >Reporter: Richard Yu >Priority: Major > Labels: needs-discussion, needs-kip > > Currently, in KafkaConsumer, various methods block while waiting on a remote call. > It would be nice if we also added asynchronous versions of these methods, as > was done with {{poll()}} in KIP-266 and {{commitAsync()}}, which takes an > {{OffsetCommitCallback}} as an input argument.
[jira] [Created] (KAFKA-7127) Add asynchronous support for methods in KafkaConsumer
Richard Yu created KAFKA-7127: - Summary: Add asynchronous support for methods in KafkaConsumer Key: KAFKA-7127 URL: https://issues.apache.org/jira/browse/KAFKA-7127 Project: Kafka Issue Type: Wish Components: clients Reporter: Richard Yu Currently, in KafkaConsumer, various methods block while waiting on a remote call. It would be nice if we also added asynchronous versions of these methods, as was done with {{poll()}} in KIP-266 and {{commitAsync()}}, which takes an {{OffsetCommitCallback}} as an input argument.