KStreams Rewind Offset
All, I think it's great that the ProcessorContext offers the partition and offset of the current record being processed; however, it offers no way for me to actually use the information. I would like to be able to rewind to a particular offset on a partition if I needed to. The consumer is also not exposed to me so I couldn't access things directly that way either. Is this in the works or would it interfere with rebalancing/auto-commits? Mike
Re: IncompatibleClassChangeError
Thanks. I can't copy/paste stack trace since it's on an internal system. I'll do my best to hand-type the thing in here: java.lang.IncompatiableClassChangeError at scala.collection.immutable.StringLink$class.format(StringLink.scala:318)at scala.collection.imutable.StringOps.format(StringOps.scala:30)at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:450)at kafka.cluster.Partition$$anonfun$9.aplpy(Partition.scala:428)at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala.268)at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:428)at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:401)at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:386)at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)at scala.collection.mutable.HashMap.freachEntry(HashMap.scala:40)at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)at scala.collection.TeraversableLike$class.map(TraversableLike.scala:245)at scala.collection.AbstractTraversable.map(Traversable.scala:104)at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:386)at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:322)at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)at kafka.server.KafkaApis.handle(KafkaApis.scala:68)at kafka.server.KafkaRequestHandler.run(KafkaRequesHandler.scala:60)at java.lang.Thread.run(Thread.java:853) On Friday, April 15, 2016 10:03 AM, Ismael Juma <ism...@juma.me.uk> wrote: Hi Michael, We would need more information to be able to help. We don't serialize scala objects and clients should be able to use different Scala versions than the broker. Do you have a stacktrace of when the exception is thrown? Ismael On Fri, Apr 15, 2016 at 2:39 PM, Michael D. Coon <mdco...@yahoo.com.invalid> wrote: > We are seeing odd behavior that we need to understand. We are getting > IncompatibleClassChangeErrors and I know that's related to a Scala version > mismatch. What's not clear, however, is where or why the mismatch is > occurring. We know up front that there were occasions where we ran apps > that had Scala 2.10 dependencies with the 0.9 consumer. We also plugged in > a metrics reporter at the broker with 2.10 scala dependencies. Both of > these situations produced the class change errors and they both appear to > be around the consumer offsets topic. Is there something under the hood > that is serializing a scala object that would cause this issue? The only > fix appears to be blowing away all data in the __consumer_offsets topic and > starting over with 2.11 clients. Is this expected behavior? Seems like a > weakness if so because we have no control, in some cases, what version of > Scala a client might use. > >
IncompatibleClassChangeError
We are seeing odd behavior that we need to understand. We are getting IncompatibleClassChangeErrors and I know that's related to a Scala version mismatch. What's not clear, however, is where or why the mismatch is occurring. We know up front that there were occasions where we ran apps that had Scala 2.10 dependencies with the 0.9 consumer. We also plugged in a metrics reporter at the broker with 2.10 scala dependencies. Both of these situations produced the class change errors and they both appear to be around the consumer offsets topic. Is there something under the hood that is serializing a scala object that would cause this issue? The only fix appears to be blowing away all data in the __consumer_offsets topic and starting over with 2.11 clients. Is this expected behavior? Seems like a weakness if so because we have no control, in some cases, what version of Scala a client might use.
Re: KStream Close Processor
Guozhang, In my processor, I'm buffering up contents of the final messages in order to make them larger. This is to optimize throughput and avoid tiny messages from being injected downstream. So nothing is being pushed to the producer until my configured thresholds are met in the buffering mechanism. So as it stands, these messages are left dangling after the producer closes and, even worse, if periodic commits are happening behind the scenes, the data is lost on restart. What we need is a way to notify the processors that everything is "about" to close so that I can properly flush what I have in memory out to the producer. Otherwise, I'm stuck with always sending tiny messages into kafka--which I know for certain causes problems on down stream consumers (where they set a high fetch memory size and it causes hundreds of thousands of messages to be retrieved at a time…and thus bogs down the consumer). I think the "max.poll.messages" setting we discussed before would help here but if it's not available until 0.10, I'm kind of stuck. Another option might be to disable periodic commits and only commit when the processor requests it. This would mitigate some data loss and is better than nothing. There is still a chance that data in RecordQueue not yet sent to my processor would be committed but never processed in this case. Another thought I had was to reduce the max fetch size; however, some messages can be very large (i.e. data spikes periodically). In this case, the messages size would exceed my lower max fetch size causing the consumer to simply stop consuming. So I'm stuck. So either we need to roll in the max.poll.messages sooner than 0.10 or maybe a callback mechanism letting me know that the producer is about to close so I can clear my buffers. Ideas? Mike On Friday, April 8, 2016 8:24 PM, Guozhang Wang <wangg...@gmail.com> wrote: Hi Michael, When you call KafkaStreams.close(), it will first trigger a commitAll() function, which will 1) flush local state store if necessary; 2) flush messages buffered in producer; 3) commit offsets on consumer. Then it will close the producer / consumer clients and shutdown the tasks. So when you see processor's "close" function triggered, any buffered messages in the producer should already been flushed. Did you see a different behavior than the above described? Guozhang On Fri, Apr 8, 2016 at 12:23 PM, Michael D. Coon <mdco...@yahoo.com.invalid> wrote: > All, > I'm seeing my processor's "close" method being called AFTER my > downstream producer has been closed. I had assumed that on close I would be > able to flush whatever I had been buffering up to send to kafka topic. In > other words, we've seen significant performance differences in building > flows with small messages and large messages in/out of kafka. So my > processor buffers up messages to a threshold and flushes those as a > composite message bundle to improve downstream processing. But if this > close method is called AFTER the producer has already been closed, I would > have no way to actually flush the final composite bundles to my topic on > shutdown. Is there some way to get a call BEFORE producer shutdown occurs? > Mike > > -- -- Guozhang
KStream Close Processor
All, I'm seeing my processor's "close" method being called AFTER my downstream producer has been closed. I had assumed that on close I would be able to flush whatever I had been buffering up to send to kafka topic. In other words, we've seen significant performance differences in building flows with small messages and large messages in/out of kafka. So my processor buffers up messages to a threshold and flushes those as a composite message bundle to improve downstream processing. But if this close method is called AFTER the producer has already been closed, I would have no way to actually flush the final composite bundles to my topic on shutdown. Is there some way to get a call BEFORE producer shutdown occurs? Mike
Re: KStreams Group Rebalance Commit Error
One more thing I'm noticing in the logs. I see periodic node disconnection messages due to "timeout". I set my metadata.fetch.timeout.ms to 6, request.timeout.ms to 3 and timeout.ms to 3 and those should be more than enough time waiting for metadata responses. I also set my offset commit period to 6 and these disconnected messages seem to overlap with offset commit threshold...meaning it seems to be happening when the offset commit attempts are being made. The "api_key" in the failed request is "1"...I'd have to dig into the code to know what the corresponds to. On Thursday, April 7, 2016 7:35 AM, Michael D. Coon <mdco...@yahoo.com.INVALID> wrote: Guozhang, Thanks for the advice; however, "max.poll.records" doesn't seem to be supported since it's not affecting how many records are coming back from the consumer.poll requests. However, I agree that the likely culprit in rebalancing is the delay in processing new records. I'm going to try and play with the max buffer size per partition setting to see if I can force the consumer to pause, and thus not inject too many records too quickly. It would be awesome if the max.poll.records setting was respected by the consumer/broker and it returned a max number of messages. I feel like this used to be supported in the older Kafka APIs. This setting would allow more tuning of how much data each of my stream job instances receives. Mike On Wednesday, April 6, 2016 5:55 PM, Guozhang Wang <wangg...@gmail.com> wrote: Hi Michael, Your issue seems like a more general one with the new Kafka Consumer regarding unexpected rebalances: as for Kafka Streams, it's committing behavior is synchronous, i.e. triggering "consumer.commitSync" of the underlying new Kafka Consumer, which will fail if there is an ongoing rebalance, since the partitions being committed on may not be owned by the consumer anymore. As for rebalance, there are several cases that can cause it: 1) topic changes, like creation of new topics, partition addition, topic deletes, etc. If you are not changing topics at the moment then you can exclude this case. 2) consumer failures detected by the heart beat protocol, and hence migrating partitions out of the failed consumer. Note that the heartbeat is wrapped in the poll() protocol, so if your consumer thread (and similarly Kafka Streams) takes long time to process polled records while your configured session.timeout.ms value is not large enough. So you can consider 1) increase session.timeout.ms value, 2) set max.poll.records to a reasonably small values to avoid your consumers being falsely considered as failed. More info about the consumer configs: http://kafka.apache.org/documentation.html#newconsumerconfigs Guozhang On Wed, Apr 6, 2016 at 6:26 AM, Michael D. Coon <mdco...@yahoo.com.invalid> wrote: All, I'm getting CommitFailedExceptions on a small prototype I built using kafkaStreams. I'm not using the DSL, but the TopologyBuilder with several processors chained together with a sink in between a few of them. When I try committing through the ProcessorContext, I see exceptions being thrown about commit failures due to group rebalance (not to mention delays in processing during commit attempts). I'm using a single host, with 2 stream threads and a 5-node Kafka cluster. I wouldn't think rebalancing would be occurring after data starts flowing and I'm committing offsets. This was something I saw with the new Kafka client APIs as well, and I had to work around by creating static partition assignments to my consumers in my data flow...otherwise, any dynamic allocation of new consumers to the group caused this exception to be thrown and I could never commit my offsets. Are you all not seeing this in your tests? Am I not supposed to commit through the ProcessorContext? I'm committing once my interim processor writes its output to the sink point in the flow; is that not the correct/expected behavior/use? Mike -- -- Guozhang
Re: KStreams Group Rebalance Commit Error
Guozhang, Thanks for the advice; however, "max.poll.records" doesn't seem to be supported since it's not affecting how many records are coming back from the consumer.poll requests. However, I agree that the likely culprit in rebalancing is the delay in processing new records. I'm going to try and play with the max buffer size per partition setting to see if I can force the consumer to pause, and thus not inject too many records too quickly. It would be awesome if the max.poll.records setting was respected by the consumer/broker and it returned a max number of messages. I feel like this used to be supported in the older Kafka APIs. This setting would allow more tuning of how much data each of my stream job instances receives. Mike On Wednesday, April 6, 2016 5:55 PM, Guozhang Wang <wangg...@gmail.com> wrote: Hi Michael, Your issue seems like a more general one with the new Kafka Consumer regarding unexpected rebalances: as for Kafka Streams, it's committing behavior is synchronous, i.e. triggering "consumer.commitSync" of the underlying new Kafka Consumer, which will fail if there is an ongoing rebalance, since the partitions being committed on may not be owned by the consumer anymore. As for rebalance, there are several cases that can cause it: 1) topic changes, like creation of new topics, partition addition, topic deletes, etc. If you are not changing topics at the moment then you can exclude this case. 2) consumer failures detected by the heart beat protocol, and hence migrating partitions out of the failed consumer. Note that the heartbeat is wrapped in the poll() protocol, so if your consumer thread (and similarly Kafka Streams) takes long time to process polled records while your configured session.timeout.ms value is not large enough. So you can consider 1) increase session.timeout.ms value, 2) set max.poll.records to a reasonably small values to avoid your consumers being falsely considered as failed. More info about the consumer configs: http://kafka.apache.org/documentation.html#newconsumerconfigs Guozhang On Wed, Apr 6, 2016 at 6:26 AM, Michael D. Coon <mdco...@yahoo.com.invalid> wrote: All, I'm getting CommitFailedExceptions on a small prototype I built using kafkaStreams. I'm not using the DSL, but the TopologyBuilder with several processors chained together with a sink in between a few of them. When I try committing through the ProcessorContext, I see exceptions being thrown about commit failures due to group rebalance (not to mention delays in processing during commit attempts). I'm using a single host, with 2 stream threads and a 5-node Kafka cluster. I wouldn't think rebalancing would be occurring after data starts flowing and I'm committing offsets. This was something I saw with the new Kafka client APIs as well, and I had to work around by creating static partition assignments to my consumers in my data flow...otherwise, any dynamic allocation of new consumers to the group caused this exception to be thrown and I could never commit my offsets. Are you all not seeing this in your tests? Am I not supposed to commit through the ProcessorContext? I'm committing once my interim processor writes its output to the sink point in the flow; is that not the correct/expected behavior/use? Mike -- -- Guozhang
KStreams Group Rebalance Commit Error
All, I'm getting CommitFailedExceptions on a small prototype I built using kafkaStreams. I'm not using the DSL, but the TopologyBuilder with several processors chained together with a sink in between a few of them. When I try committing through the ProcessorContext, I see exceptions being thrown about commit failures due to group rebalance (not to mention delays in processing during commit attempts). I'm using a single host, with 2 stream threads and a 5-node Kafka cluster. I wouldn't think rebalancing would be occurring after data starts flowing and I'm committing offsets. This was something I saw with the new Kafka client APIs as well, and I had to work around by creating static partition assignments to my consumers in my data flow...otherwise, any dynamic allocation of new consumers to the group caused this exception to be thrown and I could never commit my offsets. Are you all not seeing this in your tests? Am I not supposed to commit through the ProcessorContext? I'm committing once my interim processor writes its output to the sink point in the flow; is that not the correct/expected behavior/use? Mike
KStreams Partition Assignment
I'm evaluating whether the KafkaStreams API will be something we can use on my current project. Namely, we want to be able to distribute the consumers on a Mesos/YARN cluster. It's not entirely clear to me in the code what is deciding which partitions get assigned at runtime and whether this is intended for a distributed application or just a multi-threaded environment. I get that the consumer coordinator will get reassignments when group participation changes; however, in looking through the StreamPartitionAssignor code, it's not clear to me what is happening in the assign method. It looks like to me like subscriptions are coming in from the consumer coordinator, presumably whose assignments are derived from the lead brokers for the topics of interest. Those subscriptions are then translated into co-partitioned groups of clients. Once that's complete, it hands off the co-partitioned groups to the StreamThread's partitionGrouper to do the work of assigning the partitions to each co-partitioned group. The DefaultPartitionGrouper code, starting on line 57, simply does a 1-up assigning of partition to group. How will this actually work with distributed stream consumers if it's always going to be assigning the partition as a 1-up sequence local to that particular consumer? Shouldn't it use the assigned partition that is coming back from the ConsumerCoordinator? I'm struggling to understand the layers but I need to in order to know whether this implementation is going to work for us. If the PartitionGroupAssignor's default is just meant for single-node multithreaded use, that's fine as long as I can inject my own implementation. But I would still need to understand what is happening at the StreamPartitionAssignor layer more clearly. Any info, design docs, in-progress wiki's would be most appreciated if the answer is too in-depth for an email discussion. Thanks and I really love what you guys are doing with this! Mike