[jira] [Commented] (KAFKA-7132) Consider adding faster form of rebalancing

2018-07-04 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533252#comment-16533252
 ] 

Richard Yu commented on KAFKA-7132:
---

Hi all,

You can find the KIP here (discussion thread TBD):

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing#KIP-333:Addfastermodeofrebalancing-RejectedAlternatives]

 

> Consider adding faster form of rebalancing
> --
>
> Key: KAFKA-7132
> URL: https://issues.apache.org/jira/browse/KAFKA-7132
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Priority: Critical
>  Labels: performance
>
> Currently, when a consumer falls out of a consumer group, it will restart 
> processing from the last checkpointed offset. However, this design could 
> result in a lag that some users cannot afford. For example, let's say a 
> consumer crashed at offset 100, with the last checkpointed offset being 70. 
> When it recovers at a later offset (say, 120), it will be behind by an offset 
> range of 50 (120 - 70). This is because the consumer restarted at 70, forcing 
> it to reprocess old data. To avoid this, one option would be to allow the 
> current consumer to start processing not from the last checkpointed offset 
> (which is 70 in the example) but from 120, where it recovers. Meanwhile, a 
> new KafkaConsumer would be instantiated to read from offset 70 concurrently 
> with the old process and would be terminated once it reaches 120. In this 
> manner, a considerable amount of lag can be avoided, particularly since the 
> old consumer could proceed as if nothing had happened. 
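For illustration only, here is a rough sketch of the catch-up pattern described 
above, assuming the application somehow records the offset at which it resumed 
(resumeOffset) and that the consumer properties carry the usual bootstrap 
servers and byte-array deserializers; none of this is an existing Kafka API:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical catch-up reader: replays the gap [committedOffset, resumeOffset)
// with a second, standalone consumer while the main consumer keeps processing
// from resumeOffset onwards.
public class CatchUpReader {

    public static void catchUp(Properties consumerConfig, TopicPartition tp,
                               long committedOffset, long resumeOffset) {
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
            consumer.assign(Collections.singletonList(tp)); // no group membership
            consumer.seek(tp, committedOffset);             // start at the checkpoint (70)
            while (consumer.position(tp) < resumeOffset) {  // stop once the gap is closed (120)
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    if (record.offset() >= resumeOffset) {
                        return;                             // reached the resume point
                    }
                    process(record);                        // reprocess the missed range
                }
            }
        }
    }

    private static void process(ConsumerRecord<byte[], byte[]> record) {
        // application-specific handling of a replayed record
    }
}
{code}

As discussed further down this thread, replaying the gap with a second consumer 
trades away per-partition ordering for the replayed range, which is the main 
objection raised against the idea.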



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7132) Consider adding faster form of rebalancing

2018-07-04 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533252#comment-16533252
 ] 

Richard Yu edited comment on KAFKA-7132 at 7/5/18 4:30 AM:
---

Hi all,

You can find the KIP here (discussion thread TBD):

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing#KIP-333:Addfastermodeofrebalancing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing#KIP-333:Addfastermodeofrebalancing-RejectedAlternatives]

 


was (Author: yohan123):
Hi all,

You could find the KIP here. (Discussion thread TBD)

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing#KIP-333:Addfastermodeofrebalancing-RejectedAlternatives]

 

> Consider adding faster form of rebalancing
> --
>
> Key: KAFKA-7132
> URL: https://issues.apache.org/jira/browse/KAFKA-7132
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Priority: Critical
>  Labels: performance
>
> Currently, when a consumer falls out of a consumer group, it will restart 
> processing from the last checkpointed offset. However, this design could 
> result in a lag that some users cannot afford. For example, let's say a 
> consumer crashed at offset 100, with the last checkpointed offset being 70. 
> When it recovers at a later offset (say, 120), it will be behind by an offset 
> range of 50 (120 - 70). This is because the consumer restarted at 70, forcing 
> it to reprocess old data. To avoid this, one option would be to allow the 
> current consumer to start processing not from the last checkpointed offset 
> (which is 70 in the example) but from 120, where it recovers. Meanwhile, a 
> new KafkaConsumer would be instantiated to read from offset 70 concurrently 
> with the old process and would be terminated once it reaches 120. In this 
> manner, a considerable amount of lag can be avoided, particularly since the 
> old consumer could proceed as if nothing had happened. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-7132) Consider adding faster form of rebalancing

2018-07-04 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-7132:
--
Comment: was deleted

(was: Well, when I was imagining this situation, I was thinking that the log's 
maximum extent is at offset 70.)

> Consider adding faster form of rebalancing
> --
>
> Key: KAFKA-7132
> URL: https://issues.apache.org/jira/browse/KAFKA-7132
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Priority: Critical
>  Labels: performance
>
> Currently, when a consumer falls out of a consumer group, it will restart 
> processing from the last checkpointed offset. However, this design could 
> result in a lag that some users cannot afford. For example, let's say a 
> consumer crashed at offset 100, with the last checkpointed offset being 70. 
> When it recovers at a later offset (say, 120), it will be behind by an offset 
> range of 50 (120 - 70). This is because the consumer restarted at 70, forcing 
> it to reprocess old data. To avoid this, one option would be to allow the 
> current consumer to start processing not from the last checkpointed offset 
> (which is 70 in the example) but from 120, where it recovers. Meanwhile, a 
> new KafkaConsumer would be instantiated to read from offset 70 concurrently 
> with the old process and would be terminated once it reaches 120. In this 
> manner, a considerable amount of lag can be avoided, particularly since the 
> old consumer could proceed as if nothing had happened. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7132) Consider adding faster form of rebalancing

2018-07-04 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533162#comment-16533162
 ] 

Richard Yu commented on KAFKA-7132:
---

Well, when I was imagining this situation, I was thinking that the log's end 
(its maximum extent) is at offset 70.

> Consider adding faster form of rebalancing
> --
>
> Key: KAFKA-7132
> URL: https://issues.apache.org/jira/browse/KAFKA-7132
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Priority: Critical
>  Labels: performance
>
> Currently, when a consumer falls out of a consumer group, it will restart 
> processing from the last checkpointed offset. However, this design could 
> result in a lag that some users cannot afford. For example, let's say a 
> consumer crashed at offset 100, with the last checkpointed offset being 70. 
> When it recovers at a later offset (say, 120), it will be behind by an offset 
> range of 50 (120 - 70). This is because the consumer restarted at 70, forcing 
> it to reprocess old data. To avoid this, one option would be to allow the 
> current consumer to start processing not from the last checkpointed offset 
> (which is 70 in the example) but from 120, where it recovers. Meanwhile, a 
> new KafkaConsumer would be instantiated to read from offset 70 concurrently 
> with the old process and would be terminated once it reaches 120. In this 
> manner, a considerable amount of lag can be avoided, particularly since the 
> old consumer could proceed as if nothing had happened. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6904) DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate is flaky

2018-07-04 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533150#comment-16533150
 ] 

Ted Yu commented on KAFKA-6904:
---

Haven't seen this failure lately.

> DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate is flaky
> --
>
> Key: KAFKA-6904
> URL: https://issues.apache.org/jira/browse/KAFKA-6904
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Major
>
> From 
> https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/820/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testAdvertisedListenerUpdate/
>  :
> {code}
> kafka.server.DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate
> Failing for the past 1 build (Since Failed#820 )
> Took 21 sec.
> Error Message
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
> not the leader for that topic-partition.
> Stacktrace
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
> not the leader for that topic-partition.
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:77)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.$anonfun$verifyProduceConsume$3(DynamicBrokerReconfigurationTest.scala:996)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
>   at scala.collection.Iterator.foreach(Iterator.scala:944)
>   at scala.collection.Iterator.foreach$(Iterator.scala:944)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1432)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:234)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:996)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate(DynamicBrokerReconfigurationTest.scala:742)
> {code}
> The above happened with jdk 10.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null

2018-07-04 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533062#comment-16533062
 ] 

Guozhang Wang commented on KAFKA-7125:
--

Thanks [~Ahto].

Regarding the documentation: with the `reduce` call, even if you only have one 
update for the key, the table should still be updated directly with that first 
record's value (although the reducer function will not be called), and hence 
reduce is appropriate for converting a stream into a table. So it is actually 
strange that you observe "my table is empty until I fed the data twice".

> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> --
>
> Key: KAFKA-7125
> URL: https://issues.apache.org/jira/browse/KAFKA-7125
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Jouni
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: beginner, newbie
>
> After adding a processor and a sink to the topology after a global store and 
> then calling StreamsBuilder.build().describe() again (for debugging purposes 
> and to check that I got the topology right), I got the following exception 
> and stacktrace:
> {{Caused by: java.util.NoSuchElementException: null}}
>  {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
> ~[na:1.8.0_171]}}
>  {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
> ~[na:1.8.0_171]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
> ~[kafka-streams-1.1.0.jar:na]}}
> Snippet of code that caused this:
> {{    GlobalKTable jsonRoutesToServices}}
>  {{    = builder.globalTable("routes-to-services",}}
>  {{    Consumed.with(Serdes.String(), 
> jsonServiceListSerde),}}
>  {{    Materialized. KeyValueStore>as("routes-to-services"));}}
> {{    TopologyDescription td = builder.build().describe();}}
>  {{    String parent = null;}}
>  {{    // We get an iterator to a TreeSet sorted by processing order, and 
> just want the last one.}}
>  {{    for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
>  {{    parent = store.processor().name();}}
>      }
>  {{    TopologyDescription tdtd = builder.build().describe();}}
>  {{    builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new 
> UnneededCruftSupplier(), parent);}}
>  {{    builder.build().addSink("FST-ROUTES-TO-SERVICES", 
> "fst-routes-to-services", Serdes.String().serializer(), 
> fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
>  {{    TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet; 
> calling describe again before adding anything works fine.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7130) EOFException after rolling log segment

2018-07-04 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533035#comment-16533035
 ] 

Dhruvil Shah commented on KAFKA-7130:
-

[~kschnitter] it would be useful if you could save the affected partition (or 
at least the segment) while we investigate this.

As a first step, could you please share information about all segments for this 
partition, including the size of each of them? `ls -l` output of the directory 
should be sufficient.

Would you be willing to share a metadata dump of the affected segment? This will 
tell us what the segment state looks like. You can use `bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments`.

> EOFException after rolling log segment
> --
>
> Key: KAFKA-7130
> URL: https://issues.apache.org/jira/browse/KAFKA-7130
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.1.0
>Reporter: Karsten Schnitter
>Priority: Major
>
> When rolling a log segment, one of our Kafka clusters got an immediate read 
> error on the same partition. This led to a flood of log messages containing 
> the corresponding stacktraces. Data was still appended to the partition, but 
> consumers were unable to read from that partition. The reason for the 
> exception is unclear.
> {noformat}
> [2018-07-02 23:53:32,732] INFO [Log partition=ingestion-3, 
> dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 
> ms. (kafka.log.Log)
> [2018-07-02 23:53:32,739] INFO [ProducerStateManager partition=ingestion-3] 
> Writing producer snapshot at offset 971865991 (kafka.log.ProducerStateManager)
> [2018-07-02 23:53:32,739] INFO [Log partition=ingestion-3, 
> dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 
> ms. (kafka.log.Log)
> [2018-07-02 23:53:32,750] ERROR [ReplicaManager broker=1] Error processing 
> fetch operation on partition ingestion-3, offset 971865977 
> (kafka.server.ReplicaManager)
> Caused by: java.io.EOFException: Failed to read `log header` from file 
> channel `sun.nio.ch.FileChannelImpl@2e0e8810`. Expected to read 17 bytes, but 
> reached end of file after reading 0 bytes. Started read from position 
> 2147483643.
> {noformat}
> We mitigated the issue by stopping the affected node and deleting the 
> corresponding directory. Once the partition was recreated for the replica (we 
> use replication-factor 2) the other replica experienced the same problem. We 
> mitigated likewise.
> To us it is unclear, what caused this issue. Can you help us in finding the 
> root cause of this problem?
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null

2018-07-04 Thread Jouni (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533017#comment-16533017
 ] 

Jouni commented on KAFKA-7125:
--

Thanks Guozhang. The process method not being called could be because I'm 
somewhat of a beginner with Kafka, messing everything up all the time, and 
because I reverted to streams 1.1.0. I'll switch back to the current 
2.1.0-SNAPSHOT plus your PR so I can debug, put some breakpoints here and 
there, see what's really happening, and double-check myself, so I don't have 
to bother you with unnecessary questions.

It's already late evening here in Finland (GMT+3), so I'm going to sleep after 
a while, but will use the whole next day for tracking this down.

Besides, I've already converted 2 out of my 3 globaltables to streams 
aggregated into tables, and learned a lot of interesting things about when I 
really have to use Consumed, Produced, Materialized, Joined, etc. and when it 
just works with the defaults. I also wrote a custom partitioner...

BTW, if you happen to be connected to Confluent, I think the Option 2 example 
in 
[https://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step]
 should be changed. I'd recommend something like:

{{.groupByKey()}}
{{.aggregate(() -> null, (foo, value, bar) -> value, Materialized...}}

It took me a while to find out that reduce will NOT call the apply method the 
first time, only the second time, while I wondered why my table was empty until 
I fed the data twice. This was with 1.1.0; things could be different with other 
versions.
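Spelled out a bit more (the serdes, the store name, and the variable {{stream}} 
holding an existing {{KStream<String, String>}} are all placeholders), the 
aggregate-based variant might look roughly like this:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.KeyValueStore;

// The aggregator runs for every record, including the first one per key, so
// populating the table does not depend on a reducer ever being invoked.
KTable<String, String> latest = stream
        .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
        .aggregate(
                () -> null,                        // initializer
                (key, value, aggregate) -> value,  // always keep the newest value
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("latest-value-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));
{code}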

Being able to add just one processor is enough to solve my use case of doing 
some non-streams-related, per-instance local processing. More than one 
processor per globaltable should never be needed. Another possibility, maybe 
safer for Kafka itself, would be to prevent this for users altogether and 
instead add something like withChangeStream and a way to subscribe to that 
stream, although that still wouldn't prevent us users from doing dumb things. 
I don't mind what the solution is or will be and can adapt, as long as this 
use case is covered.

> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> --
>
> Key: KAFKA-7125
> URL: https://issues.apache.org/jira/browse/KAFKA-7125
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Jouni
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: beginner, newbie
>
> After adding a processor and a sink to the topology after a global store and 
> then calling StreamsBuilder.build().describe() again (for debugging purposes 
> and to check that I got the topology right), I got the following exception 
> and stacktrace:
> {{Caused by: java.util.NoSuchElementException: null}}
>  {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
> ~[na:1.8.0_171]}}
>  {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
> ~[na:1.8.0_171]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
> ~[kafka-streams-1.1.0.jar:na]}}
> Snippet of code that caused this:
> {{    GlobalKTable jsonRoutesToServices}}
>  {{    = builder.globalTable("routes-to-services",}}
>  {{    Consumed.with(Serdes.String(), 
> jsonServiceListSerde),}}
>  {{    Materialized. KeyValueStore>as("routes-to-services"));}}
> {{    TopologyDescription td = builder.build().describe();}}
>  {{    String parent = null;}}
>  {{    // We get an iterator to a TreeSet sorted by processing order, and 
> just want the last one.}}
>  {{    for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
>  {{    parent = store.processor().name();}}
>      }
>  {{    TopologyDescription tdtd = builder.build().describe();}}
>  {{    builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new 
> UnneededCruftSupplier(), parent);}}
>  {{    builder.build().addSink("FST-ROUTES-TO-SERVICES", 
> "fst-routes-to-services", Serdes.String().serializer(), 
> fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
>  {{    TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet; 
> calling describe again before adding anything works fine.
>  



--
This message was 

[jira] [Commented] (KAFKA-6127) Streams should never block infinitely

2018-07-04 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532997#comment-16532997
 ] 

Guozhang Wang commented on KAFKA-6127:
--

That is a good question. I think we should consider whether it is worth adding 
more configs in Streams post KIP-266. Our current situation is:

1. We do have the {{RETRIES_CONFIG}} and {{RETRY_BACKOFF_MS_CONFIG}} configs in 
StreamsConfig, but today they are only used in the global consumer's 
{{globalConsumer.endOffsets(topicPartitions);}} and {{partitionInfos = 
globalConsumer.partitionsFor(sourceTopic);}} calls, because we always try to 
complete the restoration of global stores before starting any stream threads.

2. We do not have anything like a {{MAX_BLOCK_MS}} config. We hard-code 
different values today for some of the callers, and for other calls we do not 
provide a timeout at all, relying instead on the consumer's request timeout as 
the default, which is {{40 * 1000}} ms.

The question for 2) is whether it is better to define a global config and use 
it across all blocking calls to the consumer, or whether we should pass in a 
specific timeout for each caller rather than relying on the request timeout.

The question for 1) is whether we can just use a very large timeout value and 
get rid of retries altogether.
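To illustrate option 2), the Duration overloads added by KIP-266 already let a 
caller bound each of these calls instead of falling back on the request 
timeout; a rough sketch (the five-second bound and the error handling are 
placeholders, not a proposal):

{code:java}
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public class BoundedCommitExample {
    // Bounded versions of the previously blocking calls, using the Duration
    // overloads added by KIP-266 instead of relying on request.timeout.ms.
    static void commitWithBound(Consumer<?, ?> consumer,
                                TopicPartition partition,
                                Map<TopicPartition, OffsetAndMetadata> offsets) {
        Duration timeout = Duration.ofSeconds(5);   // placeholder bound
        try {
            long position = consumer.position(partition, timeout);
            OffsetAndMetadata committed = consumer.committed(partition, timeout);
            consumer.commitSync(offsets, timeout);
        } catch (TimeoutException e) {
            // the caller decides whether to retry with backoff or surface the error
        }
    }
}
{code}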

> Streams should never block infinitely
> -
>
> Key: KAFKA-6127
> URL: https://issues.apache.org/jira/browse/KAFKA-6127
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
>  Labels: exactly-once
>
> Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
> {{committed()}}, and {{position()}}. Also, {{KafkaProducer#send()}} can block. 
> If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block 
> (fixed in KAFKA-6446), and we should double-check the code to make sure we 
> handle this case correctly.
> If we block within one operation, the whole {{StreamThread}} would block, the 
> instance would not make any progress and would become unresponsive (for 
> example, {{KafkaStreams#close()}} suffers), and we also might drop out of the 
> consumer group.
> Thanks to 
> [KIP-266|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886] 
> the Consumer now has non-blocking variants that we can use, but the same is 
> not true of the Producer. We can add non-blocking variants to the Producer as 
> well, or set the appropriate config options to cap the timeout.
> Of course, we'd also need to be sure to catch the appropriate timeout 
> exceptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7125) Calling StreamsBuilderbuilder.build().describe() causes java.util.NoSuchElementException: null

2018-07-04 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532992#comment-16532992
 ] 

Guozhang Wang commented on KAFKA-7125:
--

[~Ahto] I think with my PR, adding just a processor, not followed by a sink 
node, should be fine now (as I mentioned earlier, we should have another PR for 
disallowing 1) adding a local state store to it, 2) adding a sink node to it, 
and 3) claiming any of its processor nodes as a parent of processor nodes in 
the other, normal processing sub-topology, but that should not impact your use 
case for now).

I'm not sure what you mean by "my processor's process method isn't called". 
I've double-checked the update processor's implementation, the {{KTableSource}} 
source code, again: if there is indeed a child processor added, the records 
should eventually be forwarded to it. Note, however, that if caching is 
enabled, it will likely not flush and forward the records to its children 
immediately, but only when the processing state is committed.
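If caching is the reason the records appear "late", one way to verify this in a 
test configuration (not a production recommendation) is to disable the cache or 
shorten the commit interval:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Disable record caching so KTable updates are forwarded downstream immediately...
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
// ...or keep the cache but flush/forward more often by committing more frequently.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
{code}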

> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> --
>
> Key: KAFKA-7125
> URL: https://issues.apache.org/jira/browse/KAFKA-7125
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Jouni
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: beginner, newbie
>
> After adding a processor and a sink to the topology after a global store and 
> then calling StreamsBuilder.build().describe() again (for debugging purposes 
> and to check that I got the topology right), I got the following exception 
> and stacktrace:
> {{Caused by: java.util.NoSuchElementException: null}}
>  {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
> ~[na:1.8.0_171]}}
>  {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
> ~[na:1.8.0_171]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
> ~[kafka-streams-1.1.0.jar:na]}}
> Snippet of code that caused this:
> {{    GlobalKTable jsonRoutesToServices}}
>  {{    = builder.globalTable("routes-to-services",}}
>  {{    Consumed.with(Serdes.String(), 
> jsonServiceListSerde),}}
>  {{    Materialized. KeyValueStore>as("routes-to-services"));}}
> {{    TopologyDescription td = builder.build().describe();}}
>  {{    String parent = null;}}
>  {{    // We get an iterator to a TreeSet sorted by processing order, and 
> just want the last one.}}
>  {{    for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
>  {{    parent = store.processor().name();}}
>      }
>  {{    TopologyDescription tdtd = builder.build().describe();}}
>  {{    builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new 
> UnneededCruftSupplier(), parent);}}
>  {{    builder.build().addSink("FST-ROUTES-TO-SERVICES", 
> "fst-routes-to-services", Serdes.String().serializer(), 
> fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
>  {{    TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet; 
> calling describe again before adding anything works fine.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7132) Consider adding faster form of rebalancing

2018-07-04 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532980#comment-16532980
 ] 

Guozhang Wang commented on KAFKA-7132:
--

[~Yohan123] There are two things we should consider here. [~enether] has 
mentioned one: guaranteeing offset ordering for consumption. The other is 
guaranteeing at-least-once semantics by default. Resuming from the last 
committed offset would likely introduce duplicate records to be processed, but 
it also avoids data loss. Restarting from the latest offset (I'm not sure what 
you mean by "it recovers at a later offset", so I'd assume you meant that by 
the time the consumer resumes, the log has grown to offset 120) would cause you 
to lose the data from 100 to 120, while using a separate consumer to cover the 
gap would violate ordering guarantees.

> Consider adding faster form of rebalancing
> --
>
> Key: KAFKA-7132
> URL: https://issues.apache.org/jira/browse/KAFKA-7132
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Priority: Critical
>  Labels: performance
>
> Currently, when a consumer falls out of a consumer group, it will restart 
> processing from the last checkpointed offset. However, this design could 
> result in a lag that some users cannot afford. For example, let's say a 
> consumer crashed at offset 100, with the last checkpointed offset being 70. 
> When it recovers at a later offset (say, 120), it will be behind by an offset 
> range of 50 (120 - 70). This is because the consumer restarted at 70, forcing 
> it to reprocess old data. To avoid this, one option would be to allow the 
> current consumer to start processing not from the last checkpointed offset 
> (which is 70 in the example) but from 120, where it recovers. Meanwhile, a 
> new KafkaConsumer would be instantiated to read from offset 70 concurrently 
> with the old process and would be terminated once it reaches 120. In this 
> manner, a considerable amount of lag can be avoided, particularly since the 
> old consumer could proceed as if nothing had happened. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7133) DisconnectException every 5 minutes in single restore consumer thread

2018-07-04 Thread Chris Schwarzfischer (JIRA)
Chris Schwarzfischer created KAFKA-7133:
---

 Summary: DisconnectException every 5 minutes in single restore 
consumer thread
 Key: KAFKA-7133
 URL: https://issues.apache.org/jira/browse/KAFKA-7133
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.0
 Environment: Kafka Streams application in Kubernetes.
Kafka Server in Docker on machine in host mode
Reporter: Chris Schwarzfischer


One of our Streams applications (and only this one) gets an 
{{org.apache.kafka.common.errors.DisconnectException}} almost exactly every 5 
minutes.
The application contains two aggregation chains of the form
KStream -> KGroupedStream -> KTable -> KGroupedTable -> KTable.
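For context, that chain corresponds roughly to a topology like the following 
sketch (topic names, types, the re-keying logic, and the aggregation itself are 
invented for illustration and do not come from the report):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Serialized;

StreamsBuilder builder = new StreamsBuilder();

// KStream -> KGroupedStream -> KTable
KStream<String, Long> events = builder.stream("udr-events",
        Consumed.with(Serdes.String(), Serdes.Long()));
KTable<String, Long> perKey = events
        .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
        .reduce(Long::sum);

// KTable -> KGroupedTable -> KTable: the adder and subtractor keep the rollup
// consistent when an upstream row changes its value.
KTable<String, Long> rollup = perKey
        .groupBy((key, total) -> KeyValue.pair(key.substring(0, 1), total),
                 Serialized.with(Serdes.String(), Serdes.Long()))
        .reduce(Long::sum, (aggregate, oldValue) -> aggregate - oldValue);
{code}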

Relevant config is in Streams:
{code:java}
this.properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.AT_LEAST_ONCE);
//...
this.properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
this.properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 1024 
* 500 /* 500 MB */ );
this.properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 
1024 * 100 /* 100 MB */);
this.properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1024 * 1024 * 50 /* 
50 MB */);
{code}

On the broker:
{noformat}
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_OFFSETS_RETENTION_MINUTES: 108000
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 2147483000
KAFKA_LOG_RETENTION_HOURS: 2688
KAFKA_OFFSETS_RETENTION_CHECK_INTERVAL_MS: 120
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 12000
{noformat}

Logging gives us a single restore consumer thread that throws exceptions every 
5 mins:
 
{noformat}
July 4th 2018, 15:38:51.560 dockertest032018-07-04T13:38:51,559Z INFO  
: 
[testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
 FetchSessionHandler::handleError:440 - [Consumer 
clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
 groupId=] Error sending fetch request (sessionId=317141939, epoch=INITIAL) to 
node 1: org.apache.kafka.common.errors.DisconnectException.
July 4th 2018, 15:37:54.833 dockertest032018-07-04T13:37:54,832Z INFO  
: 
[testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
 FetchSessionHandler::handleError:440 - [Consumer 
clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
 groupId=] Error sending fetch request (sessionId=2064325970, epoch=INITIAL) to 
node 3: org.apache.kafka.common.errors.DisconnectException.
July 4th 2018, 15:37:54.833 dockertest032018-07-04T13:37:54,832Z INFO  
: 
[testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
 FetchSessionHandler::handleError:440 - [Consumer 
clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
 groupId=] Error sending fetch request (sessionId=1735432619, epoch=INITIAL) to 
node 2: org.apache.kafka.common.errors.DisconnectException.
July 4th 2018, 15:32:26.379 dockertest032018-07-04T13:32:26,378Z INFO  
: 
[testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
 FetchSessionHandler::handleError:440 - [Consumer 
clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
 groupId=] Error sending fetch request (sessionId=317141939, epoch=INITIAL) to 
node 1: org.apache.kafka.common.errors.DisconnectException.
July 4th 2018, 15:32:01.926 dockertest032018-07-04T13:32:01,925Z INFO  
: 
[testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
 FetchSessionHandler::handleError:440 - [Consumer 
clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
 groupId=] Error sending fetch request (sessionId=1735432619, epoch=INITIAL) to 
node 2: org.apache.kafka.common.errors.DisconnectException.
July 4th 2018, 15:32:01.926 dockertest032018-07-04T13:32:01,925Z INFO  
: 
[testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
 FetchSessionHandler::handleError:440 - [Consumer 
clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
 groupId=] Error sending fetch request (sessionId=2064325970, epoch=INITIAL) to 
node 3: org.apache.kafka.common.errors.DisconnectException.
July 4th 2018, 15:26:53.886 dockertest032018-07-04T13:26:53,886Z INFO  
: 
[testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
 FetchSessionHandler::handleError:440 - [Consumer 

[jira] [Commented] (KAFKA-6751) Make max.connections.per.ip.overrides a dynamic config

2018-07-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532587#comment-16532587
 ] 

ASF GitHub Bot commented on KAFKA-6751:
---

omkreddy opened a new pull request #5334: KAFKA-6751: Support dynamic 
configuration of max.connections.per.ip/max.connections.per.ip.overrides 
configs (KIP-308)
URL: https://github.com/apache/kafka/pull/5334
 
 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make max.connections.per.ip.overrides a dynamic config
> --
>
> Key: KAFKA-6751
> URL: https://issues.apache.org/jira/browse/KAFKA-6751
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Manikumar
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> It might be useful to be able to update this config dynamically since we 
> occasionally run into situations where a particular host (or set of hosts) is 
> causing some trouble for the broker.
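Once KIP-308 lands, a per-broker override could presumably be applied at 
runtime through the AdminClient's existing alterConfigs call rather than via a 
rolling restart; a sketch (the broker id, host, and connection limit are 
made-up values, and whether this exact path is supported depends on the final 
KIP):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ConnectionOverrideExample {
    static void overrideConnectionLimit(String bootstrapServers) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        try (AdminClient admin = AdminClient.create(props)) {
            // Target broker 0 and cap a single troublesome host (example values only).
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Config override = new Config(Collections.singletonList(
                    new ConfigEntry("max.connections.per.ip.overrides", "192.168.1.50:100")));
            admin.alterConfigs(Collections.singletonMap(broker, override)).all().get();
        }
    }
}
{code}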



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7132) Consider adding faster form of rebalancing

2018-07-04 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532545#comment-16532545
 ] 

Stanislav Kozlovski commented on KAFKA-7132:


The best way to consider this is to open a KIP and pass it to the mailing list 
for thorough discussion.
This is a good way to avoid lag, but unfortunately it will break every ordering 
guarantee.

> Consider adding faster form of rebalancing
> --
>
> Key: KAFKA-7132
> URL: https://issues.apache.org/jira/browse/KAFKA-7132
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Priority: Critical
>  Labels: performance
>
> Currently, when a consumer falls out of a consumer group, it will restart 
> processing from the last checkpointed offset. However, this design could 
> result in a lag that some users cannot afford. For example, let's say a 
> consumer crashed at offset 100, with the last checkpointed offset being 70. 
> When it recovers at a later offset (say, 120), it will be behind by an offset 
> range of 50 (120 - 70). This is because the consumer restarted at 70, forcing 
> it to reprocess old data. To avoid this, one option would be to allow the 
> current consumer to start processing not from the last checkpointed offset 
> (which is 70 in the example) but from 120, where it recovers. Meanwhile, a 
> new KafkaConsumer would be instantiated to read from offset 70 concurrently 
> with the old process and would be terminated once it reaches 120. In this 
> manner, a considerable amount of lag can be avoided, particularly since the 
> old consumer could proceed as if nothing had happened. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7132) Consider adding faster form of rebalancing

2018-07-04 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-7132:
--
Labels: performance  (was: )

> Consider adding faster form of rebalancing
> --
>
> Key: KAFKA-7132
> URL: https://issues.apache.org/jira/browse/KAFKA-7132
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Priority: Critical
>  Labels: performance
>
> Currently, when a consumer falls out of a consumer group, it will restart 
> processing from the last checkpointed offset. However, this design could 
> result in a lag that some users cannot afford. For example, let's say a 
> consumer crashed at offset 100, with the last checkpointed offset being 70. 
> When it recovers at a later offset (say, 120), it will be behind by an offset 
> range of 50 (120 - 70). This is because the consumer restarted at 70, forcing 
> it to reprocess old data. To avoid this, one option would be to allow the 
> current consumer to start processing not from the last checkpointed offset 
> (which is 70 in the example) but from 120, where it recovers. Meanwhile, a 
> new KafkaConsumer would be instantiated to read from offset 70 concurrently 
> with the old process and would be terminated once it reaches 120. In this 
> manner, a considerable amount of lag can be avoided, particularly since the 
> old consumer could proceed as if nothing had happened. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7132) Consider adding faster form of rebalancing

2018-07-04 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-7132:
--
Priority: Critical  (was: Major)

> Consider adding faster form of rebalancing
> --
>
> Key: KAFKA-7132
> URL: https://issues.apache.org/jira/browse/KAFKA-7132
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Priority: Critical
>
> Currently, when a consumer falls out of a consumer group, it will restart 
> processing from the last checkpointed offset. However, this design could 
> result in a lag that some users cannot afford. For example, let's say a 
> consumer crashed at offset 100, with the last checkpointed offset being 70. 
> When it recovers at a later offset (say, 120), it will be behind by an offset 
> range of 50 (120 - 70). This is because the consumer restarted at 70, forcing 
> it to reprocess old data. To avoid this, one option would be to allow the 
> current consumer to start processing not from the last checkpointed offset 
> (which is 70 in the example) but from 120, where it recovers. Meanwhile, a 
> new KafkaConsumer would be instantiated to read from offset 70 concurrently 
> with the old process and would be terminated once it reaches 120. In this 
> manner, a considerable amount of lag can be avoided, particularly since the 
> old consumer could proceed as if nothing had happened. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7132) Consider adding faster form of rebalancing

2018-07-04 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-7132:
--
Summary: Consider adding faster form of rebalancing  (was: Consider adding 
multithreaded form of rebalancing)

> Consider adding faster form of rebalancing
> --
>
> Key: KAFKA-7132
> URL: https://issues.apache.org/jira/browse/KAFKA-7132
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Priority: Major
>
> Currently, when a consumer falls out of a consumer group, it will restart 
> processing from the last checkpointed offset. However, this design could 
> result in a lag that some users cannot afford. For example, let's say a 
> consumer crashed at offset 100, with the last checkpointed offset being 70. 
> When it recovers at a later offset (say, 120), it will be behind by an offset 
> range of 50 (120 - 70). This is because the consumer restarted at 70, forcing 
> it to reprocess old data. To avoid this, one option would be to allow the 
> current consumer to start processing not from the last checkpointed offset 
> (which is 70 in the example) but from 120, where it recovers. Meanwhile, a 
> new KafkaConsumer would be instantiated to read from offset 70 concurrently 
> with the old process and would be terminated once it reaches 120. In this 
> manner, a considerable amount of lag can be avoided, particularly since the 
> old consumer could proceed as if nothing had happened. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7132) Consider adding multithreaded form of rebalancing

2018-07-04 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-7132:
--
Description: Currently, when a consumer falls out of a consumer group, it 
will restart processing from the last checkpointed offset. However, this design 
could result in a lag which some users could not afford to let happen. For 
example, lets say a consumer crashed at offset 100, with the last checkpointed 
offset being at 70. When it recovers at a later offset (say, 120), it will be 
behind by an offset range of 50 (120 - 70). This is because the consumer 
restarted at 70, forcing it to reprocess old data. To avoid this from 
happening, one option would be to allow the current consumer to start 
processing not from the last checkpointed offset (which is 70 in the example), 
but from 120 where it recovers. Meanwhile, a new KafkaConsumer will be 
instantiated and start reading from offset 70 in concurrency with the old 
process, and will be terminated once it reaches 120. In this manner, a 
considerable amount of lag can be avoided, particularly since the old consumer 
could proceed as if nothing had happened.   (was: Currently, when a consumer 
falls out of a consumer group, it will restart processing from the last 
checkpointed offset. However, this design could result in a lag which some 
users could not afford to let happen. For example, lets say a consumer crashed 
at offset 100, with the last checkpointed offset being at 70. When it recovers 
at a later offset (say, 120), it will be behind by an offset range of 50 (120 - 
70). This is because it restarted at 70, forcing it to reprocess old data. To 
avoid this from happening, one option would be to allow the current consumer to 
start processing not from the last checkpointed offset (which is 70 in the 
example), but from 120 where it recovers. Meanwhile, a new KafkaConsumer will 
be instantiated and start reading from offset 70 in concurrency with the old 
process, and will be terminated once it reaches 120. In this manner, a 
considerable amount of lag can be avoided, particularly since the old consumer 
could proceed as if nothing had happened. )

> Consider adding multithreaded form of rebalancing
> -
>
> Key: KAFKA-7132
> URL: https://issues.apache.org/jira/browse/KAFKA-7132
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Priority: Major
>
> Currently, when a consumer falls out of a consumer group, it will restart 
> processing from the last checkpointed offset. However, this design could 
> result in a lag that some users cannot afford. For example, let's say a 
> consumer crashed at offset 100, with the last checkpointed offset being 70. 
> When it recovers at a later offset (say, 120), it will be behind by an offset 
> range of 50 (120 - 70). This is because the consumer restarted at 70, forcing 
> it to reprocess old data. To avoid this, one option would be to allow the 
> current consumer to start processing not from the last checkpointed offset 
> (which is 70 in the example) but from 120, where it recovers. Meanwhile, a 
> new KafkaConsumer would be instantiated to read from offset 70 concurrently 
> with the old process and would be terminated once it reaches 120. In this 
> manner, a considerable amount of lag can be avoided, particularly since the 
> old consumer could proceed as if nothing had happened. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7132) Consider adding multithreaded form of rebalancing

2018-07-04 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-7132:
--
Summary: Consider adding multithreaded form of rebalancing  (was: Consider 
adding multithreaded form of recovery)

> Consider adding multithreaded form of rebalancing
> -
>
> Key: KAFKA-7132
> URL: https://issues.apache.org/jira/browse/KAFKA-7132
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Priority: Major
>
> Currently, when a consumer falls out of a consumer group, it will restart 
> processing from the last checkpointed offset. However, this design could 
> result in a lag that some users cannot afford. For example, let's say a 
> consumer crashed at offset 100, with the last checkpointed offset being 70. 
> When it recovers at a later offset (say, 120), it will be behind by an offset 
> range of 50 (120 - 70). This is because it restarted at 70, forcing it to 
> reprocess old data. To avoid this, one option would be to allow the current 
> consumer to start processing not from the last checkpointed offset (which is 
> 70 in the example) but from 120, where it recovers. Meanwhile, a new 
> KafkaConsumer would be instantiated to read from offset 70 concurrently with 
> the old process and would be terminated once it reaches 120. In this manner, 
> a considerable amount of lag can be avoided, particularly since the old 
> consumer could proceed as if nothing had happened. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7132) Consider adding multithreaded form of recovery

2018-07-04 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-7132:
-

 Summary: Consider adding multithreaded form of recovery
 Key: KAFKA-7132
 URL: https://issues.apache.org/jira/browse/KAFKA-7132
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Richard Yu


Currently, when a consumer falls out of a consumer group, it will restart 
processing from the last checkpointed offset. However, this design could result 
in a lag that some users cannot afford. For example, let's say a consumer 
crashed at offset 100, with the last checkpointed offset being 70. When it 
recovers at a later offset (say, 120), it will be behind by an offset range of 
50 (120 - 70). This is because it restarted at 70, forcing it to reprocess old 
data. To avoid this, one option would be to allow the current consumer to start 
processing not from the last checkpointed offset (which is 70 in the example) 
but from 120, where it recovers. Meanwhile, a new KafkaConsumer would be 
instantiated to read from offset 70 concurrently with the old process and would 
be terminated once it reaches 120. In this manner, a considerable amount of lag 
can be avoided, particularly since the old consumer could proceed as if nothing 
had happened. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7130) EOFException after rolling log segment

2018-07-04 Thread Karsten Schnitter (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532391#comment-16532391
 ] 

Karsten Schnitter commented on KAFKA-7130:
--

I saved the faulty segments from the last incident, but they contain sensitive 
data that I cannot share outside my company. I tried to read the last message 
of the finished segment from the partition:

{noformat}
$ kafka-console-consumer.sh --bootstrap-server $KAFKA_SERVERS --topic ingestion 
--partition 67 --offset 1316626420 --max-messages 1

[2018-07-04 07:33:11,604] WARN [Consumer clientId=consumer-1, 
groupId=console-consumer-79022] Unknown error fetching data for topic-partition 
ingestion-67 (org.apache.kafka.clients.consumer.internals.Fetcher)
...
^C[2018-07-04 07:33:11,606] WARN [Consumer clientId=consumer-1, 
groupId=console-consumer-79022] Unknown error fetching data for topic-partition 
ingestion-67 (org.apache.kafka.clients.consumer.internals.Fetcher)
[2018-07-04 07:33:11,612] WARN [Consumer clientId=consumer-1, 
groupId=console-consumer-79022] Unknown error fetching data for topic-partition 
ingestion-67 (org.apache.kafka.clients.consumer.internals.Fetcher)
Processed a total of 0 messages
{noformat}
The error message is repeated until I stop the command. It is the same for 
other messages in the segment. Consuming the first message of the new segment 
works fine:

{noformat}
$ kafka-console-consumer.sh --bootstrap-server $KAFKA_SERVERS --topic ingestion 
--partition 67 --offset 1316626421 --max-messages 1


Processed a total of 1 messages
{noformat}

> EOFException after rolling log segment
> --
>
> Key: KAFKA-7130
> URL: https://issues.apache.org/jira/browse/KAFKA-7130
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.1.0
>Reporter: Karsten Schnitter
>Priority: Major
>
> When rolling a log segment, one of our Kafka clusters got an immediate read 
> error on the same partition. This led to a flood of log messages containing 
> the corresponding stacktraces. Data was still appended to the partition, but 
> consumers were unable to read from that partition. The reason for the 
> exception is unclear.
> {noformat}
> [2018-07-02 23:53:32,732] INFO [Log partition=ingestion-3, 
> dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 
> ms. (kafka.log.Log)
> [2018-07-02 23:53:32,739] INFO [ProducerStateManager partition=ingestion-3] 
> Writing producer snapshot at offset 971865991 (kafka.log.ProducerStateManager)
> [2018-07-02 23:53:32,739] INFO [Log partition=ingestion-3, 
> dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 
> ms. (kafka.log.Log)
> [2018-07-02 23:53:32,750] ERROR [ReplicaManager broker=1] Error processing 
> fetch operation on partition ingestion-3, offset 971865977 
> (kafka.server.ReplicaManager)
> Caused by: java.io.EOFException: Failed to read `log header` from file 
> channel `sun.nio.ch.FileChannelImpl@2e0e8810`. Expected to read 17 bytes, but 
> reached end of file after reading 0 bytes. Started read from position 
> 2147483643.
> {noformat}
> We mitigated the issue by stopping the affected node and deleting the 
> corresponding directory. Once the partition was recreated for the replica (we 
> use replication-factor 2) the other replica experienced the same problem. We 
> mitigated likewise.
> To us it is unclear, what caused this issue. Can you help us in finding the 
> root cause of this problem?
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7130) EOFException after rolling log segment

2018-07-04 Thread Karsten Schnitter (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532366#comment-16532366
 ] 

Karsten Schnitter commented on KAFKA-7130:
--

The stack trace from the original system was lost. Fortunately for this issue, 
but unfortunately for us, the problem reoccurred on a different cluster. I 
extracted the stack trace from there:
{noformat}
[2018-07-04 07:10:53,633] ERROR [ReplicaManager broker=4] Error processing 
fetch operation on partition ingestion-67, offset 1316626420 
(kafka.server.ReplicaManager)
org.apache.kafka.common.KafkaException: java.io.EOFException: Failed to read 
`log header` from file channel `sun.nio.ch.FileChannelImpl@44f0380f`. Expected 
to read 17 bytes, but reached end of file after reading 0 bytes. Started read 
from position 2147483638.
at 
org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:40)
at 
org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
at 
org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
at 
org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
at 
org.apache.kafka.common.record.FileRecords.searchForOffsetWithSize(FileRecords.java:287)
at kafka.log.LogSegment.translateOffset(LogSegment.scala:190)
at kafka.log.LogSegment.read(LogSegment.scala:242)
at kafka.log.Log.$anonfun$read$2(Log.scala:1020)
at kafka.log.Log.maybeHandleIOException(Log.scala:1678)
at kafka.log.Log.read(Log.scala:976)
at kafka.server.ReplicaManager.read$1(ReplicaManager.scala:920)
at 
kafka.server.ReplicaManager.$anonfun$readFromLocalLog$6(ReplicaManager.scala:982)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:981)
at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:818)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:823)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:631)
at kafka.server.KafkaApis.handle(KafkaApis.scala:105)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: Failed to read `log header` from file channel 
`sun.nio.ch.FileChannelImpl@44f0380f`. Expected to read 17 bytes, but reached 
end of file after reading 0 bytes. Started read from position 2147483638.
at org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:806)
at 
org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:66)
at 
org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:40)
at 
org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35)
... 21 more
{noformat}
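
One observation on the numbers in the message: the failing read positions in both 
stack traces (2147483638 above, 2147483643 in the original report quoted below) are 
only a few bytes short of Integer.MAX_VALUE (2147483647), i.e. the reads start right 
at the 2 GiB mark of the segment file. Below is a minimal, hypothetical Java sketch 
of the read-exactly-N-bytes-or-fail contract that the exception message describes; 
the class and helper names are made up for illustration and this is not Kafka's 
actual implementation:

{noformat}
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical sketch (not Kafka's code): read exactly buffer.remaining() bytes
// starting at `position`, or fail with an EOFException worded like the one above.
public final class ReadFullyOrFailSketch {

    static void readFullyOrFail(FileChannel channel, ByteBuffer buffer,
                                long position, String description) throws IOException {
        int expected = buffer.remaining();
        long currentPosition = position;
        while (buffer.hasRemaining()) {
            int bytesRead = channel.read(buffer, currentPosition);
            if (bytesRead < 0) {
                // End of file hit before the expected number of bytes could be read.
                throw new EOFException(String.format(
                        "Failed to read `%s` from file channel `%s`. Expected to read %d bytes, "
                                + "but reached end of file after reading %d bytes. "
                                + "Started read from position %d.",
                        description, channel, expected, expected - buffer.remaining(), position));
            }
            currentPosition += bytesRead;
        }
    }

    public static void main(String[] args) {
        // Both failing positions sit just below Integer.MAX_VALUE (the 2 GiB boundary):
        System.out.println(Integer.MAX_VALUE - 2147483638L); // 9
        System.out.println(Integer.MAX_VALUE - 2147483643L); // 4
    }
}
{noformat}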


> EOFException after rolling log segment
> --
>
> Key: KAFKA-7130
> URL: https://issues.apache.org/jira/browse/KAFKA-7130
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.1.0
>Reporter: Karsten Schnitter
>Priority: Major
>
> When rolling a log segment, one of our Kafka clusters got an immediate read 
> error on the same partition. This led to a flood of log messages containing 
> the corresponding stack traces. Data was still appended to the partition, but 
> consumers were unable to read from it. The reason for the exception is 
> unclear.
> {noformat}
> [2018-07-02 23:53:32,732] INFO [Log partition=ingestion-3, 
> dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 
> ms. (kafka.log.Log)
> [2018-07-02 23:53:32,739] INFO [ProducerStateManager partition=ingestion-3] 
> Writing producer snapshot at offset 971865991 (kafka.log.ProducerStateManager)
> [2018-07-02 23:53:32,739] INFO [Log partition=ingestion-3, 
> dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 
> ms. (kafka.log.Log)
> [2018-07-02 23:53:32,750] ERROR [ReplicaManager broker=1] Error processing 
> fetch operation on partition ingestion-3, offset 971865977 
> (kafka.server.ReplicaManager)
> Caused by: java.io.EOFException: Failed to read `log header` from file 
> channel `sun.nio.ch.FileChannelImpl@2e0e8810`. Expected to read 17 bytes, but 
> reached end of file after reading 0 bytes. Started read from position 
> 2147483643.
> {noformat}
> We mitigated the issue by stopping the affected node and deleting the 
> corresponding directory. Once the partition was recreated for the replica (we 
> use replication-factor 2), the other replica experienced the same problem. We 
> mitigated it in the same way.
> To us it is unclear what caused this issue. Can you help us find the root 
> cause of this problem?

[jira] [Commented] (KAFKA-6127) Streams should never block infinitely

2018-07-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532315#comment-16532315
 ] 

ASF GitHub Bot commented on KAFKA-6127:
---

ConcurrencyPractitioner opened a new pull request #5333: [KAFKA-6127] Streams 
should never block infinitely
URL: https://github.com/apache/kafka/pull/5333
 
 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Streams should never block infinitely
> -
>
> Key: KAFKA-6127
> URL: https://issues.apache.org/jira/browse/KAFKA-6127
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
>  Labels: exactly-once
>
> Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
> {{committed()}}, and {{position()}}. {{KafkaProducer#send()}} can also block. 
> If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block 
> (fixed in KAFKA-6446), and we should double-check that the code handles this 
> case correctly.
> If we block within one operation, the whole {{StreamThread}} would block, the 
> instance would not make any progress and would become unresponsive (for 
> example, {{KafkaStreams#close()}} suffers), and we might also drop out of the 
> consumer group.
> Thanks to 
> [KIP-266|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886], 
> the Consumer now has non-blocking variants that we can use, but the same is 
> not true of the Producer. We can add non-blocking variants to the Producer as 
> well, or set the appropriate config options to cap the maximum blocking time 
> (see the sketch below).
> Of course, we'd also need to be sure to catch the appropriate timeout 
> exceptions.
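
On the "set the appropriate config options" point in the description above: a 
minimal sketch, assuming a plain Java client, of capping how long 
{{KafkaProducer#send()}} may block via the existing {{max.block.ms}} producer 
config. The broker address, topic name, and the 5-second bound are placeholders:

{noformat}
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;

public class BoundedProducerSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Cap how long send() may block waiting for metadata or free buffer space.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000"); // placeholder 5s bound

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            try {
                producer.send(new ProducerRecord<>("output-topic", "key", "value")).get();
            } catch (TimeoutException e) {
                // send() gave up after max.block.ms instead of blocking indefinitely.
            } catch (ExecutionException e) {
                // Delivery failed for some other reason; handle or rethrow.
            }
        }
    }
}
{noformat}

This only bounds the producer-side blocking that already has a config today; it does 
not add the timeout-taking overloads that the description asks for.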



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6127) Streams should never block infinitely

2018-07-04 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532301#comment-16532301
 ] 

Richard Yu commented on KAFKA-6127:
---

Hi [~guozhang], now that we have created new methods (for {{KafkaConsumer}}) 
with bounded timeouts, don't we also need some new, user-determined configuration 
to decide how long we block? This change might require a KIP too. We should also 
migrate {{KafkaStreams}} to the new {{Consumer}} API (see the sketch below).
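
A minimal sketch of what such bounded calls look like with the KIP-266 overloads, 
assuming a Kafka 2.0+ consumer; the broker address, topic, group id, and the 
5-second bound are placeholders standing in for whatever configuration we end up 
adding:

{noformat}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public class BoundedConsumerCallsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-example");         // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition tp = new TopicPartition("input-topic", 0); // placeholder partition
        Duration bound = Duration.ofSeconds(5);                   // placeholder bound from config

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singleton(tp));
            try {
                consumer.commitSync(bound);      // bounded commit
                consumer.committed(tp, bound);   // bounded committed-offset lookup
                consumer.position(tp, bound);    // bounded position lookup
            } catch (TimeoutException e) {
                // The caller can now react (retry, rebalance, shut down) instead of
                // blocking the StreamThread forever.
            }
        }
    }
}
{noformat}

How Streams should react to the resulting {{TimeoutException}} (retry internally or 
surface it) would presumably be part of the configuration discussed above.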

> Streams should never block infinitely
> -
>
> Key: KAFKA-6127
> URL: https://issues.apache.org/jira/browse/KAFKA-6127
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
>  Labels: exactly-once
>
> Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
> {{committed()}}, and {{position()}}. {{KafkaProducer#send()}} can also block. 
> If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block 
> (fixed in KAFKA-6446), and we should double-check that the code handles this 
> case correctly.
> If we block within one operation, the whole {{StreamThread}} would block, the 
> instance would not make any progress and would become unresponsive (for 
> example, {{KafkaStreams#close()}} suffers), and we might also drop out of the 
> consumer group.
> Thanks to 
> [KIP-266|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886], 
> the Consumer now has non-blocking variants that we can use, but the same is 
> not true of the Producer. We can add non-blocking variants to the Producer as 
> well, or set the appropriate config options to cap the maximum blocking time.
> Of course, we'd also need to be sure to catch the appropriate timeout 
> exceptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6127) Streams should never block infinitely

2018-07-04 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu reassigned KAFKA-6127:
-

Assignee: Richard Yu

> Streams should never block infinitely
> -
>
> Key: KAFKA-6127
> URL: https://issues.apache.org/jira/browse/KAFKA-6127
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
>  Labels: exactly-once
>
> Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
> {{committed()}}, and {{position()}}. {{KafkaProducer#send()}} can also block. 
> If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block 
> (fixed in KAFKA-6446), and we should double-check that the code handles this 
> case correctly.
> If we block within one operation, the whole {{StreamThread}} would block, the 
> instance would not make any progress and would become unresponsive (for 
> example, {{KafkaStreams#close()}} suffers), and we might also drop out of the 
> consumer group.
> Thanks to 
> [KIP-266|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886], 
> the Consumer now has non-blocking variants that we can use, but the same is 
> not true of the Producer. We can add non-blocking variants to the Producer as 
> well, or set the appropriate config options to cap the maximum blocking time.
> Of course, we'd also need to be sure to catch the appropriate timeout 
> exceptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)