[jira] [Commented] (KAFKA-7132) Consider adding faster form of rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533252#comment-16533252 ]

Richard Yu commented on KAFKA-7132:
---
Hi all,

You can find the KIP here. (Discussion thread TBD)

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing#KIP-333:Addfastermodeofrebalancing-RejectedAlternatives]

> Consider adding faster form of rebalancing
> --
>
> Key: KAFKA-7132
> URL: https://issues.apache.org/jira/browse/KAFKA-7132
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Richard Yu
> Priority: Critical
> Labels: performance
>
> Currently, when a consumer falls out of a consumer group, it will restart
> processing from the last checkpointed offset. However, this design could
> result in a lag which some users cannot afford. For example, let's say a
> consumer crashed at offset 100, with the last checkpointed offset being at
> 70. When it recovers at a later offset (say, 120), it will be behind by an
> offset range of 50 (120 - 70). This is because the consumer restarted at 70,
> forcing it to reprocess old data. To avoid this, one option would be to
> allow the current consumer to start processing not from the last
> checkpointed offset (which is 70 in the example), but from 120, where it
> recovers. Meanwhile, a new KafkaConsumer will be instantiated and start
> reading from offset 70 concurrently with the old process, and will be
> terminated once it reaches 120. In this manner, a considerable amount of lag
> can be avoided, particularly since the old consumer could proceed as if
> nothing had happened.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
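The arithmetic in the issue description can be written out as a small sketch. This is only a model of the numbers in the example (70, 100, 120); the class and method names are hypothetical and no Kafka client API is involved.

```java
// Hypothetical model of the numbers in the issue description; this is not Kafka client code.
final class RebalanceLagSketch {

    /** Offsets the consumer is behind if it restarts at the checkpoint. */
    static long lag(long recoveryOffset, long checkpointedOffset) {
        return recoveryOffset - checkpointedOffset;
    }

    /** Half-open range [start, end) that the temporary catch-up consumer would read. */
    static long[] catchUpRange(long checkpointedOffset, long recoveryOffset) {
        return new long[] {checkpointedOffset, recoveryOffset};
    }

    public static void main(String[] args) {
        long checkpointed = 70L;
        long recovery = 120L;
        long[] range = catchUpRange(checkpointed, recovery);
        System.out.println("lag when restarting at the checkpoint: " + lag(recovery, checkpointed));
        System.out.println("catch-up consumer reads [" + range[0] + ", " + range[1] + ")");
        System.out.println("original consumer continues from " + recovery);
    }
}
```

Under the proposal, the original consumer would skip the 50-offset backlog and the temporary consumer would work through it in parallel.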
[jira] [Comment Edited] (KAFKA-7132) Consider adding faster form of rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533252#comment-16533252 ]

Richard Yu edited comment on KAFKA-7132 at 7/5/18 4:30 AM:
---
Hi all, You could find the KIP here. (Discussion thread TBD)

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing#KIP-333:Addfastermodeofrebalancing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing#KIP-333:Addfastermodeofrebalancing-RejectedAlternatives]

was (Author: yohan123):
Hi all, You could find the KIP here. (Discussion thread TBD)

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing#KIP-333:Addfastermodeofrebalancing-RejectedAlternatives]
[jira] [Issue Comment Deleted] (KAFKA-7132) Consider adding faster form of rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Richard Yu updated KAFKA-7132:
--
Comment: was deleted

(was: Well, when I was imagining this situation, I was thinking that the log's maximum extent is at offset 70.)
[jira] [Commented] (KAFKA-7132) Consider adding faster form of rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533162#comment-16533162 ]

Richard Yu commented on KAFKA-7132:
---
Well, when I was imagining this situation, I was thinking that the log's maximum extent is at offset 70.
[jira] [Commented] (KAFKA-6904) DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate is flaky
[ https://issues.apache.org/jira/browse/KAFKA-6904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533150#comment-16533150 ] Ted Yu commented on KAFKA-6904: --- Haven't seen this failure lately. > DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate is flaky > -- > > Key: KAFKA-6904 > URL: https://issues.apache.org/jira/browse/KAFKA-6904 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu >Priority: Major > > From > https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/820/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testAdvertisedListenerUpdate/ > : > {code} > kafka.server.DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate > Failing for the past 1 build (Since Failed#820 ) > Took 21 sec. > Error Message > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is > not the leader for that topic-partition. > Stacktrace > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is > not the leader for that topic-partition. 
> at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:77) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29) > at > kafka.server.DynamicBrokerReconfigurationTest.$anonfun$verifyProduceConsume$3(DynamicBrokerReconfigurationTest.scala:996) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234) > at scala.collection.Iterator.foreach(Iterator.scala:944) > at scala.collection.Iterator.foreach$(Iterator.scala:944) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1432) > at scala.collection.IterableLike.foreach(IterableLike.scala:71) > at scala.collection.IterableLike.foreach$(IterableLike.scala:70) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike.map(TraversableLike.scala:234) > at scala.collection.TraversableLike.map$(TraversableLike.scala:227) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:996) > at > kafka.server.DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate(DynamicBrokerReconfigurationTest.scala:742) > {code} > The above happened with jdk 10. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null
[ https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533062#comment-16533062 ]

Guozhang Wang commented on KAFKA-7125:
--
Thanks [~Ahto].

Regarding the documentation: with the `reduce` call, if you only have one update for the key, the reducer function will not be called, but the table should still be updated directly with the first record's value, so `reduce` is appropriate for converting a stream into a table. It is therefore odd that you observe "my table is empty until I fed the data twice".

> Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null
> --
>
> Key: KAFKA-7125
> URL: https://issues.apache.org/jira/browse/KAFKA-7125
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Jouni
> Assignee: Nikolay Izhikov
> Priority: Minor
> Labels: beginner, newbie
>
> After adding a processor and a sink to the topology after a globalstore and
> then calling StreamBuilder.build().describe() again (for debugging purposes
> and to check I got the topology right), I got the following exception and
> stacktrace:
> {{Caused by: java.util.NoSuchElementException: null}}
> {{ at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) ~[na:1.8.0_171]}}
> {{ at java.util.HashMap$KeyIterator.next(HashMap.java:1466) ~[na:1.8.0_171]}}
> {{ at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323) ~[kafka-streams-1.1.0.jar:na]}}
> {{ at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306) ~[kafka-streams-1.1.0.jar:na]}}
> {{ at org.apache.kafka.streams.Topology.describe(Topology.java:647) ~[kafka-streams-1.1.0.jar:na]}}
> Snippet of code that caused this:
> {{ GlobalKTable jsonRoutesToServices}}
> {{ = builder.globalTable("routes-to-services",}}
> {{ Consumed.with(Serdes.String(), jsonServiceListSerde),}}
> {{ Materialized. KeyValueStore>as("routes-to-services"));}}
> {{ TopologyDescription td = builder.build().describe();}}
> {{ String parent = null;}}
> {{ // We get an iterator to a TreeSet sorted by processing order, and just want the last one.}}
> {{ for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
> {{ parent = store.processor().name();}}
> }
> {{ TopologyDescription tdtd = builder.build().describe();}}
> {{ builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new UnneededCruftSupplier(), parent);}}
> {{ builder.build().addSink("FST-ROUTES-TO-SERVICES", "fst-routes-to-services", Serdes.String().serializer(), fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
> {{ TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet;
> calling describe again before adding anything works fine.
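The `reduce` behaviour Guozhang describes (first value stored directly, reducer only invoked from the second update onward) can be modelled in a few lines. This is a plain-Java sketch of the described semantics, not the Kafka Streams implementation; the class, field, and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical model of the semantics described in the comment; not Kafka Streams code.
final class ReduceSemanticsSketch {
    final Map<String, String> table = new HashMap<>();
    int reducerCalls = 0;

    /** Applies one record to the table the way the comment describes reduce. */
    void onRecord(String key, String value, BinaryOperator<String> reducer) {
        String old = table.get(key);
        if (old == null) {
            // First record for the key: stored as-is, the reducer is not invoked.
            table.put(key, value);
        } else {
            reducerCalls++;
            table.put(key, reducer.apply(old, value));
        }
    }

    public static void main(String[] args) {
        ReduceSemanticsSketch sketch = new ReduceSemanticsSketch();
        BinaryOperator<String> keepLatest = (oldValue, newValue) -> newValue;
        sketch.onRecord("route", "a", keepLatest);
        System.out.println(sketch.table + " reducerCalls=" + sketch.reducerCalls);
        sketch.onRecord("route", "b", keepLatest);
        System.out.println(sketch.table + " reducerCalls=" + sketch.reducerCalls);
    }
}
```

In this model the table is non-empty after the first record even though the reducer has not run, which is the behaviour the comment expects from `reduce`.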
[jira] [Commented] (KAFKA-7130) EOFException after rolling log segment
[ https://issues.apache.org/jira/browse/KAFKA-7130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533035#comment-16533035 ]

Dhruvil Shah commented on KAFKA-7130:
-
[~kschnitter] it would be useful if you could save the affected partition (or at least the segment) while we investigate this.

As a first step, could you please share information about all segments for this partition, including the size of each of them? `ls -l` output of the directory should be sufficient.

Would you be willing to share a metadata dump of the affected segment? This will tell us what state the segment is in. You could use `bin/kafka-run-class.sh kafka.tools.DumpLogSegments`.

> EOFException after rolling log segment
> --
>
> Key: KAFKA-7130
> URL: https://issues.apache.org/jira/browse/KAFKA-7130
> Project: Kafka
> Issue Type: Bug
> Components: replication
> Affects Versions: 1.1.0
> Reporter: Karsten Schnitter
> Priority: Major
>
> When rolling a log segment, one of our Kafka clusters got an immediate read
> error on the same partition. This led to a flood of log messages containing
> the corresponding stacktraces. Data was still appended to the partition, but
> consumers were unable to read from that partition. The reason for the
> exception is unclear.
> {noformat}
> [2018-07-02 23:53:32,732] INFO [Log partition=ingestion-3, dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 ms. (kafka.log.Log)
> [2018-07-02 23:53:32,739] INFO [ProducerStateManager partition=ingestion-3] Writing producer snapshot at offset 971865991 (kafka.log.ProducerStateManager)
> [2018-07-02 23:53:32,739] INFO [Log partition=ingestion-3, dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 ms. (kafka.log.Log)
> [2018-07-02 23:53:32,750] ERROR [ReplicaManager broker=1] Error processing fetch operation on partition ingestion-3, offset 971865977 (kafka.server.ReplicaManager)
> Caused by: java.io.EOFException: Failed to read `log header` from file channel `sun.nio.ch.FileChannelImpl@2e0e8810`. Expected to read 17 bytes, but reached end of file after reading 0 bytes. Started read from position 2147483643.
> {noformat}
> We mitigated the issue by stopping the affected node and deleting the
> corresponding directory. Once the partition was recreated for the replica (we
> use replication-factor 2), the other replica experienced the same problem. We
> mitigated likewise.
> It is unclear to us what caused this issue. Can you help us find the root
> cause of this problem?
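One observation on the numbers in the EOFException: the read position 2147483643 is only 4 bytes below `Integer.MAX_VALUE` (2147483647), so a 17-byte header read starting there would extend past the 32-bit signed limit. The sketch below only checks that arithmetic; whether this is related to the root cause is an assumption the thread does not confirm, and the class and constant names are hypothetical.

```java
// Arithmetic check on the values quoted in the log; not Kafka code.
final class SegmentPositionSketch {
    static final int LOG_HEADER_BYTES = 17;         // "Expected to read 17 bytes"
    static final long READ_POSITION = 2147483643L;  // "Started read from position 2147483643"

    /** Distance between the read position and the largest signed 32-bit value. */
    static long bytesBelowIntMax(long position) {
        return Integer.MAX_VALUE - position;
    }

    /** True if a header read starting at position would end beyond Integer.MAX_VALUE. */
    static boolean headerCrossesIntMax(long position, int headerBytes) {
        return position + headerBytes > Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println("bytes below Integer.MAX_VALUE: " + bytesBelowIntMax(READ_POSITION));
        System.out.println("header read crosses the limit: "
                + headerCrossesIntMax(READ_POSITION, LOG_HEADER_BYTES));
    }
}
```

The per-segment sizes from the requested `ls -l` output would show whether the affected segment really was close to that size.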
[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null
[ https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533017#comment-16533017 ]

Jouni commented on KAFKA-7125:
--
Thanks Guozhang.

The process method not being called could be because I'm somewhat of a beginner with Kafka, messing up everything all the time, and reverted to streams 1.1.0. I'll switch back to the current 2.1.0-SNAPSHOT + your PR so I can debug, put some breakpoints here and there, see what's really happening and double-check myself, so I don't have to bother you with unnecessary questions. It's already late evening here in Finland (GMT+3), so I'm going to sleep after a while, but I will use the whole next day for tracking this down.

Besides, I've already converted 2 out of 3 of my globaltables to streams aggregated to tables, and learned a lot of interesting things about when I really have to use Consumed, Produced, Materialized, Joined etc. and when it just works with the defaults. I also wrote a custom partitioner...

BTW, if you happen to be connected to Confluent, I think the Option 2 example in [https://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step] should be changed. I'd recommend something like

{{.groupByKey()}}
{{.aggregate(() -> null, (foo, value, bar) -> value, Materialized...}}

It took me a while to find out that reduce will NOT call the apply method the first time, only the second time, while I was wondering why my table is empty until I fed the data twice. This was with 1.1.0; things could be otherwise with different versions.

Being able to add just one processor is enough to solve my use case of doing some non-streams-related per-instance local processing. More than one processor per globaltable should never be needed. But another possibility, maybe safer considering Kafka itself, would be to prevent it altogether for users and instead add something like withChangeStream and a way to subscribe to that stream. Although it still wouldn't prevent us users from doing dumb things. I don't mind whatever the solution is or will be and can adapt, as long as this use case is covered.
[jira] [Commented] (KAFKA-6127) Streams should never block infinitely
[ https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532997#comment-16532997 ]

Guozhang Wang commented on KAFKA-6127:
--
That is a good question. I think we should consider whether it is worth adding more configs in Streams post KIP-266. Our current situation is:

1. We do have the {{RETRIES_CONFIG}} and {{RETRY_BACKOFF_MS_CONFIG}} configs in StreamsConfig, but today they are only used in the global consumer's {{globalConsumer.endOffsets(topicPartitions);}} and {{partitionInfos = globalConsumer.partitionsFor(sourceTopic);}} calls, because we always try to complete the restoration of global stores before starting any stream threads.

2. We do not have anything like a {{MAX_BLOCK_MS}} config. We hard-code different values today for some of the callers, and for some other calls we do not provide a timeout and hence rely on the consumer's request timeout value as the default, which is {{40 * 1000}}.

The question for 2) is whether it is better to define a global config and use it across all blocking calls to the consumer, or whether the other callers should pass in a specific timeout rather than just relying on the request timeout.

The question for 1) is whether we can just use a very large timeout value and get rid of retries.

> Streams should never block infinitely
> -
>
> Key: KAFKA-6127
> URL: https://issues.apache.org/jira/browse/KAFKA-6127
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Matthias J. Sax
> Assignee: Richard Yu
> Priority: Major
> Labels: exactly-once
>
> Streams uses three consumer APIs that can block indefinitely: {{commitSync()}},
> {{committed()}}, and {{position()}}. Also {{KafkaProducer#send()}} can block.
> If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block
> (fixed in KAFKA-6446), and we should double-check the code to see whether we
> handle this case correctly.
> If we block within one operation, the whole {{StreamThread}} would block, and
> the instance does not make any progress, becomes unresponsive (for example,
> {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer
> group.
> Thanks to
> [KIP-266|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886],
> the Consumer now has non-blocking variants that we can use, but the same is
> not true of Producer. We can add non-blocking variants to Producer as well,
> or set the appropriate config options to set the max timeout.
> Of course, we'd also need to be sure to catch the appropriate timeout
> exceptions.
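To make the trade-off between "retries plus backoff" and "one large timeout" concrete, here is a minimal sketch of a bounded retry loop and its worst-case blocking time. It is not the Streams implementation: `callWithRetries` and `worstCaseBlockMs` are hypothetical helpers, and the per-call timeout is simply assumed to be the {{40 * 1000}} ms value mentioned in the comment.

```java
import java.util.function.Supplier;

// Hypothetical helper illustrating bounded blocking; not part of Kafka Streams.
final class BoundedRetrySketch {

    /** Calls the supplier up to retries + 1 times, sleeping backoffMs between failed attempts. */
    static <T> T callWithRetries(Supplier<T> call, int retries, long backoffMs) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < retries) {
                    sleep(backoffMs);
                }
            }
        }
        throw last; // every attempt failed
    }

    /** Upper bound on blocking time if each attempt may block for perCallTimeoutMs. */
    static long worstCaseBlockMs(int retries, long backoffMs, long perCallTimeoutMs) {
        return (retries + 1L) * perCallTimeoutMs + retries * backoffMs;
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during backoff", e);
        }
    }

    public static void main(String[] args) {
        // 5 retries, 100 ms backoff, 40 s per call: (5 + 1) * 40000 + 5 * 100 ms.
        System.out.println(worstCaseBlockMs(5, 100L, 40L * 1000L) + " ms worst case");
    }
}
```

With retries the total bound is a product of several settings, whereas a single timeout is one number; that difference is essentially the question raised for item 1).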
[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null
[ https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532992#comment-16532992 ]

Guozhang Wang commented on KAFKA-7125:
--
[~Ahto] I think with my PR, adding just a processor that is not followed by a sink node should be fine now. (As I mentioned earlier, we should have another PR for disallowing 1) adding a local state store to it, 2) adding a sink node to it, and 3) claiming any of its processor nodes as parent of the processor nodes of the other normal processing sub-topology, but that should not impact your use case for now.)

I'm not sure what you mean by "my processors process method isn't called". I've double-checked the update processor's implementation, the {{KTableSource}} source code, again: if a child processor has indeed been added, it should eventually forward the records to it. Note, however, that if caching is enabled, it will likely not flush and forward the records to its children immediately, but only when the processing state is committed.
[jira] [Commented] (KAFKA-7132) Consider adding faster form of rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532980#comment-16532980 ]

Guozhang Wang commented on KAFKA-7132:
--
[~Yohan123] There are two things we should consider here. [~enether] has mentioned one, which is to guarantee offset ordering for consumption. The other is to guarantee at-least-once semantics by default.

Resuming from the last committed offset would likely cause duplicated records to be processed, but would also avoid data loss. Restarting from the latest offset (I'm not sure what you mean by "it recovers at a later offset", so I'd assume you meant that when the consumer resumes, the log has grown to offset 120) would cause you to lose the data from 100 - 120, while using a separate consumer to cover the gap would violate ordering guarantees.
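The trade-off in this comment can be spelled out with the offsets from the issue (committed 70, crash at 100, log end 120). The sketch below is a hypothetical model, not consumer code; it treats each range as half-open and simply lists which offsets would be processed twice or not at all under each resume strategy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the two resume strategies; not Kafka consumer code.
final class ResumeStrategySketch {

    /** Offsets in the half-open range [from, to). */
    static List<Long> range(long from, long to) {
        List<Long> offsets = new ArrayList<>();
        for (long offset = from; offset < to; offset++) {
            offsets.add(offset);
        }
        return offsets;
    }

    /** Resuming at the committed offset: these offsets are processed a second time. */
    static List<Long> reprocessed(long committed, long crashedAt) {
        return range(committed, crashedAt);
    }

    /** Resuming at the log end without a catch-up consumer: these offsets are never processed. */
    static List<Long> skipped(long crashedAt, long logEnd) {
        return range(crashedAt, logEnd);
    }

    public static void main(String[] args) {
        System.out.println("duplicates when resuming at 70: " + reprocessed(70L, 100L).size());
        System.out.println("lost when resuming at 120: " + skipped(100L, 120L).size());
    }
}
```

Duplicates are compatible with at-least-once delivery; skipped offsets are not, which is why the default resumes from the committed offset.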
[jira] [Created] (KAFKA-7133) DisconnectException every 5 minutes in single restore consumer thread
Chris Schwarzfischer created KAFKA-7133:
---
Summary: DisconnectException every 5 minutes in single restore consumer thread
Key: KAFKA-7133
URL: https://issues.apache.org/jira/browse/KAFKA-7133
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 1.1.0
Environment: Kafka Streams application in Kubernetes. Kafka Server in Docker on machine in host mode
Reporter: Chris Schwarzfischer

One of our streams applications (and only this one) gets a {{org.apache.kafka.common.errors.DisconnectException}} almost exactly every 5 minutes. The application has two KStream -> KGroupedStream -> KTable -> KGroupedTable -> KTable aggregations.

Relevant config in Streams:
{code:java}
this.properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
//...
this.properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
this.properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 1024 * 500 /* 500 MB */);
this.properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024 * 100 /* 100 MB */);
this.properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1024 * 1024 * 50 /* 50 MB */);
{code}
On the broker:
{noformat}
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_OFFSETS_RETENTION_MINUTES: 108000
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 2147483000
KAFKA_LOG_RETENTION_HOURS: 2688
KAFKA_OFFSETS_RETENTION_CHECK_INTERVAL_MS: 120
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 12000
{noformat}
Logging shows a single restore consumer thread that throws exceptions every 5 minutes:
{noformat}
July 4th 2018, 15:38:51.560 dockertest03 2018-07-04T13:38:51,559Z INFO : [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]: FetchSessionHandler::handleError:440 - [Consumer clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer, groupId=] Error sending fetch request (sessionId=317141939, epoch=INITIAL) to node 1: org.apache.kafka.common.errors.DisconnectException.
July 4th 2018, 15:37:54.833 dockertest03 2018-07-04T13:37:54,832Z INFO : [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]: FetchSessionHandler::handleError:440 - [Consumer clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer, groupId=] Error sending fetch request (sessionId=2064325970, epoch=INITIAL) to node 3: org.apache.kafka.common.errors.DisconnectException.
July 4th 2018, 15:37:54.833 dockertest03 2018-07-04T13:37:54,832Z INFO : [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]: FetchSessionHandler::handleError:440 - [Consumer clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer, groupId=] Error sending fetch request (sessionId=1735432619, epoch=INITIAL) to node 2: org.apache.kafka.common.errors.DisconnectException.
July 4th 2018, 15:32:26.379 dockertest03 2018-07-04T13:32:26,378Z INFO : [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]: FetchSessionHandler::handleError:440 - [Consumer clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer, groupId=] Error sending fetch request (sessionId=317141939, epoch=INITIAL) to node 1: org.apache.kafka.common.errors.DisconnectException.
July 4th 2018, 15:32:01.926 dockertest03 2018-07-04T13:32:01,925Z INFO : [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]: FetchSessionHandler::handleError:440 - [Consumer clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer, groupId=] Error sending fetch request (sessionId=1735432619, epoch=INITIAL) to node 2: org.apache.kafka.common.errors.DisconnectException.
July 4th 2018, 15:32:01.926 dockertest03 2018-07-04T13:32:01,925Z INFO : [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]: FetchSessionHandler::handleError:440 - [Consumer clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer, groupId=] Error sending fetch request (sessionId=2064325970, epoch=INITIAL) to node 3: org.apache.kafka.common.errors.DisconnectException.
July 4th 2018, 15:26:53.886 dockertest03 2018-07-04T13:26:53,886Z INFO : [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]: FetchSessionHandler::handleError:440 - [Consumer
[jira] [Commented] (KAFKA-6751) Make max.connections.per.ip.overrides a dynamic config
[ https://issues.apache.org/jira/browse/KAFKA-6751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532587#comment-16532587 ] ASF GitHub Bot commented on KAFKA-6751: --- omkreddy opened a new pull request #5334: KAFKA-6751: Support dynamic configuration of max.connections.per.ip/max.connections.per.ip.overrides configs (KIP-308) URL: https://github.com/apache/kafka/pull/5334 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make max.connections.per.ip.overrides a dynamic config > -- > > Key: KAFKA-6751 > URL: https://issues.apache.org/jira/browse/KAFKA-6751 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Manikumar >Priority: Major > Labels: needs-kip > Fix For: 2.1.0 > > > It might be useful to be able to update this config dynamically since we > occasionally run into situations where a particular host (or set of hosts) is > causing some trouble for the broker. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7132) Consider adding faster form of rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532545#comment-16532545 ] Stanislav Kozlovski commented on KAFKA-7132: The best way to consider this is to open a KIP and pass it to the mailing group for thorough discussion. This is a good way to avoid lag but unfortunately will mess up every ordering guarantee. > Consider adding faster form of rebalancing > -- > > Key: KAFKA-7132 > URL: https://issues.apache.org/jira/browse/KAFKA-7132 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Richard Yu >Priority: Critical > Labels: performance > > Currently, when a consumer falls out of a consumer group, it will restart > processing from the last checkpointed offset. However, this design could > result in lag that some users cannot afford. For example, > let's say a consumer crashed at offset 100, with the last checkpointed offset > being at 70. When it recovers at a later offset (say, 120), it will be behind > by an offset range of 50 (120 - 70). This is because the consumer restarted > at 70, forcing it to reprocess old data. To prevent this, one > option would be to allow the current consumer to start processing not from > the last checkpointed offset (which is 70 in the example), but from 120 where > it recovers. Meanwhile, a new KafkaConsumer will be instantiated and start > reading from offset 70 concurrently with the old process, and will be > terminated once it reaches 120. In this manner, a considerable amount of lag > can be avoided, particularly since the old consumer could proceed as if > nothing had happened.
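The arithmetic in the description, and the ordering concern raised in the comment, can be illustrated with a small in-memory simulation. This is only a sketch under stated assumptions: `RebalanceSketch`, its method names, the log end offset of 125, and the round-robin interleaving of the two readers are all hypothetical. It does not use the Kafka client API and does not reflect how the KIP would actually schedule the two consumers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the proposal; nothing here is Kafka client API.
final class RebalanceSketch {

    // Number of offsets the consumer is behind if it restarts at the checkpoint.
    static long lag(long checkpointed, long recoveredAt) {
        return recoveredAt - checkpointed;
    }

    // Interleaves a "main" reader that starts at recoveredAt with a "catch-up"
    // reader that covers [checkpointed, recoveredAt). The strict round-robin
    // scheduling is an assumption made only to keep the example deterministic.
    static List<Long> processingOrder(long checkpointed, long recoveredAt, long logEnd) {
        List<Long> order = new ArrayList<>();
        long main = recoveredAt;
        long catchUp = checkpointed;
        while (main < logEnd || catchUp < recoveredAt) {
            if (main < logEnd) {
                order.add(main++);
            }
            if (catchUp < recoveredAt) {
                order.add(catchUp++);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        // Values from the issue description: checkpoint at 70, recovery at 120.
        System.out.println("lag = " + lag(70, 120));
        // Offsets are processed out of order, which is the ordering concern.
        System.out.println(processingOrder(70, 120, 125));
    }
}
```

In this model offset 120 is processed before offset 70, so any downstream logic that relies on per-partition ordering would observe records out of sequence.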
[jira] [Updated] (KAFKA-7132) Consider adding faster form of rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-7132: -- Labels: performance (was: ) > Consider adding faster form of rebalancing > -- > > Key: KAFKA-7132 > URL: https://issues.apache.org/jira/browse/KAFKA-7132 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Richard Yu >Priority: Critical > Labels: performance
[jira] [Updated] (KAFKA-7132) Consider adding faster form of rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-7132: -- Priority: Critical (was: Major) > Consider adding faster form of rebalancing > -- > > Key: KAFKA-7132 > URL: https://issues.apache.org/jira/browse/KAFKA-7132 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Richard Yu >Priority: Critical
[jira] [Updated] (KAFKA-7132) Consider adding faster form of rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-7132: -- Summary: Consider adding faster form of rebalancing (was: Consider adding multithreaded form of rebalancing) > Consider adding faster form of rebalancing > -- > > Key: KAFKA-7132 > URL: https://issues.apache.org/jira/browse/KAFKA-7132 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Richard Yu >Priority: Major
[jira] [Updated] (KAFKA-7132) Consider adding multithreaded form of rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-7132: -- Description: Currently, when a consumer falls out of a consumer group, it will restart processing from the last checkpointed offset. However, this design could result in lag that some users cannot afford. For example, let's say a consumer crashed at offset 100, with the last checkpointed offset being at 70. When it recovers at a later offset (say, 120), it will be behind by an offset range of 50 (120 - 70). This is because the consumer restarted at 70, forcing it to reprocess old data. To prevent this, one option would be to allow the current consumer to start processing not from the last checkpointed offset (which is 70 in the example), but from 120 where it recovers. Meanwhile, a new KafkaConsumer will be instantiated and start reading from offset 70 concurrently with the old process, and will be terminated once it reaches 120. In this manner, a considerable amount of lag can be avoided, particularly since the old consumer could proceed as if nothing had happened. (was: Currently, when a consumer falls out of a consumer group, it will restart processing from the last checkpointed offset. However, this design could result in lag that some users cannot afford. For example, let's say a consumer crashed at offset 100, with the last checkpointed offset being at 70. When it recovers at a later offset (say, 120), it will be behind by an offset range of 50 (120 - 70). This is because it restarted at 70, forcing it to reprocess old data. To prevent this, one option would be to allow the current consumer to start processing not from the last checkpointed offset (which is 70 in the example), but from 120 where it recovers. Meanwhile, a new KafkaConsumer will be instantiated and start reading from offset 70 concurrently with the old process, and will be terminated once it reaches 120. In this manner, a considerable amount of lag can be avoided, particularly since the old consumer could proceed as if nothing had happened. ) > Consider adding multithreaded form of rebalancing > - > > Key: KAFKA-7132 > URL: https://issues.apache.org/jira/browse/KAFKA-7132 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Richard Yu >Priority: Major
[jira] [Updated] (KAFKA-7132) Consider adding multithreaded form of rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-7132: -- Summary: Consider adding multithreaded form of rebalancing (was: Consider adding multithreaded form of recovery) > Consider adding multithreaded form of rebalancing > - > > Key: KAFKA-7132 > URL: https://issues.apache.org/jira/browse/KAFKA-7132 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Richard Yu >Priority: Major
[jira] [Created] (KAFKA-7132) Consider adding multithreaded form of recovery
Richard Yu created KAFKA-7132: - Summary: Consider adding multithreaded form of recovery Key: KAFKA-7132 URL: https://issues.apache.org/jira/browse/KAFKA-7132 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Richard Yu Currently, when a consumer falls out of a consumer group, it will restart processing from the last checkpointed offset. However, this design could result in lag that some users cannot afford. For example, let's say a consumer crashed at offset 100, with the last checkpointed offset being at 70. When it recovers at a later offset (say, 120), it will be behind by an offset range of 50 (120 - 70). This is because it restarted at 70, forcing it to reprocess old data. To prevent this, one option would be to allow the current consumer to start processing not from the last checkpointed offset (which is 70 in the example), but from 120 where it recovers. Meanwhile, a new KafkaConsumer will be instantiated and start reading from offset 70 concurrently with the old process, and will be terminated once it reaches 120. In this manner, a considerable amount of lag can be avoided, particularly since the old consumer could proceed as if nothing had happened.
[jira] [Commented] (KAFKA-7130) EOFException after rolling log segment
[ https://issues.apache.org/jira/browse/KAFKA-7130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532391#comment-16532391 ] Karsten Schnitter commented on KAFKA-7130: --
I saved the faulty segments from the last incident, but they contain sensitive data that I cannot share outside my company. I tried to read the last message of the finished segment from the partition:
{noformat}
$ kafka-console-consumer.sh --bootstrap-server $KAFKA_SERVERS --topic ingestion --partition 67 --offset 1316626420 --max-messages 1
[2018-07-04 07:33:11,604] WARN [Consumer clientId=consumer-1, groupId=console-consumer-79022] Unknown error fetching data for topic-partition ingestion-67 (org.apache.kafka.clients.consumer.internals.Fetcher)
...
^C[2018-07-04 07:33:11,606] WARN [Consumer clientId=consumer-1, groupId=console-consumer-79022] Unknown error fetching data for topic-partition ingestion-67 (org.apache.kafka.clients.consumer.internals.Fetcher)
[2018-07-04 07:33:11,612] WARN [Consumer clientId=consumer-1, groupId=console-consumer-79022] Unknown error fetching data for topic-partition ingestion-67 (org.apache.kafka.clients.consumer.internals.Fetcher)
Processed a total of 0 messages
{noformat}
The error message is repeated until I stop the command. It is the same for other messages in the segment. Consuming the first message of the new segment works fine:
{noformat}
$ kafka-console-consumer.sh --bootstrap-server $KAFKA_SERVERS --topic ingestion --partition 67 --offset 1316626421 --max-messages 1
Processed a total of 1 messages
{noformat}
> EOFException after rolling log segment
> --
>
> Key: KAFKA-7130
> URL: https://issues.apache.org/jira/browse/KAFKA-7130
> Project: Kafka
> Issue Type: Bug
> Components: replication
> Affects Versions: 1.1.0
> Reporter: Karsten Schnitter
> Priority: Major
>
> When rolling a log segment, one of our Kafka clusters got an immediate read error on the same partition. This led to a flood of log messages containing the corresponding stack traces. Data was still appended to the partition, but consumers were unable to read from that partition. The reason for the exception is unclear.
> {noformat}
> [2018-07-02 23:53:32,732] INFO [Log partition=ingestion-3, dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 ms. (kafka.log.Log)
> [2018-07-02 23:53:32,739] INFO [ProducerStateManager partition=ingestion-3] Writing producer snapshot at offset 971865991 (kafka.log.ProducerStateManager)
> [2018-07-02 23:53:32,739] INFO [Log partition=ingestion-3, dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 ms. (kafka.log.Log)
> [2018-07-02 23:53:32,750] ERROR [ReplicaManager broker=1] Error processing fetch operation on partition ingestion-3, offset 971865977 (kafka.server.ReplicaManager)
> Caused by: java.io.EOFException: Failed to read `log header` from file channel `sun.nio.ch.FileChannelImpl@2e0e8810`. Expected to read 17 bytes, but reached end of file after reading 0 bytes. Started read from position 2147483643.
> {noformat}
> We mitigated the issue by stopping the affected node and deleting the corresponding directory. Once the partition was recreated for the replica (we use replication-factor 2), the other replica experienced the same problem. We mitigated likewise.
> It is unclear to us what caused this issue. Can you help us find the root cause of this problem?
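One detail in the reported exception can be checked with plain arithmetic: the read started at position 2147483643, which is only 4 bytes below `Integer.MAX_VALUE` (2147483647), so a 17-byte log header starting there would end beyond the largest value a signed 32-bit integer can hold. The sketch below only verifies that arithmetic. Whether this is related to the root cause is an assumption and is not established anywhere in this thread; `HeaderReadCheck` and its method names are hypothetical and are not part of Kafka.

```java
// Hypothetical helper that only checks the numbers quoted in the log message.
final class HeaderReadCheck {

    // "Expected to read 17 bytes" in the reported EOFException.
    static final int HEADER_BYTES = 17;

    // Distance between the reported read position and Integer.MAX_VALUE.
    static long bytesBelowIntMax(long position) {
        return Integer.MAX_VALUE - position;
    }

    // True if a header starting at this position would end past Integer.MAX_VALUE.
    static boolean headerEndExceedsIntRange(long position) {
        return position + HEADER_BYTES > Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        long reported = 2147483643L; // "Started read from position 2147483643"
        System.out.println("bytes below Integer.MAX_VALUE: " + bytesBelowIntMax(reported));
        System.out.println("header end exceeds int range: " + headerEndExceedsIntRange(reported));
    }
}
```

If the observation turns out to be relevant, the configured segment size and the size of the last appended batch would be the first things to compare against this limit.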
[jira] [Commented] (KAFKA-7130) EOFException after rolling log segment
[ https://issues.apache.org/jira/browse/KAFKA-7130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532366#comment-16532366 ] Karsten Schnitter commented on KAFKA-7130: --
The stack trace of the original system was lost. Fortunately for this issue, but unfortunately for us, the issue reoccurred on a different cluster. I extracted the stack trace from there:
{noformat}
[2018-07-04 07:10:53,633] ERROR [ReplicaManager broker=4] Error processing fetch operation on partition ingestion-67, offset 1316626420 (kafka.server.ReplicaManager)
org.apache.kafka.common.KafkaException: java.io.EOFException: Failed to read `log header` from file channel `sun.nio.ch.FileChannelImpl@44f0380f`. Expected to read 17 bytes, but reached end of file after reading 0 bytes. Started read from position 2147483638.
	at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:40)
	at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
	at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
	at org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
	at org.apache.kafka.common.record.FileRecords.searchForOffsetWithSize(FileRecords.java:287)
	at kafka.log.LogSegment.translateOffset(LogSegment.scala:190)
	at kafka.log.LogSegment.read(LogSegment.scala:242)
	at kafka.log.Log.$anonfun$read$2(Log.scala:1020)
	at kafka.log.Log.maybeHandleIOException(Log.scala:1678)
	at kafka.log.Log.read(Log.scala:976)
	at kafka.server.ReplicaManager.read$1(ReplicaManager.scala:920)
	at kafka.server.ReplicaManager.$anonfun$readFromLocalLog$6(ReplicaManager.scala:982)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:981)
	at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:818)
	at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:823)
	at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:631)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:105)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: Failed to read `log header` from file channel `sun.nio.ch.FileChannelImpl@44f0380f`. Expected to read 17 bytes, but reached end of file after reading 0 bytes. Started read from position 2147483638.
	at org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:806)
	at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:66)
	at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:40)
	at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35)
	... 21 more
{noformat}
> EOFException after rolling log segment
> --
>
> Key: KAFKA-7130
> URL: https://issues.apache.org/jira/browse/KAFKA-7130
> Project: Kafka
> Issue Type: Bug
> Components: replication
> Affects Versions: 1.1.0
> Reporter: Karsten Schnitter
> Priority: Major
[jira] [Commented] (KAFKA-6127) Streams should never block infinitely
[ https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532315#comment-16532315 ] ASF GitHub Bot commented on KAFKA-6127: --- ConcurrencyPractitioner opened a new pull request #5333: [KAFKA-6127] Streams should never block infinitely URL: https://github.com/apache/kafka/pull/5333 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Streams should never block infinitely > - > > Key: KAFKA-6127 > URL: https://issues.apache.org/jira/browse/KAFKA-6127 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Richard Yu >Priority: Major > Labels: exactly-once > > Streams uses three consumer APIs that can block indefinitely: {{commitSync()}}, > {{committed()}}, and {{position()}}. Also, {{KafkaProducer#send()}} can block. > If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block > (fixed in KAFKA-6446), and we should double-check whether the code handles this > case correctly. > If we block within one operation, the whole {{StreamThread}} would block, and > the instance does not make any progress, becomes unresponsive (for example, > {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer > group. > Thanks to > [KIP-266|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886], > the Consumer now has non-blocking variants that we can use, but the same is > not true of Producer. We can add non-blocking variants to Producer as well, > or set the appropriate config options to set the max timeout. > Of course, we'd also need to be sure to catch the appropriate timeout > exceptions.
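The pattern the description asks for, bounding every potentially blocking call and handling the timeout, can be sketched without Kafka at all. The example below is a generic illustration built on `java.util.concurrent`; `BoundedCall` and `callWithTimeout` are hypothetical names and are not Kafka or Streams API. In the Kafka client itself, the consumer overloads from KIP-266 that accept a `Duration` (for example `commitSync(Duration)`) would serve this purpose more directly, so treat this only as a picture of the control flow.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; illustrates bounded blocking, not any Kafka API.
final class BoundedCall {

    // Runs the task on a helper thread and waits at most `timeout` for its result.
    // Returns Optional.empty() on timeout instead of blocking the caller forever.
    // The task must return a non-null value.
    static <T> Optional<T> callWithTimeout(Callable<T> task, Duration timeout)
            throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<T> future = executor.submit(task);
        try {
            return Optional.of(future.get(timeout.toMillis(), TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the task that is still running
            return Optional.empty();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        Optional<String> fast = callWithTimeout(() -> "done", Duration.ofMillis(500));
        Optional<String> slow = callWithTimeout(() -> {
            Thread.sleep(5_000); // stands in for a call that would otherwise block
            return "too late";
        }, Duration.ofMillis(100));
        System.out.println("fast call returned a value: " + fast.isPresent());
        System.out.println("slow call returned a value: " + slow.isPresent());
    }
}
```

The caller stays responsive because the wait is bounded; what to do after a timeout (retry, skip, or shut down) is a separate policy decision that the issue leaves open.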
[jira] [Commented] (KAFKA-6127) Streams should never block infinitely
[ https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532301#comment-16532301 ] Richard Yu commented on KAFKA-6127: --- Hi [~guozhang] Now that we have created new methods (for {{KafkaConsumer}}) with bounded times, don't we also require some new configuration (determined by the user?) to determine how long we block? This change might require a KIP too. With the new {{Consumer}} API, we should also migrate {{KafkaStreams}} as well. > Streams should never block infinitely > - > > Key: KAFKA-6127 > URL: https://issues.apache.org/jira/browse/KAFKA-6127 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Richard Yu >Priority: Major > Labels: exactly-once
[jira] [Assigned] (KAFKA-6127) Streams should never block infinitely
[ https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu reassigned KAFKA-6127: - Assignee: Richard Yu > Streams should never block infinitely > - > > Key: KAFKA-6127 > URL: https://issues.apache.org/jira/browse/KAFKA-6127 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Richard Yu >Priority: Major > Labels: exactly-once