KStreams Rewind Offset

2016-06-01 Thread Michael D. Coon
All,
  I think it's great that the ProcessorContext offers the partition and offset 
of the current record being processed; however, it offers no way for me to 
actually use the information. I would like to be able to rewind to a particular 
offset on a partition if I needed to. The consumer is also not exposed to me, so 
I can't access things directly that way either. Is this in the works, or 
would it interfere with rebalancing/auto-commits?
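For comparison, this is the kind of control a plain consumer gives me today and roughly what I'd like to be able to trigger from inside a processor; a minimal sketch, with the topic name, partition, offset, and config values all just illustrative:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RewindExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        props.put("group.id", "rewind-example");          // illustrative
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 12345L); // rewind to an arbitrary offset on that partition
        }
    }
}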
Mike



Re: IncompatibleClassChangeError

2016-04-15 Thread Michael D. Coon
Thanks.
I can't copy/paste the stack trace since it's on an internal system.
I'll do my best to hand-type the thing in here:
java.lang.IncompatibleClassChangeError
    at scala.collection.immutable.StringLike$class.format(StringLike.scala:318)
    at scala.collection.immutable.StringOps.format(StringOps.scala:30)
    at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:450)
    at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:428)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
    at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:428)
    at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:401)
    at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:386)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:386)
    at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:322)
    at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:853)

 

On Friday, April 15, 2016 10:03 AM, Ismael Juma <ism...@juma.me.uk> wrote:
 

 Hi Michael,

We would need more information to be able to help. We don't serialize Scala
objects, and clients should be able to use a different Scala version than the
broker. Do you have a stack trace from when the exception is thrown?

Ismael

On Fri, Apr 15, 2016 at 2:39 PM, Michael D. Coon <mdco...@yahoo.com.invalid>
wrote:

> We are seeing odd behavior that we need to understand. We are getting
> IncompatibleClassChangeErrors, and I know that's related to a Scala version
> mismatch. What's not clear, however, is where or why the mismatch is
> occurring. We know up front that there were occasions where we ran apps
> that had Scala 2.10 dependencies with the 0.9 consumer. We also plugged in
> a metrics reporter at the broker with Scala 2.10 dependencies. Both of
> these situations produced the class change errors and they both appear to
> be around the consumer offsets topic. Is there something under the hood
> that is serializing a Scala object that would cause this issue? The only
> fix appears to be blowing away all data in the __consumer_offsets topic and
> starting over with 2.11 clients. Is this expected behavior? It seems like a
> weakness if so, because we have no control, in some cases, over what version
> of Scala a client might use.
>
>


  

IncompatibleClassChangeError

2016-04-15 Thread Michael D. Coon
We are seeing odd behavior that we need to understand. We are getting 
IncompatibleClassChangeErrors, and I know that's related to a Scala version 
mismatch. What's not clear, however, is where or why the mismatch is occurring. 
We know up front that there were occasions where we ran apps that had Scala 
2.10 dependencies with the 0.9 consumer. We also plugged in a metrics reporter 
at the broker with Scala 2.10 dependencies. Both of these situations produced 
the class change errors and they both appear to be around the consumer offsets 
topic. Is there something under the hood that is serializing a Scala object 
that would cause this issue? The only fix appears to be blowing away all data 
in the __consumer_offsets topic and starting over with 2.11 clients. Is this 
expected behavior? It seems like a weakness if so, because we have no control, 
in some cases, over what version of Scala a client might use.



Re: KStream Close Processor

2016-04-09 Thread Michael D. Coon
Guozhang,
   In my processor, I'm buffering up the contents of the final messages in order to 
make them larger. This is to optimize throughput and avoid injecting tiny messages 
downstream. Nothing is pushed to the producer until my configured thresholds are 
met in the buffering mechanism. So as it stands, these messages are left dangling 
after the producer closes and, even worse, if periodic commits are happening 
behind the scenes, the data is lost on restart.
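   For context, here is roughly what my processor does; a simplified sketch, with the class name, threshold, and types all illustrative:

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Simplified sketch of the buffering processor described above.
public class BufferingProcessor implements Processor<String, String> {
    private static final int FLUSH_THRESHOLD = 1000; // illustrative threshold
    private final List<String> buffer = new ArrayList<>();
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        buffer.add(value);
        if (buffer.size() >= FLUSH_THRESHOLD) {
            flush();
        }
    }

    // Forward one composite bundle downstream instead of many tiny messages.
    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        context.forward(null, String.join("\n", buffer));
        buffer.clear();
        context.commit(); // ask for a commit once the bundle has been handed off
    }

    @Override
    public void punctuate(long timestamp) {
        flush();
    }

    @Override
    public void close() {
        flush(); // only safe if this runs before the producer is closed, which is the crux of this thread
    }
}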
   What we need is a way to notify the processors that everything is "about" to 
close so that I can properly flush what I have in memory out to the producer. 
Otherwise, I'm stuck with always sending tiny messages into Kafka--which I know 
for certain causes problems on downstream consumers (where they set a high 
fetch memory size and it causes hundreds of thousands of messages to be 
retrieved at a time…and thus bogs down the consumer). I think the 
"max.poll.records" setting we discussed before would help here, but if it's not 
available until 0.10, I'm kind of stuck.
    Another option might be to disable periodic commits and only commit when 
the processor requests it. This would mitigate some data loss and is better 
than nothing. There is still a chance that data in the RecordQueue not yet sent to 
my processor would be committed but never processed in this case.
    Another thought I had was to reduce the max fetch size; however, some 
messages can be very large (i.e. data spikes periodically). In this case, the 
message size would exceed my lower max fetch size, causing the consumer to 
simply stop consuming. So I'm stuck. Either we need to roll in 
max.poll.records sooner than 0.10, or we need a callback mechanism letting me 
know that the producer is about to close so I can clear my buffers. 
    Ideas?
Mike

On Friday, April 8, 2016 8:24 PM, Guozhang Wang <wangg...@gmail.com> wrote:
 

 Hi Michael,

When you call KafkaStreams.close(), it will first trigger a commitAll()
function, which will 1) flush the local state stores if necessary; 2) flush
messages buffered in the producer; and 3) commit offsets on the consumer. Then it
will close the producer / consumer clients and shut down the tasks. So when you
see the processor's "close" function triggered, any buffered messages in the
producer should already have been flushed.
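For reference, a minimal sketch of driving that shutdown from the application side; the application id, bootstrap servers, and the empty topology are placeholders:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class StreamsShutdownExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "close-example");      // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative

        TopologyBuilder builder = new TopologyBuilder();
        // ... addSource / addProcessor / addSink calls go here ...

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        // close() triggers the commitAll() sequence described above:
        // flush state stores, flush the producer, commit consumer offsets.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}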

Did you see a different behavior than the above described?

Guozhang


On Fri, Apr 8, 2016 at 12:23 PM, Michael D. Coon <mdco...@yahoo.com.invalid>
wrote:

> All,
>    I'm seeing my processor's "close" method being called AFTER my
> downstream producer has been closed. I had assumed that on close I would be
> able to flush whatever I had been buffering up to send to kafka topic. In
> other words, we've seen significant performance differences in building
> flows with small messages and large messages in/out of kafka. So my
> processor buffers up messages to a threshold and flushes those as a
> composite message bundle to improve downstream processing. But if this
> close method is called AFTER the producer has already been closed, I would
> have no way to actually flush the final composite bundles to my topic on
> shutdown. Is there some way to get a call BEFORE producer shutdown occurs?
> Mike
>
>


-- 
-- Guozhang


  

KStream Close Processor

2016-04-08 Thread Michael D. Coon
All,
   I'm seeing my processor's "close" method being called AFTER my downstream 
producer has been closed. I had assumed that on close I would be able to flush 
whatever I had been buffering up to send to kafka topic. In other words, we've 
seen significant performance differences in building flows with small messages 
and large messages in/out of kafka. So my processor buffers up messages to a 
threshold and flushes those as a composite message bundle to improve downstream 
processing. But if this close method is called AFTER the producer has already 
been closed, I would have no way to actually flush the final composite bundles 
to my topic on shutdown. Is there some way to get a call BEFORE producer 
shutdown occurs?
Mike



Re: KStreams Group Rebalance Commit Error

2016-04-07 Thread Michael D. Coon
One more thing I'm noticing in the logs.
I see periodic node disconnection messages due to "timeout". I set my 
metadata.fetch.timeout.ms to 6, request.timeout.ms to 3 and timeout.ms 
to 3, and those should be more than enough time to wait for metadata 
responses. I also set my offset commit period to 6, and these disconnection 
messages seem to overlap with the offset commit threshold...meaning it seems to be 
happening when the offset commit attempts are being made. The "api_key" in the 
failed request is "1"...I'd have to dig into the code to know what that 
corresponds to.

 

On Thursday, April 7, 2016 7:35 AM, Michael D. Coon 
<mdco...@yahoo.com.INVALID> wrote:
 

 Guozhang,
   Thanks for the advice; however, "max.poll.records" doesn't seem to be 
supported, since it's not affecting how many records come back from the 
consumer.poll requests. However, I agree that the likely culprit in the rebalancing 
is the delay in processing new records. I'm going to try playing with the max 
buffer size per partition setting to see if I can force the consumer to pause, 
and thus not inject too many records too quickly. It would be awesome if the 
max.poll.records setting were respected by the consumer/broker and it returned at 
most that many messages. I feel like this used to be supported in the older 
Kafka APIs. This setting would allow more tuning of how much data each of my 
stream job instances receives.

Mike
 

    On Wednesday, April 6, 2016 5:55 PM, Guozhang Wang <wangg...@gmail.com> 
wrote:
 

 Hi Michael,
Your issue seems like a more general one with the new Kafka Consumer regarding 
unexpected rebalances: as for Kafka Streams, its committing behavior is 
synchronous, i.e. it triggers "consumer.commitSync" on the underlying new Kafka 
Consumer, which will fail if there is an ongoing rebalance, since the 
partitions being committed may no longer be owned by the consumer.
As for rebalances, there are several cases that can cause one:
1) topic changes, like creation of new topics, partition addition, topic 
deletes, etc. 
If you are not changing topics at the moment then you can exclude this case.
2) consumer failures detected by the heartbeat protocol, and hence migration of 
partitions off the failed consumer.
Note that the heartbeat is wrapped in the poll() protocol, so if your consumer 
thread (and similarly Kafka Streams) takes a long time to process polled records 
and your configured session.timeout.ms value is not large enough, the consumer 
will be considered failed and a rebalance will be triggered.
So you can consider 1) increasing the session.timeout.ms value, and 2) setting 
max.poll.records to a reasonably small value to avoid your consumers being 
falsely considered failed.
More info about the consumer configs:
http://kafka.apache.org/documentation.html#newconsumerconfigs
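Concretely, that would look something like the following; the values are illustrative and need tuning against how long one batch takes to process, and max.poll.records is only honored by client versions that actually support it:

import java.util.Properties;

public class ConsumerTuningExample {
    // Illustrative values only; tune against how long one batch takes to process.
    public static Properties tunedProps() {
        Properties props = new Properties();
        props.put("session.timeout.ms", "30000"); // more headroom before the group considers the member dead
        props.put("max.poll.records", "500");     // cap the number of records returned per poll()
        return props;
    }
}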


Guozhang

On Wed, Apr 6, 2016 at 6:26 AM, Michael D. Coon <mdco...@yahoo.com.invalid> 
wrote:

All,
   I'm getting CommitFailedExceptions on a small prototype I built using 
KafkaStreams. I'm not using the DSL, but the TopologyBuilder with several 
processors chained together with a sink in between a few of them. When I try 
committing through the ProcessorContext, I see exceptions being thrown about 
commit failures due to group rebalance (not to mention delays in processing 
during commit attempts). I'm using a single host, with 2 stream threads and a 
5-node Kafka cluster. I wouldn't think rebalancing would be occurring after 
data starts flowing and I'm committing offsets. This was something I saw with 
the new Kafka client APIs as well, and I had to work around by creating static 
partition assignments to my consumers in my data flow...otherwise, any dynamic 
allocation of new consumers to the group caused this exception to be thrown and 
I could never commit my offsets. Are you all not seeing this in your tests? Am 
I not supposed to commit through the ProcessorContext? I'm committing once my 
interim processor writes its output to the sink point in the flow; is that not 
the correct/expected behavior/use?
Mike





-- 
-- Guozhang




  

Re: KStreams Group Rebalance Commit Error

2016-04-07 Thread Michael D. Coon
Guozhang,
   Thanks for the advice; however, "max.poll.records" doesn't seem to be 
supported, since it's not affecting how many records come back from the 
consumer.poll requests. However, I agree that the likely culprit in the rebalancing 
is the delay in processing new records. I'm going to try playing with the max 
buffer size per partition setting to see if I can force the consumer to pause, 
and thus not inject too many records too quickly. It would be awesome if the 
max.poll.records setting were respected by the consumer/broker and it returned at 
most that many messages. I feel like this used to be supported in the older 
Kafka APIs. This setting would allow more tuning of how much data each of my 
stream job instances receives.
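For reference, the per-partition buffer setting I'm planning to experiment with would go into the streams config roughly like this; the buffered.records.per.partition key is my reading of the current code, and the values are just guesses:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsBufferTuning {
    public static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "buffer-tuning-example"); // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // illustrative
        // Assumed key: a small per-partition buffer should make the consumer pause
        // partitions sooner instead of piling up unprocessed records.
        props.put("buffered.records.per.partition", "100");
        return props;
    }
}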

Mike
 

On Wednesday, April 6, 2016 5:55 PM, Guozhang Wang <wangg...@gmail.com> 
wrote:
 

 Hi Michael,
Your issue seems like a more general one with the new Kafka Consumer regarding 
unexpected rebalances: as for Kafka Streams, its committing behavior is 
synchronous, i.e. it triggers "consumer.commitSync" on the underlying new Kafka 
Consumer, which will fail if there is an ongoing rebalance, since the 
partitions being committed may no longer be owned by the consumer.
As for rebalances, there are several cases that can cause one:
1) topic changes, like creation of new topics, partition addition, topic 
deletes, etc. 
If you are not changing topics at the moment then you can exclude this case.
2) consumer failures detected by the heartbeat protocol, and hence migration of 
partitions off the failed consumer.
Note that the heartbeat is wrapped in the poll() protocol, so if your consumer 
thread (and similarly Kafka Streams) takes a long time to process polled records 
and your configured session.timeout.ms value is not large enough, the consumer 
will be considered failed and a rebalance will be triggered.
So you can consider 1) increasing the session.timeout.ms value, and 2) setting 
max.poll.records to a reasonably small value to avoid your consumers being 
falsely considered failed.
More info about the consumer configs:
http://kafka.apache.org/documentation.html#newconsumerconfigs


Guozhang

On Wed, Apr 6, 2016 at 6:26 AM, Michael D. Coon <mdco...@yahoo.com.invalid> 
wrote:

All,
   I'm getting CommitFailedExceptions on a small prototype I built using 
KafkaStreams. I'm not using the DSL, but the TopologyBuilder with several 
processors chained together with a sink in between a few of them. When I try 
committing through the ProcessorContext, I see exceptions being thrown about 
commit failures due to group rebalance (not to mention delays in processing 
during commit attempts). I'm using a single host, with 2 stream threads and a 
5-node Kafka cluster. I wouldn't think rebalancing would be occurring after 
data starts flowing and I'm committing offsets. This was something I saw with 
the new Kafka client APIs as well, and I had to work around by creating static 
partition assignments to my consumers in my data flow...otherwise, any dynamic 
allocation of new consumers to the group caused this exception to be thrown and 
I could never commit my offsets. Are you all not seeing this in your tests? Am 
I not supposed to commit through the ProcessorContext? I'm committing once my 
interim processor writes its output to the sink point in the flow; is that not 
the correct/expected behavior/use?
Mike





-- 
-- Guozhang


  

KStreams Group Rebalance Commit Error

2016-04-06 Thread Michael D. Coon
All,
   I'm getting CommitFailedExceptions on a small prototype I built using 
KafkaStreams. I'm not using the DSL, but the TopologyBuilder with several 
processors chained together with a sink in between a few of them. When I try 
committing through the ProcessorContext, I see exceptions being thrown about 
commit failures due to group rebalance (not to mention delays in processing 
during commit attempts). I'm using a single host, with 2 stream threads and a 
5-node Kafka cluster. I wouldn't think rebalancing would be occurring after 
data starts flowing and I'm committing offsets. This was something I saw with 
the new Kafka client APIs as well, and I had to work around by creating static 
partition assignments to my consumers in my data flow...otherwise, any dynamic 
allocation of new consumers to the group caused this exception to be thrown and 
I could never commit my offsets. Are you all not seeing this in your tests? Am 
I not supposed to commit through the ProcessorContext? I'm committing once my 
interim processor writes its output to the sink point in the flow; is that not 
the correct/expected behavior/use?
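For reference, the shape of the prototype looks roughly like this; a heavily simplified sketch where the topic names, processor names, and the pass-through processor are all stand-ins for the real thing:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class PrototypeTopology {

    // Minimal stand-in for the real processors; the real ones do the actual work
    // and call context.commit() once their output has been forwarded to the sink.
    static class PassThroughProcessor implements Processor<byte[], byte[]> {
        private ProcessorContext context;
        @Override public void init(ProcessorContext context) { this.context = context; }
        @Override public void process(byte[] key, byte[] value) {
            context.forward(key, value);
            context.commit(); // commit once the record has been handed to the sink
        }
        @Override public void punctuate(long timestamp) { }
        @Override public void close() { }
    }

    // Sketch of the topology shape described above: processors chained together
    // with a sink/source pair in between (topic and processor names are illustrative).
    public static TopologyBuilder build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("raw-source", "raw-topic")
               .addProcessor("parse", PassThroughProcessor::new, "raw-source")
               .addSink("interim-sink", "interim-topic", "parse")
               .addSource("interim-source", "interim-topic")
               .addProcessor("aggregate", PassThroughProcessor::new, "interim-source")
               .addSink("final-sink", "output-topic", "aggregate");
        return builder;
    }
}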
Mike



KStreams Partition Assignment

2016-03-20 Thread Michael D. Coon
I'm evaluating whether the KafkaStreams API will be something we can use on my 
current project. Namely, we want to be able to distribute the consumers on a 
Mesos/YARN cluster. It's not entirely clear to me in the code what is deciding 
which partitions get assigned at runtime, and whether this is intended for a 
distributed application or just a multi-threaded environment.

I get that the consumer coordinator will get reassignments when group 
participation changes; however, in looking through the StreamPartitionAssignor 
code, it's not clear to me what is happening in the assign method. It looks 
to me like subscriptions are coming in from the consumer coordinator, 
presumably whose assignments are derived from the lead brokers for the topics 
of interest. Those subscriptions are then translated into co-partitioned groups 
of clients. Once that's complete, it hands off the co-partitioned groups to the 
StreamThread's partitionGrouper to do the work of assigning the partitions to 
each co-partitioned group. The DefaultPartitionGrouper code, starting on line 
57, simply does a 1-up assignment of partitions to groups. How will this actually 
work with distributed stream consumers if it's always going to be assigning the 
partitions as a 1-up sequence local to that particular consumer? Shouldn't it 
use the assigned partitions that come back from the ConsumerCoordinator?

I'm struggling to understand the layers, but I need to in order to know whether 
this implementation is going to work for us. If the default PartitionGrouper is 
just meant for single-node, multi-threaded use, that's fine as long as 
I can inject my own implementation. But I would still need to understand what 
is happening at the StreamPartitionAssignor layer more clearly. Any info, 
design docs, or in-progress wikis would be most appreciated if the answer is too 
in-depth for an email discussion. Thanks, and I really love what you guys are 
doing with this!
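For what it's worth, this is the kind of injection I have in mind; a skeleton only, and it assumes both that the PartitionGrouper interface keeps roughly its current shape and that it can be swapped in via the partition.grouper config:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;

// Skeleton of a custom grouper; the real logic would honor the assignments
// handed back by the coordinator rather than a 1-up local sequence.
public class ClusterAwarePartitionGrouper implements PartitionGrouper {
    @Override
    public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups,
                                                            Cluster metadata) {
        Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
        // ... build the task -> partitions mapping from the cluster metadata ...
        return groups;
    }
}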
Mike