[jira] [Created] (KAFKA-5571) Possible deadlock during shutdown in setState in kafka streams 10.2

2017-07-07 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-5571:
-

 Summary: Possible deadlock during shutdown in setState in kafka 
streams 10.2
 Key: KAFKA-5571
 URL: https://issues.apache.org/jira/browse/KAFKA-5571
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.1
Reporter: Greg Fodor
 Attachments: kafka-streams.deadlock.log

I'm running a 10.2 job across 5 nodes with 32 stream threads on each node and 
find that when we gracefully shut all of them down at once via an Ansible 
script, some of the nodes end up freezing -- at a glance, the attached thread 
dump implies a deadlock between stream threads trying to update their state via 
setState. We haven't had this problem before, but it may or may not be related 
to changes in 10.2 (we are upgrading from 10.0 to 10.2).

When we gracefully shut down all nodes simultaneously, what typically happens is 
that some subset of the nodes do not shut down completely but go through a 
rebalance first. It seems this deadlock requires the rebalance to occur 
simultaneously with the graceful shutdown. If we happen to shut them down and no 
rebalance happens, I don't believe the deadlock is triggered.

The deadlock appears related to the state change handlers being subscribed 
across threads and the fact that StreamThread#setState and 
StreamStateListener#onChange are both synchronized methods.
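
For illustration, here is a minimal, generic sketch (made-up classes, not the 
actual Streams internals) of the lock-ordering inversion described above: each 
worker's setState() is synchronized on the worker, the shared listener's 
onChange() is synchronized on the listener, and onChange() calls back into the 
other workers' setState().

{noformat}
// Generic sketch of a cross-thread synchronized-callback deadlock.
// The class names are illustrative, not the actual Streams classes.
public class SetStateDeadlockSketch {

    static class SharedListener {
        private final Worker[] workers;
        SharedListener(Worker[] workers) { this.workers = workers; }

        // Synchronized on the single listener shared by all workers.
        synchronized void onChange(Worker source) {
            for (Worker w : workers) {
                if (w != source) {
                    w.setState(); // needs the other worker's monitor
                }
            }
        }
    }

    static class Worker extends Thread {
        SharedListener listener;

        // Synchronized on this worker.
        synchronized void setState() {
            listener.onChange(this); // needs the listener's monitor
        }

        @Override
        public void run() { setState(); }
    }

    public static void main(String[] args) {
        Worker a = new Worker();
        Worker b = new Worker();
        SharedListener listener = new SharedListener(new Worker[]{a, b});
        a.listener = listener;
        b.listener = listener;
        // If both threads enter setState() at once, one ends up holding its own
        // monitor plus the listener's monitor while waiting for the other
        // worker's monitor, which that worker holds while waiting for the
        // listener's monitor -- a classic lock-ordering deadlock.
        a.start();
        b.start();
    }
}
{noformat}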

Another thing worth mentioning is that one of the transformers used in the job 
has a close() method that can take 10-15 seconds to finish since it needs to 
flush some data to a database. Having a long close() method combined with a 
rebalance during a shutdown across many threads may be necessary for 
reproduction.





[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2016-11-01 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15627097#comment-15627097
 ] 

Greg Fodor commented on KAFKA-4113:
---

Hey [~guozhang], I have been able to reproduce a bootstrapping issue on a fresh 
local node, and I think there's either something I need clarity on or possibly 
a bug.

The root cause seems to be here:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L137

For a completely new node/topology with a KTable topic with existing state, 
there is no consumer metadata, so this initializes the offset limit to 0, which 
results in the state restoration loop basically not consuming any records. 
I've only reproduced this in a local case where I was sinking data to a KTable 
topic and then initialized the topology for the first time, which is a one-time 
event, but I'm wondering whether this offset limit defaulting to zero could 
also be causing issues later in the lifecycle of the topology.
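
To make the suspected behavior concrete, here is a hedged illustration of the 
pattern described above (not the actual AbstractTask code; the consumer and 
partition are assumed inputs):

{noformat}
// If the consumer group has no committed offset for the KTable partition yet,
// the restore limit becomes 0 and the restoration loop consumes nothing, even
// though the topic already contains state.
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class OffsetLimitSketch {
    static long offsetLimit(Consumer<byte[], byte[]> consumer, TopicPartition partition) {
        OffsetAndMetadata committed = consumer.committed(partition);
        return committed != null ? committed.offset() : 0L;
    }
}
{noformat}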

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>
> On the mailing list, there are multiple requests about the possibility to 
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This would mean that on startup, 
> the current partition sizes must be fetched and stored, and after the KTable 
> is populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if no update to the KTable happens for a certain 
> time, we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp 
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until seeing one record with timestamp 1000.
> {noformat}





[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2016-10-20 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15593018#comment-15593018
 ] 

Greg Fodor commented on KAFKA-4113:
---

Oh, so it should be doing exactly what makes sense to me -- I am on 0.10.0. Let 
me verify that there isn't something else going on! Thanks for the info.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>
> On the mailing list, there are multiple requests about the possibility to 
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This would mean that on startup, 
> the current partition sizes must be fetched and stored, and after the KTable 
> is populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if no update to the KTable happens for a certain 
> time, we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp 
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until seeing one record with timestamp 1000.
> {noformat}





[jira] [Created] (KAFKA-4317) RocksDB checkpoint files lost on kill -9

2016-10-19 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-4317:
-

 Summary: RocksDB checkpoint files lost on kill -9
 Key: KAFKA-4317
 URL: https://issues.apache.org/jira/browse/KAFKA-4317
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.0.1
Reporter: Greg Fodor
Assignee: Guozhang Wang


Right now, the checkpoint files for logged RocksDB stores are written during a 
graceful shutdown, and removed upon restoration. Unfortunately this means that 
in a scenario where the process is forcibly killed, the checkpoint files are 
not there, so all RocksDB stores are rematerialized from scratch on the next 
launch.

In a way this is good, because it simulates bootstrapping a new node (for 
example, it's a good way to see how much I/O is used to rematerialize the 
stores). However, it leads to longer recovery times when a non-graceful shutdown 
occurs and we want to get the job up and running again.

There seem to be two possible things to consider:

- Simply do not remove checkpoint files on restoring. This way a kill -9 will 
result in only repeating the restoration of all the data generated in the 
source topics since the last graceful shutdown.

- Continually update the checkpoint files (perhaps on commit) -- this would 
result in the least amount of overhead/latency in restarting, but the 
additional complexity may not be worth it.
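
As a rough sketch of the second idea (illustrative only; this is not Streams' 
internal checkpoint format or API), the task could rewrite a small offsets file 
atomically on every commit, so a kill -9 only replays records appended since 
the last commit:

{noformat}
// Illustrative only -- not Streams' internal checkpoint file format or API.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

class CheckpointOnCommitSketch {
    // Called from the commit path: persist "topic partition offset" lines,
    // writing to a temp file first and renaming so readers never see a
    // half-written checkpoint.
    static void writeCheckpoint(Path stateDir, Map<TopicPartition, Long> offsets) throws IOException {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<TopicPartition, Long> e : offsets.entrySet()) {
            sb.append(e.getKey().topic()).append(' ')
              .append(e.getKey().partition()).append(' ')
              .append(e.getValue()).append('\n');
        }
        Path tmp = stateDir.resolve(".checkpoint.tmp");
        Files.write(tmp, sb.toString().getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, stateDir.resolve(".checkpoint"),
                   StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }
}
{noformat}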





[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2016-10-19 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15587991#comment-15587991
 ] 

Greg Fodor commented on KAFKA-4113:
---

I guess what I would argue is that KStreamBuilder#table should have identical 
semantics to a KTable backed by a logged state store, except that you are 
specifying the topic and (obviously) it's not mutable from the job's POV. It 
should first check whether it has a local, checkpointed RocksDB store, and if 
so, it should just read forward from the checkpoint. If not, it should 
rematerialize from offset 0 and block the start of the job until it has done 
so. On shutdown, it should write the checkpoint file. It seems to me that this 
might boil down to just having it be "use this topic for the logged state store 
backing this KTableImpl."

I'm sure there are cases I'm missing, but having that be the behavior for 
KStreamBuilder#table would effectively solve all of our problems as far as I 
can tell. The semantics and I/O impact of this approach work out to exactly the 
same as with a normal user-created persistent state store, except that you are 
managing the topic writes yourself.
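
A hedged pseudocode sketch of the proposed semantics, to make the argument 
concrete -- checkpointOffset and the store interface are hypothetical 
stand-ins, not Streams internals, and endOffsets() assumes a 0.10.1+ consumer:

{noformat}
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

class TableBootstrapSketch {
    interface WritableStore { void put(byte[] key, byte[] value); }

    static void restore(Consumer<byte[], byte[]> consumer, TopicPartition partition,
                        Long checkpointOffset, WritableStore store) {
        consumer.assign(Collections.singleton(partition));
        // Resume from the local checkpoint if one exists, otherwise rebuild
        // the whole table from offset 0.
        long restored = checkpointOffset != null ? checkpointOffset : 0L;
        consumer.seek(partition, restored);
        long end = consumer.endOffsets(Collections.singleton(partition)).get(partition);
        // Block the start of processing until the store has caught up.
        while (restored < end) {
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(100).records(partition)) {
                store.put(record.key(), record.value());
                restored = record.offset() + 1;
            }
        }
    }
}
{noformat}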

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>
> On the mailing list, there are multiple requests about the possibility to 
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This would mean that on startup, 
> the current partition sizes must be fetched and stored, and after the KTable 
> is populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if no update to the KTable happens for a certain 
> time, we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp 
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until seeing one record with timestamp 1000.
> {noformat}





[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2016-10-19 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15587966#comment-15587966
 ] 

Greg Fodor commented on KAFKA-4113:
---

Having played around with Kafka Streams for a while now, I am still confused 
(and we still get burned) by this. Let me walk through a case, and see if you 
can spot where I am misunderstanding something.

Say we have a topic that's a user table changelog, keyed by user id with user 
records as values. And we have a clickstream topic that is just user id to URL. 
For the sake of this example, let's assume our Kafka Streams job has been 
running from t = 0, when both topics were empty, so there's no bootstrapping 
problem.

In the Kafka Streams DSL, I would tap the user table topic via 
`KStreamBuilder#table`. As best I can tell, this creates a KTable with:
- An unlogged RocksDB state store (which is going to land on disk)
- A new source that is the table topic

After this, I'm going to tap and inner join the clickstream as a KStream on user 
id, and just for this example let's assume I'll sink it all out to a new topic 
as well.
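
For reference, a minimal sketch of the topology described so far (topic names 
are made up, default serdes are assumed, and leftJoin is used simply as the 
stream-table join form available in that era):

{noformat}
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

class UserClickJoinSketch {
    static KStreamBuilder build() {
        KStreamBuilder builder = new KStreamBuilder();
        // KTable over the user changelog topic, backed by a local RocksDB store.
        KTable<String, String> users = builder.table("user_table_topic");
        // KStream of user id -> clicked url.
        KStream<String, String> clicks = builder.stream("clickstream_topic");
        // Join each click against the current user record and sink to a new topic.
        clicks.leftJoin(users, (url, user) -> user + "|" + url)
              .to("enriched_clicks_topic");
        return builder;
    }
}
{noformat}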

As my node is humming along, it is writing the user id -> user record k/vs to 
the local RocksDB store but is *not* writing the changes to a changelog topic, 
because the store is not marked as logged. When it reads a record from the 
KStream, the join is performed by looking up the key in the state store. As 
mentioned, my understanding is that the join against the stream will wait until 
the KTable records with earlier timestamps have been consumed. This makes 
sense.

If I terminate and restart the Java process, the Kafka consumer for the KTable 
will pick up at the last committed offset for the user table topic. It may 
re-consume a few seconds' worth of records and re-write a few keys in the 
RocksDB store, but after that it still has the full historical state of the 
topic. So joins against any user id will continue to work.

Where things completely stop making sense for me is if I lose the node. If I 
lose the node, I lose my RocksDB store, which is not logged and so is not 
backed by a changelog topic. When I bring up a new node, my understanding is 
that the consumer will *not* start at the beginning of the topic used for the 
KTable; it will just pick up at the last commit. So what I end up with is a 
RocksDB store that only contains the last couple of records from the user table 
topic. This is obviously really broken, because now my joins will start 
failing. (And it seems I was lulled into complacency here since I was robust 
across JVM restarts, but not across node failures.) I believe this problem also 
happens in a more nefarious way upon rebalances: if a partition of the KTable 
gets reassigned, the receiving node will also have a partially complete RocksDB 
store for that partition, since it will just consume from the last committed 
offset. Similarly, and even scarier, if the partition gets assigned back to the 
original node, that node now has a RocksDB store with a small gap covering the 
key changes that happened while it was assigned to another node.

I am not sure if I am missing something here, but this has been the behavior we 
have seen. The workarounds we have come up with for this problem are:
- write a routine that lets us reset the KTable topic consumer offsets to zero 
(still doesn't help with a rebalance)
- perform a "touch" on the database records we are flushing to Kafka, so new 
copies of all of the records are appended to the topic via Kafka Connect and 
are forced into the RocksDB stores (this works well, but is obviously terrible)
- put a dummy aggregation/reduce after the tap of the KTable topic, which 
forces everything into a logged state store that will be fully materialized on 
startup if it is missing (a rough sketch follows below)
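
A hedged sketch of that third workaround, written against the 0.10.1-style 
grouped DSL (topic and store names are illustrative):

{noformat}
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

class LoggedTableWorkaroundSketch {
    static KTable<String, String> loggedUserTable(KStreamBuilder builder) {
        return builder.<String, String>stream("user_table_topic")
                      .groupByKey()
                      // Keep the newest value per key. The backing store is
                      // logged, so a fresh node rebuilds it from the changelog.
                      .reduce((oldValue, newValue) -> newValue, "users-logged-store");
    }
}
{noformat}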

Thoughts?

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>
> On the mailing list, there are multiple requests about the possibility to 
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This would mean that on startup, 
> the current partition sizes must be fetched and stored, and after the KTable 
> is populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if no 

[jira] [Commented] (KAFKA-3545) Generalized Serdes for List/Map

2016-10-17 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15583149#comment-15583149
 ] 

Greg Fodor commented on KAFKA-3545:
---

Yes

> Generalized Serdes for List/Map
> ---
>
> Key: KAFKA-3545
> URL: https://issues.apache.org/jira/browse/KAFKA-3545
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
> Fix For: 0.10.1.0
>
>
> In working with Kafka Streams I've found it's often the case I want to 
> perform a "group by" operation, where I repartition a stream based on a 
> foreign key and then do an aggregation of all the values into a single 
> collection, so the stream becomes one where each entry has a value that is a 
> serialized list of values that belonged to the key. (This seems unrelated to 
> the 'group by' operation talked about in KAFKA-3544.) Basically the same 
> typical group by operation found in systems like Cascading.
> In order to create these intermediate list values I needed to define custom 
> avro schemas that simply wrap the elements of interest into a list. It seems 
> desirable that there be some basic facility for constructing simple Serdes of 
> Lists/Maps/Sets of other types, potentially using avro's serialization under 
> the hood. If this existed in the core library it would also enable the 
> addition of higher level operations on streams that can use these Serdes to 
> perform simple operations like the "group by" example I mention.
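
To make the request concrete, here is a minimal sketch of the kind of 
generalized collection serde being asked for, assuming String elements and a 
simple length-prefixed encoding (illustrative only, not an API that shipped 
with this ticket); a Serde can then be assembled from it via 
Serdes.serdeFrom(serializer, deserializer):

{noformat}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

public class StringListSerde implements Serializer<List<String>>, Deserializer<List<String>> {
    @Override public void configure(Map<String, ?> configs, boolean isKey) {}
    @Override public void close() {}

    @Override
    public byte[] serialize(String topic, List<String> data) {
        if (data == null) return null;
        // Encode as: element count, then (length, utf-8 bytes) per element.
        int size = 4;
        List<byte[]> encoded = new ArrayList<>();
        for (String s : data) {
            byte[] b = s.getBytes(StandardCharsets.UTF_8);
            encoded.add(b);
            size += 4 + b.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size).putInt(encoded.size());
        for (byte[] b : encoded) buf.putInt(b.length).put(b);
        return buf.array();
    }

    @Override
    public List<String> deserialize(String topic, byte[] bytes) {
        if (bytes == null) return null;
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int count = buf.getInt();
        List<String> out = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] b = new byte[buf.getInt()];
            buf.get(b);
            out.add(new String(b, StandardCharsets.UTF_8));
        }
        return out;
    }
}
{noformat}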





[jira] [Resolved] (KAFKA-3545) Generalized Serdes for List/Map

2016-10-17 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor resolved KAFKA-3545.
---
   Resolution: Fixed
Fix Version/s: 0.10.1.0

> Generalized Serdes for List/Map
> ---
>
> Key: KAFKA-3545
> URL: https://issues.apache.org/jira/browse/KAFKA-3545
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
> Fix For: 0.10.1.0
>
>
> In working with Kafka Streams I've found it's often the case I want to 
> perform a "group by" operation, where I repartition a stream based on a 
> foreign key and then do an aggregation of all the values into a single 
> collection, so the stream becomes one where each entry has a value that is a 
> serialized list of values that belonged to the key. (This seems unrelated to 
> the 'group by' operation talked about in KAFKA-3544.) Basically the same 
> typical group by operation found in systems like Cascading.
> In order to create these intermediate list values I needed to define custom 
> avro schemas that simply wrap the elements of interest into a list. It seems 
> desirable that there be some basic facility for constructing simple Serdes of 
> Lists/Maps/Sets of other types, potentially using avro's serialization under 
> the hood. If this existed in the core library it would also enable the 
> addition of higher level operations on streams that can use these Serdes to 
> perform simple operations like the "group by" example I mention.





[jira] [Commented] (KAFKA-4281) Should be able to forward aggregation values immediately

2016-10-09 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15560765#comment-15560765
 ] 

Greg Fodor commented on KAFKA-4281:
---

PR: https://github.com/apache/kafka/pull/1998

If this approach seems sane, please take a look especially at the window 
variants -- I am not too familiar with those APIs.

> Should be able to forward aggregation values immediately
> 
>
> Key: KAFKA-4281
> URL: https://issues.apache.org/jira/browse/KAFKA-4281
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> KIP-63 introduced changes to the behavior of aggregations such that the 
> result of aggregations will not appear to subsequent processors until a state 
> store flush occurs. This is problematic for latency-sensitive aggregations 
> since flushes generally occur at commit.interval.ms, which is usually a few 
> seconds. Combined with several aggregations, this can result in several 
> seconds of latency through a topology for steps dependent upon aggregations.
> Two potential solutions:
> - Allow finer control over the state store flushing intervals
> - Allow users to change the behavior so that certain aggregations will 
> immediately forward records to the next step (as was the case pre-KIP-63)
> A PR is attached that takes the second approach. Unfortunately, adding this 
> required touching a large number of files, and it effectively doubles the 
> number of method signatures around grouping on KTable and KStream. I tried an 
> alternative approach that let the user opt in to immediate forwarding via an 
> additional builder method on KGroupedStream/Table, but this didn't work as 
> expected because in order for the latency to go away, the KTableImpl itself 
> must also mark its source as forward immediate (otherwise we will still see 
> latency due to the materialization of the KTableSource still relying upon 
> state store flushes to propagate.)





[jira] [Created] (KAFKA-4281) Should be able to forward aggregation values immediately

2016-10-09 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-4281:
-

 Summary: Should be able to forward aggregation values immediately
 Key: KAFKA-4281
 URL: https://issues.apache.org/jira/browse/KAFKA-4281
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Greg Fodor
Assignee: Guozhang Wang


KIP-63 introduced changes to the behavior of aggregations such that the result 
of aggregations will not appear to subsequent processors until a state store 
flush occurs. This is problematic for latency-sensitive aggregations since 
flushes generally occur at commit.interval.ms, which is usually a few seconds. 
Combined with several aggregations, this can result in several seconds of 
latency through a topology for steps dependent upon aggregations.

Two potential solutions:
- Allow finer control over the state store flushing intervals
- Allow users to change the behavior so that certain aggregations will 
immediately forward records to the next step (as was the case pre-KIP-63)

A PR is attached that takes the second approach. Unfortunately, adding this 
required touching a large number of files, and it effectively doubles the 
number of method signatures around grouping on KTable and KStream. I tried an 
alternative approach that let the user opt in to immediate forwarding via an 
additional builder method on KGroupedStream/Table, but this didn't work as 
expected because in order for the latency to go away, the KTableImpl itself 
must also mark its source as forward immediate (otherwise we will still see 
latency due to the materialization of the KTableSource still relying upon state 
store flushes to propagate.)
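
This is not the forwarding change the PR implements, but as a hedged sketch of 
the knob mentioned above: lowering commit.interval.ms shortens how long 
aggregation results sit in un-flushed state stores, at the cost of more 
frequent commits (the application id and bootstrap servers are placeholders):

{noformat}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class LowLatencyCommitConfigSketch {
    static StreamsConfig config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Flush/commit every 100 ms instead of the multi-second default,
        // trading commit overhead for lower aggregation latency.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100");
        return new StreamsConfig(props);
    }
}
{noformat}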





[jira] [Resolved] (KAFKA-4043) User-defined handler for topology restart

2016-09-07 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor resolved KAFKA-4043.
---
Resolution: Not A Problem

> User-defined handler for topology restart
> -
>
> Key: KAFKA-4043
> URL: https://issues.apache.org/jira/browse/KAFKA-4043
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Since Kafka Streams is just a library, there's a lot of cool stuff we've been 
> able to do that would be trickier if it were part of a larger 
> cluster-oriented job execution system that had assumptions about the 
> semantics of a job. One of the jobs we have uses Kafka Streams to do top 
> level data flow, and then one of our processors actually will kick off 
> background threads to do work based upon the data flow state. Happy to fill 
> in more details of our use-case, but fundamentally the model is that we have 
> a Kafka Streams data flow that is reading state from upstream, and that state 
> dictates that work needs to be done, which results in a dedicated work thread 
> being spawned by our job.
> This works great, but we're running into an issue when there is partition 
> reassignment, since we have no way to detect this and cleanly shut down these 
> threads. In our case, we'd like to shut down the background worker threads if 
> there is a partition rebalance or if the job raises an exception and attempts 
> to restart. In practice what is happening is we are getting duplicate threads 
> for the same work on a partition rebalance.
> Implementation-wise, this seems like some type of event handler that can be 
> attached to the topology at build time that will be called when the data 
> flow needs to rebalance or rebuild its task threads in general (ideally 
> passing as much information about the reason along.) I could imagine this 
> being factored similarly to the KafkaStreams#setUncaughtExceptionHandler.





[jira] [Commented] (KAFKA-4043) User-defined handler for topology restart

2016-09-07 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15472271#comment-15472271
 ] 

Greg Fodor commented on KAFKA-4043:
---

Ah, that should work for us. Thanks!

> User-defined handler for topology restart
> -
>
> Key: KAFKA-4043
> URL: https://issues.apache.org/jira/browse/KAFKA-4043
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Since Kafka Streams is just a library, there's a lot of cool stuff we've been 
> able to do that would be trickier if it were part of a larger 
> cluster-oriented job execution system that had assumptions about the 
> semantics of a job. One of the jobs we have uses Kafka Streams to do top 
> level data flow, and then one of our processors actually will kick off 
> background threads to do work based upon the data flow state. Happy to fill 
> in more details of our use-case, but fundamentally the model is that we have 
> a Kafka Streams data flow that is reading state from upstream, and that state 
> dictates that work needs to be done, which results in a dedicated work thread 
> being spawned by our job.
> This works great, but we're running into an issue when there is partition 
> reassignment, since we have no way to detect this and cleanly shut down these 
> threads. In our case, we'd like to shut down the background worker threads if 
> there is a partition rebalance or if the job raises an exception and attempts 
> to restart. In practice what is happening is we are getting duplicate threads 
> for the same work on a partition rebalance.
> Implementation-wise, this seems like some type of event handler that can be 
> attached to the topology at build time that will be called when the data 
> flow needs to rebalance or rebuild its task threads in general (ideally 
> passing as much information about the reason along.) I could imagine this 
> being factored similarly to the KafkaStreams#setUncaughtExceptionHandler.





[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics

2016-09-06 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15468374#comment-15468374
 ] 

Greg Fodor commented on KAFKA-3769:
---

It's just the sensor calls inside of Selector, not anything Kafka Streams
specific. I'll verify as much as I can from the profiler snapshot that
it's the same issue and will open a JIRA.



> KStream job spending 60% of time writing metrics
> 
>
> Key: KAFKA-3769
> URL: https://issues.apache.org/jira/browse/KAFKA-3769
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Critical
> Fix For: 0.10.1.0
>
>
> I've been profiling a complex streams job, and found two major hotspots when 
> writing metrics, which take up about 60% of the CPU time of the job. (!) A PR 
> is attached.





[jira] [Comment Edited] (KAFKA-4120) byte[] keys in RocksDB state stores do not work as expected

2016-09-05 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464171#comment-15464171
 ] 

Greg Fodor edited comment on KAFKA-4120 at 9/5/16 5:34 PM:
---

We were able to work around it by just creating a proper Avro class for the 
byte data, but I think it would probably be helpful to future users if there 
were a way to prevent this from happening -- an exception doesn't seem 
unreasonable.


was (Author: gfodor):
We were able to work around it by just creating a proper Avro class for the 
byte data, but I think it would probably be helpful to future if there were a 
way to prevent this from happening -- an exception doesn't seem unreasonable.

> byte[] keys in RocksDB state stores do not work as expected
> ---
>
> Key: KAFKA-4120
> URL: https://issues.apache.org/jira/browse/KAFKA-4120
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> We ran into an issue using a byte[] key in a RocksDB state store (with the 
> byte array serde.) Internally, the RocksDB store keeps an LRUCache that is 
> backed by a LinkedHashMap that sits between the callers and the actual db. 
> The problem is that while the underlying rocks db will persist byte arrays 
> with equal data as equivalent keys, the LinkedHashMap uses byte[] reference 
> equality from Object.equals/hashcode. So, this can result in multiple entries 
> in the cache for two different byte arrays that have the same contents and 
> are backed by the same key in the db, resulting in unexpected behavior. 
> One such behavior that manifests from this is if you store a value in the 
> state store with a specific key, if you re-read that key with the same byte 
> array you will get the new value, but if you re-read that key with a 
> different byte array with the same bytes, you will get a stale value until 
> the db is flushed. (This made it particularly tricky to track down what was 
> happening :))
> The workaround for us is to convert the keys from raw byte arrays to a 
> deserialized avro structure that provides proper hashcode/equals semantics 
> for the intermediate cache. In general this seems like good practice, so one 
> of the proposed solutions is to simply emit a warning or exception if a key 
> type with breaking semantics like this is provided.
> A few proposed solutions:
> - When the state store is defined on array keys, ensure that the cache map 
> does proper comparisons on array values not array references. This would fix 
> this problem, but seems a bit strange to special-case. However, I have a hard 
> time thinking of other examples where this behavior would burn users.
> - Change the LRU cache to deserialize and serialize all keys to bytes and use 
> a value based comparison for the map. This would be the most correct, as it 
> would ensure that both the rocks db and the cache have identical key spaces 
> and equality/hashing semantics. However, this is probably slow, and since the 
> general case of using avro record types as keys works fine, it will largely 
> be unnecessary overhead.
> - Don't change anything about the behavior, but trigger a warning in the log 
> or fail to start if a state store is defined on array keys (or possibly any 
> key type that fails to properly override Object.equals/hashcode.)





[jira] [Commented] (KAFKA-4120) byte[] keys in RocksDB state stores do not work as expected

2016-09-04 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464171#comment-15464171
 ] 

Greg Fodor commented on KAFKA-4120:
---

We were able to work around it by just creating a proper Avro class for the 
byte data, but I think it would probably be helpful to future if there were a 
way to prevent this from happening -- an exception doesn't seem unreasonable.

> byte[] keys in RocksDB state stores do not work as expected
> ---
>
> Key: KAFKA-4120
> URL: https://issues.apache.org/jira/browse/KAFKA-4120
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> We ran into an issue using a byte[] key in a RocksDB state store (with the 
> byte array serde.) Internally, the RocksDB store keeps an LRUCache that is 
> backed by a LinkedHashMap that sits between the callers and the actual db. 
> The problem is that while the underlying rocks db will persist byte arrays 
> with equal data as equivalent keys, the LinkedHashMap uses byte[] reference 
> equality from Object.equals/hashcode. So, this can result in multiple entries 
> in the cache for two different byte arrays that have the same contents and 
> are backed by the same key in the db, resulting in unexpected behavior. 
> One such behavior that manifests from this is if you store a value in the 
> state store with a specific key, if you re-read that key with the same byte 
> array you will get the new value, but if you re-read that key with a 
> different byte array with the same bytes, you will get a stale value until 
> the db is flushed. (This made it particularly tricky to track down what was 
> happening :))
> The workaround for us is to convert the keys from raw byte arrays to a 
> deserialized avro structure that provides proper hashcode/equals semantics 
> for the intermediate cache. In general this seems like good practice, so one 
> of the proposed solutions is to simply emit a warning or exception if a key 
> type with breaking semantics like this is provided.
> A few proposed solutions:
> - When the state store is defined on array keys, ensure that the cache map 
> does proper comparisons on array values not array references. This would fix 
> this problem, but seems a bit strange to special-case. However, I have a hard 
> time thinking of other examples where this behavior would burn users.
> - Change the LRU cache to deserialize and serialize all keys to bytes and use 
> a value based comparison for the map. This would be the most correct, as it 
> would ensure that both the rocks db and the cache have identical key spaces 
> and equality/hashing semantics. However, this is probably slow, and since the 
> general case of using avro record types as keys works fine, it will largely 
> be unnecessary overhead.
> - Don't change anything about the behavior, but trigger a warning in the log 
> or fail to start if a state store is defined on array keys (or possibly any 
> key type that fails to properly override Object.equals/hashcode.)





[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics

2016-09-04 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15463861#comment-15463861
 ] 

Greg Fodor commented on KAFKA-3769:
---

I've done some additional profiling and I have found that this problem also 
seems to crop up in complex Kafka Streams jobs within the Kafka core Selector 
class. Should I open another JIRA?

> KStream job spending 60% of time writing metrics
> 
>
> Key: KAFKA-3769
> URL: https://issues.apache.org/jira/browse/KAFKA-3769
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Critical
> Fix For: 0.10.1.0
>
>
> I've been profiling a complex streams job, and found two major hotspots when 
> writing metrics, which take up about 60% of the CPU time of the job. (!) A PR 
> is attached.





[jira] [Created] (KAFKA-4120) byte[] keys in RocksDB state stores do not work as expected

2016-09-03 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-4120:
-

 Summary: byte[] keys in RocksDB state stores do not work as 
expected
 Key: KAFKA-4120
 URL: https://issues.apache.org/jira/browse/KAFKA-4120
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.0.1
Reporter: Greg Fodor
Assignee: Guozhang Wang


We ran into an issue using a byte[] key in a RocksDB state store (with the byte 
array serde.) Internally, the RocksDB store keeps an LRUCache that is backed by 
a LinkedHashMap that sits between the callers and the actual db. The problem is 
that while the underlying rocks db will persist byte arrays with equal data as 
equivalent keys, the LinkedHashMap uses byte[] reference equality from 
Object.equals/hashcode. So, this can result in multiple entries in the cache 
for two different byte arrays that have the same contents and are backed by the 
same key in the db, resulting in unexpected behavior. 

One such behavior that manifests from this is if you store a value in the state 
store with a specific key, if you re-read that key with the same byte array you 
will get the new value, but if you re-read that key with a different byte array 
with the same bytes, you will get a stale value until the db is flushed. (This 
made it particularly tricky to track down what was happening :))

The workaround for us is to convert the keys from raw byte arrays to a 
deserialized avro structure that provides proper hashcode/equals semantics for 
the intermediate cache. In general this seems like good practice, so one of the 
proposed solutions is to simply emit a warning or exception if a key type with 
breaking semantics like this is provided.

A few proposed solutions:

- When the state store is defined on array keys, ensure that the cache map does 
proper comparisons on array values not array references. This would fix this 
problem, but seems a bit strange to special-case. However, I have a hard time 
thinking of other examples where this behavior would burn users.

- Change the LRU cache to deserialize and serialize all keys to bytes and use a 
value based comparison for the map. This would be the most correct, as it would 
ensure that both the rocks db and the cache have identical key spaces and 
equality/hashing semantics. However, this is probably slow, and since the 
general case of using avro record types as keys works fine, it will largely be 
unnecessary overhead.

- Don't change anything about the behavior, but trigger a warning in the log or 
fail to start if a state store is defined on array keys (or possibly any key 
type that fails to properly override Object.equals/hashcode.)
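
A self-contained illustration of the root cause: two byte[] arrays with 
identical contents are distinct keys to a LinkedHashMap, because Java arrays 
inherit reference-based equals()/hashCode():

{noformat}
import java.util.LinkedHashMap;
import java.util.Map;

class ByteArrayKeySketch {
    public static void main(String[] args) {
        Map<byte[], String> cache = new LinkedHashMap<>();
        byte[] k1 = new byte[]{1, 2, 3};
        byte[] k2 = new byte[]{1, 2, 3}; // same contents, different reference

        cache.put(k1, "value");
        System.out.println(cache.get(k1)); // "value"
        System.out.println(cache.get(k2)); // null -- a read through a different
                                           // array misses the cached entry
    }
}
{noformat}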







[jira] [Commented] (KAFKA-4043) User-defined handler for topology restart

2016-08-28 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15443912#comment-15443912
 ] 

Greg Fodor commented on KAFKA-4043:
---

I suppose it looks like there's also no handler for when the topology shuts 
down normally. This would be good to have too.

> User-defined handler for topology restart
> -
>
> Key: KAFKA-4043
> URL: https://issues.apache.org/jira/browse/KAFKA-4043
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Since Kafka Streams is just a library, there's a lot of cool stuff we've been 
> able to do that would be trickier if it were part of a larger 
> cluster-oriented job execution system that had assumptions about the 
> semantics of a job. One of the jobs we have uses Kafka Streams to do top 
> level data flow, and then one of our processors actually will kick off 
> background threads to do work based upon the data flow state. Happy to fill 
> in more details of our use-case, but fundamentally the model is that we have 
> a Kafka Streams data flow that is reading state from upstream, and that state 
> dictates that work needs to be done, which results in a dedicated work thread 
> being spawned by our job.
> This works great, but we're running into an issue when there is partition 
> reassignment, since we have no way to detect this and cleanly shut down these 
> threads. In our case, we'd like to shut down the background worker threads if 
> there is a partition rebalance or if the job raises an exception and attempts 
> to restart. In practice what is happening is we are getting duplicate threads 
> for the same work on a partition rebalance.
> Implementation-wise, this seems like some type of event handler that can be 
> attached to the topology at build time that will be called when the data 
> flow needs to rebalance or rebuild its task threads in general (ideally 
> passing as much information about the reason along.) I could imagine this 
> being factored similarly to the KafkaStreams#setUncaughtExceptionHandler.





[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2016-08-23 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15433738#comment-15433738
 ] 

Greg Fodor commented on KAFKA-3758:
---

Excited to try this; as our state stores have grown, it's become more
and more difficult to get jobs to start up without timeouts.



> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log; the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>  1731  

[jira] [Created] (KAFKA-4043) User-defined handler for topology restart

2016-08-15 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-4043:
-

 Summary: User-defined handler for topology restart
 Key: KAFKA-4043
 URL: https://issues.apache.org/jira/browse/KAFKA-4043
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.0.1
Reporter: Greg Fodor
Assignee: Guozhang Wang


Since Kafka Streams is just a library, there's a lot of cool stuff we've been 
able to do that would be trickier if it were part of a larger cluster-oriented 
job execution system that had assumptions about the semantics of a job. One of 
the jobs we have uses Kafka Streams to do top level data flow, and then one of 
our processors actually will kick off background threads to do work based upon 
the data flow state. Happy to fill in more details of our use-case, but 
fundamentally the model is that we have a Kafka Streams data flow that is 
reading state from upstream, and that state dictates that work needs to be 
done, which results in a dedicated work thread being spawned by our job.

This works great, but we're running into an issue when there is partition 
reassignment, since we have no way to detect this and cleanly shut down these 
threads. In our case, we'd like to shut down the background worker threads if 
there is a partition rebalance or if the job raises an exception and attempts 
to restart. In practice what is happening is we are getting duplicate threads 
for the same work on a partition rebalance.

Implementation-wise, this seems like some type of event handler that can be 
attached to the topology at build time that will be called when the data 
flow needs to rebalance or rebuild its task threads in general (ideally passing 
as much information about the reason along.) I could imagine this being 
factored similarly to the KafkaStreams#setUncaughtExceptionHandler.
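
As a hedged sketch, assuming a release that has KafkaStreams#setStateListener 
(it is not in 0.10.0.x), one way to get this kind of hook is to watch for the 
REBALANCING state transition and stop the background workers there; 
stopBackgroundWorkers is a hypothetical callback owned by the application:

{noformat}
import org.apache.kafka.streams.KafkaStreams;

class RebalanceHookSketch {
    static void install(KafkaStreams streams, Runnable stopBackgroundWorkers) {
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.REBALANCING) {
                // Avoid duplicate workers after partitions move elsewhere.
                stopBackgroundWorkers.run();
            }
        });
    }
}
{noformat}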





[jira] [Commented] (KAFKA-3752) Provide a way for KStreams to recover from unclean shutdown

2016-08-10 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15416351#comment-15416351
 ] 

Greg Fodor commented on KAFKA-3752:
---

Oh, apologies for misreading the ticket, but in our case it's a recoverable 
condition.

> Provide a way for KStreams to recover from unclean shutdown
> ---
>
> Key: KAFKA-3752
> URL: https://issues.apache.org/jira/browse/KAFKA-3752
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Roger Hoover
>  Labels: architecture
>
> If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM 
> Killer), it may leave behind lock files and fail to recover.
> It would be useful to have an option (say --force) to tell KStreams to 
> proceed even if it finds old LOCK files.
> {noformat}
> [2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in 
> thread [StreamThread-1]:  
> (org.apache.kafka.streams.processor.internals.StreamThread:583)
> org.apache.kafka.streams.errors.ProcessorStateException: Error while creating 
> the state manager
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>   at 
> 

[jira] [Commented] (KAFKA-3752) Provide a way for KStreams to recover from unclean shutdown

2016-08-10 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415664#comment-15415664
 ] 

Greg Fodor commented on KAFKA-3752:
---

Hey [~guozhang], we're hitting this issue as well during a rebalance, and I 
think also during startup failures due to KAFKA-3559. The job eventually 
recovers. We are running 32 threads per instance and have 2 instances (though 
the issue happens more often when we run on a single instance). Log 
here: https://gist.github.com/gfodor/bac65bff38233193b70836b78c701e7b 

> Provide a way for KStreams to recover from unclean shutdown
> ---
>
> Key: KAFKA-3752
> URL: https://issues.apache.org/jira/browse/KAFKA-3752
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Roger Hoover
>  Labels: architecture
>
> If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM 
> Killer), it may leave behind lock files and fail to recover.
> It would be useful to have an option (say --force) to tell KStreams to 
> proceed even if it finds old LOCK files.
> {noformat}
> [2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in 
> thread [StreamThread-1]:  
> (org.apache.kafka.streams.processor.internals.StreamThread:583)
> org.apache.kafka.streams.errors.ProcessorStateException: Error while creating 
> the state manager
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>   at 
> 

[jira] [Resolved] (KAFKA-3770) KStream job should be able to specify linger.ms

2016-08-07 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor resolved KAFKA-3770.
---
Resolution: Fixed

Fixed via KAFKA-3786

> KStream job should be able to specify linger.ms
> ---
>
> Key: KAFKA-3770
> URL: https://issues.apache.org/jira/browse/KAFKA-3770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Greg Fodor
>
> The default linger.ms of 100ms hardcoded into the StreamsConfig class is 
> problematic for jobs that have lots of tasks, since this latency can accrue. 
> It would be useful to be able to override linger.ms in the StreamsConfig. 
> Attached is a PR which allows this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics

2016-06-12 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326778#comment-15326778
 ] 

Greg Fodor commented on KAFKA-3769:
---

Discussion/resolution moved to: https://issues.apache.org/jira/browse/KAFKA-3811

> KStream job spending 60% of time writing metrics
> 
>
> Key: KAFKA-3769
> URL: https://issues.apache.org/jira/browse/KAFKA-3769
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Critical
>
> I've been profiling a complex streams job, and found two major hotspots when 
> writing metrics, which take up about 60% of the CPU time of the job. (!) A PR 
> is attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3811:
--
Attachment: screenshot-latency.png

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
> Attachments: Muon-Snapshot.zip, Muon-latency.zip, screenshot-1.png, 
> screenshot-latency.png
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324869#comment-15324869
 ] 

Greg Fodor commented on KAFKA-3811:
---

Also, I've attached a screenshot and profiler snapshot of a second run where I 
started sending data deeper into the pipeline, which caused the latency metrics 
to take up a few percent of the time since we're using state stores. Most of 
this looks like lock contention to me.



> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
> Attachments: Muon-Snapshot.zip, Muon-latency.zip, screenshot-1.png, 
> screenshot-latency.png
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3811:
--
Attachment: Muon-latency.zip

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
> Attachments: Muon-Snapshot.zip, Muon-latency.zip, screenshot-1.png
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324847#comment-15324847
 ] 

Greg Fodor commented on KAFKA-3811:
---

I've also attached a YourKit screenshot of the relevant call stacks.

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
> Attachments: Muon-Snapshot.zip, screenshot-1.png
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3811:
--
Attachment: screenshot-1.png

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
> Attachments: Muon-Snapshot.zip, screenshot-1.png
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3811:
--
Attachment: Muon-Snapshot.zip

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
> Attachments: Muon-Snapshot.zip
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-10 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324838#comment-15324838
 ] 

Greg Fodor commented on KAFKA-3811:
---

Hey [~aartigupta], I attached a YourKit profiler to one of our jobs running 
dark against production data. The job has 200-300 topic-partitions, generally 
discards most messages early in the pipeline, and was processing a few thousand 
messages per second from the top-level topics. Unfortunately, since this issue 
came up we made changes to reduce the amount of data running through the system 
(discarding it even earlier), so we no longer have to worry about this 
performance problem. In my tests, the majority of the job's CPU time was spent 
in the code walking and emitting to the Sensors for the per-message process 
metrics and the per-key/value read/write latency metrics. I also found that 
6-7% of the time was spent in the fetcher metrics, which was addressed here: 
https://github.com/apache/kafka/pull/1464.

Good news: I managed to find the snapshot data :) I will attach it here. The 
majority of the time is *not* spent in the milliseconds() call but in the 
actual (synchronized?) walk of Sensors in Sensor.record.
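
To make the gating idea concrete: the value of a recording level is a cheap, 
lock-free check on the hot path that skips the synchronized stat-update walk 
entirely for fine-grained sensors when they are disabled. The following is only 
a sketch of that shape, not the real org.apache.kafka.common.metrics API:

{code:java}
// Sketch only: illustrates level-based gating of sensor recording.
enum RecordingLevel { INFO, DEBUG }

class GatedSensor {
    private final RecordingLevel sensorLevel;      // level this sensor belongs to (level0 -> INFO, level1 -> DEBUG)
    private final RecordingLevel configuredLevel;  // level the job is configured to record

    GatedSensor(RecordingLevel sensorLevel, RecordingLevel configuredLevel) {
        this.sensorLevel = sensorLevel;
        this.configuredLevel = configuredLevel;
    }

    boolean shouldRecord() {
        // INFO-level sensors always record; DEBUG-level sensors only record
        // when the job is configured at DEBUG.
        return sensorLevel == RecordingLevel.INFO || configuredLevel == RecordingLevel.DEBUG;
    }

    void record(double value, long nowMs) {
        if (!shouldRecord()) {
            return; // cheap check; avoids taking the lock and walking registered stats
        }
        synchronized (this) {
            // ... walk the registered stats and update them, as Sensor.record() does ...
        }
    }
}
{code}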

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: aarti gupta
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3811) Introduce KStream metrics recording levels

2016-06-09 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-3811:
-

 Summary: Introduce KStream metrics recording levels
 Key: KAFKA-3811
 URL: https://issues.apache.org/jira/browse/KAFKA-3811
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Greg Fodor
Assignee: Guozhang Wang


Follow-up from the discussions here:

https://github.com/apache/kafka/pull/1447
https://issues.apache.org/jira/browse/KAFKA-3769

The proposal is to introduce configuration to control the granularity/volumes 
of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
introduce non-trivial overhead and are possibly less useful once a job has been 
optimized. 

Proposal from guozhangwang:

level0 (stream thread global): per-record process / punctuate latency, commit 
latency, poll latency, etc

level1 (per processor node, and per state store): IO latency, per-record .. 
latency, forward throughput, etc.

And by default we only turn on level0.
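
A sketch of how a job might opt into one of these levels, assuming the proposal 
ships as a "metrics.recording.level" config with values such as INFO (level0 
only) and DEBUG (level0 plus level1), which is how it eventually landed in 
later releases. The application id and bootstrap servers are placeholders:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class MetricsLevelExample {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // INFO keeps only the thread-level (level0) sensors; DEBUG would also
        // enable the per-processor-node and per-state-store (level1) sensors.
        props.put("metrics.recording.level", "INFO");
        return props;
    }
}
{code}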



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-09 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3811:
--
Summary: Introduce Kafka Streams metrics recording levels  (was: Introduce 
KStream metrics recording levels)

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3785) Fetcher spending unnecessary time during metrics recording

2016-06-02 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313461#comment-15313461
 ] 

Greg Fodor commented on KAFKA-3785:
---

https://github.com/apache/kafka/pull/1464

> Fetcher spending unnecessary time during metrics recording
> --
>
> Key: KAFKA-3785
> URL: https://issues.apache.org/jira/browse/KAFKA-3785
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Greg Fodor
>
> Profiling a Kafka Streams job revealed some hotspots in the Fetcher during 
> metrics flushing. Previous discussion here:
> https://issues.apache.org/jira/browse/KAFKA-3769



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3785) Fetcher spending unnecessary time during metrics recording

2016-06-02 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3785:
--
Component/s: consumer

> Fetcher spending unnecessary time during metrics recording
> --
>
> Key: KAFKA-3785
> URL: https://issues.apache.org/jira/browse/KAFKA-3785
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Greg Fodor
>
> Profiling a Kafka Streams job revealed some hotspots in the Fetcher during 
> metrics flushing. Previous discussion here:
> https://issues.apache.org/jira/browse/KAFKA-3769



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3785) Fetcher spending unnecessary time during metrics recording

2016-06-02 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-3785:
-

 Summary: Fetcher spending unnecessary time during metrics recording
 Key: KAFKA-3785
 URL: https://issues.apache.org/jira/browse/KAFKA-3785
 Project: Kafka
  Issue Type: Improvement
Reporter: Greg Fodor


Profiling a Kafka Streams job revealed some hotspots in the Fetcher during 
metrics flushing. Previous discussion here:

https://issues.apache.org/jira/browse/KAFKA-3769



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3770) KStream job should be able to specify linger.ms

2016-06-02 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313434#comment-15313434
 ] 

Greg Fodor commented on KAFKA-3770:
---

Cut a new PR since I screwed up the rebase on the previous one. This one adds a 
test, etc.

https://github.com/apache/kafka/pull/1463

> KStream job should be able to specify linger.ms
> ---
>
> Key: KAFKA-3770
> URL: https://issues.apache.org/jira/browse/KAFKA-3770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> The default linger.ms of 100ms hardcoded into the StreamsConfig class is 
> problematic for jobs that have lots of tasks, since this latency can accrue. 
> It would be useful to be able to override linger.ms in the StreamsConfig. 
> Attached is a PR which allows this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2016-06-02 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312913#comment-15312913
 ] 

Greg Fodor commented on KAFKA-3758:
---

Also, if we did not run with an elevated number of threads, we hit that issue 
because the timeout fired before all tasks had initialized.

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> 

[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2016-06-02 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312912#comment-15312912
 ] 

Greg Fodor commented on KAFKA-3758:
---

Hey, we're running 16 threads. For this job we have 25 topics and approximately 
350 topic-partitions involved, but for most of the job there isn't much I/O 
against most of them. Basically we take in a very small percentage of the 
incoming data at the top of the job, process it, and discard most of it early.
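
For context, the topology shape described here (drop most records at the top so 
the rest of the job only sees a small fraction of the traffic) looks roughly 
like the following sketch against the 0.10.x KStreamBuilder API; the topic 
names and the filter predicate are made up for illustration:

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class EarlyFilterExample {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> input =
                builder.stream(Serdes.String(), Serdes.String(), "input-topic");

        // Drop the bulk of the traffic immediately so downstream processing
        // (and its per-record metrics) only sees the records the job cares about.
        KStream<String, String> relevant =
                input.filter((key, value) -> value != null && value.startsWith("interesting:"));

        relevant.to(Serdes.String(), Serdes.String(), "filtered-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "early-filter-example"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
        new KafkaStreams(builder, props).start();
    }
}
{code}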

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> 

[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics

2016-06-01 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311179#comment-15311179
 ] 

Greg Fodor commented on KAFKA-3769:
---

Thanks Jay!

Guozhang, what are your thoughts on having a way to simply disable the 
process/latency metrics collection, instead of trying to reduce the granularity 
of the metrics? I'm still pretty new to KStreams and haven't used these 
metrics, but I'm guessing they are used for occasionally tuning a job against 
production data rather than for operational monitoring. (I could be wrong about 
this.) If so, it seems you may want a switch you can flip when running in 
production that disables the metrics and maximizes the throughput of the job, 
and then turn them on selectively when you want to perform performance 
measurement.

> KStream job spending 60% of time writing metrics
> 
>
> Key: KAFKA-3769
> URL: https://issues.apache.org/jira/browse/KAFKA-3769
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Critical
>
> I've been profiling a complex streams job, and found two major hotspots when 
> writing metrics, which take up about 60% of the CPU time of the job. (!) A PR 
> is attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2016-05-31 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308306#comment-15308306
 ] 

Greg Fodor commented on KAFKA-3758:
---

No, the KStream job was running across 2 servers, and the Kafka cluster was a 
3-node cluster running on separate machines.

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>  1731 at 
> 

[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics

2016-05-30 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306381#comment-15306381
 ] 

Greg Fodor commented on KAFKA-3769:
---

Consider the PR a first pass; please advise on how we may want to deal with the 
fact that for KStream jobs with lots of tasks the overhead of writing the 
various process/poll/latency metrics is immense.

> KStream job spending 60% of time writing metrics
> 
>
> Key: KAFKA-3769
> URL: https://issues.apache.org/jira/browse/KAFKA-3769
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Critical
>
> I've been profiling a complex streams job, and found two major hotspots when 
> writing metrics, which take up about 60% of the CPU time of the job. (!) A PR 
> is attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3770) KStream job should be able to specify linger.ms

2016-05-30 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306371#comment-15306371
 ] 

Greg Fodor commented on KAFKA-3770:
---

https://github.com/apache/kafka/pull/1448

> KStream job should be able to specify linger.ms
> ---
>
> Key: KAFKA-3770
> URL: https://issues.apache.org/jira/browse/KAFKA-3770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> The default linger.ms of 100ms hardcoded into the StreamsConfig class is 
> problematic for jobs that have lots of tasks, since this latency can accrue. 
> It would be useful to be able to override linger.ms in the StreamsConfig. 
> Attached is a PR which allows this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3770) KStream job should be able to specify linger.ms

2016-05-30 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-3770:
-

 Summary: KStream job should be able to specify linger.ms
 Key: KAFKA-3770
 URL: https://issues.apache.org/jira/browse/KAFKA-3770
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Greg Fodor
Assignee: Guozhang Wang


The default linger.ms of 100ms hardcoded into the StreamsConfig class is 
problematic for jobs that have lots of tasks, since this latency can accrue. It 
would be useful to be able to override linger.ms in the StreamsConfig. 
Attached is a PR which allows this.
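
A sketch of what the override looks like once a change along these lines is in 
place, assuming (as in later releases) that producer configs set on the Streams 
properties are passed through to the internal producer. The application id and 
bootstrap servers are placeholders:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class LingerOverrideExample {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Shrink the 100ms default so the linger latency does not accrue across many tasks.
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
        return props;
    }
}
{code}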



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics

2016-05-30 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306321#comment-15306321
 ] 

Greg Fodor commented on KAFKA-3769:
---

It might be desirable to have a way to simply turn off some or all of the 
metrics.

> KStream job spending 60% of time writing metrics
> 
>
> Key: KAFKA-3769
> URL: https://issues.apache.org/jira/browse/KAFKA-3769
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Critical
>
> I've been profiling a complex streams job, and found two major hotspots when 
> writing metrics, which take up about 60% of the CPU time of the job. (!) A PR 
> is attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics

2016-05-30 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306318#comment-15306318
 ] 

Greg Fodor commented on KAFKA-3769:
---

Additionally, it looks like the code path for fetching from RocksDB spends most 
of its time recording the latency metrics :(

> KStream job spending 60% of time writing metrics
> 
>
> Key: KAFKA-3769
> URL: https://issues.apache.org/jira/browse/KAFKA-3769
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Critical
>
> I've been profiling a complex streams job, and found two major hotspots when 
> writing metrics, which take up about 60% of the CPU time of the job. (!) A PR 
> is attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics

2016-05-30 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306306#comment-15306306
 ] 

Greg Fodor commented on KAFKA-3769:
---

https://github.com/apache/kafka/pull/1447

> KStream job spending 60% of time writing metrics
> 
>
> Key: KAFKA-3769
> URL: https://issues.apache.org/jira/browse/KAFKA-3769
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Critical
>
> I've been profiling a complex streams job, and found two major hotspots when 
> writing metrics, which take up about 60% of the CPU time of the job. (!) A PR 
> is attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3769) KStream job spending 60% of time writing metrics

2016-05-30 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-3769:
-

 Summary: KStream job spending 60% of time writing metrics
 Key: KAFKA-3769
 URL: https://issues.apache.org/jira/browse/KAFKA-3769
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.0.0
Reporter: Greg Fodor
Assignee: Guozhang Wang
Priority: Critical


I've been profiling a complex streams job, and found two major hotspots when 
writing metrics, which take up about 60% of the CPU time of the job. (!) A PR 
is attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2016-05-28 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor reopened KAFKA-3758:
---

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>  1731 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>  1732 at 
> 

[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2016-05-28 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305503#comment-15305503
 ] 

Greg Fodor commented on KAFKA-3758:
---

Oh, actually, I'm not so sure. This was not during an unclean shutdown, but 
during a broker rebalance.

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>  1731 at 
> 

[jira] [Resolved] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2016-05-28 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor resolved KAFKA-3758.
---
Resolution: Duplicate

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>  1731 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>  1732 at 
> 

[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2016-05-28 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305501#comment-15305501
 ] 

Greg Fodor commented on KAFKA-3758:
---

Ah yes this looks like the same issue, thanks!

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>  1731 at 
> 

[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2016-05-26 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302542#comment-15302542
 ] 

Greg Fodor commented on KAFKA-3758:
---

Also, the log is truncated at the top to the point where we shut the broker 
off. If you think there's additional useful information in the log before that 
point that we could share, happy to attach it.

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730 at 
> 

[jira] [Updated] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2016-05-26 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3758:
--
Attachment: muon.log.1.gz

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>  1731 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>  1732 at 

[jira] [Created] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2016-05-26 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-3758:
-

 Summary: KStream job fails to recover after Kafka broker stopped
 Key: KAFKA-3758
 URL: https://issues.apache.org/jira/browse/KAFKA-3758
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.0.0
Reporter: Greg Fodor
Assignee: Guozhang Wang


We've been doing some testing of a fairly complex KStreams job and under load 
it seems the job fails to rebalance + recover if we shut down one of the kafka 
brokers. The test we were running had a 3-node kafka cluster where each topic 
had at least a replication factor of 2, and we terminated one of the nodes.

Attached is the full log, the root exception seems to be contention on the lock 
on the state directory. The job continues to try to recover but throws errors 
relating to locks over and over. Restarting the job itself resolves the problem.

 1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
creating the state manager
 1703 at 
org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71)
 1704 at 
org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86)
 1705 at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
 1706 at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
 1707 at 
org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
 1708 at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
 1709 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
 1710 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
 1711 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
 1712 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
 1713 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
 1714 at 
org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
 1715 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
 1716 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
 1717 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
 1718 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
 1719 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
 1720 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
 1721 at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
 1722 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
 1723 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
 1724 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
 1725 at 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
 1726 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
 1727 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
 1728 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
 1729 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
 1730 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
 1731 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
 1732 at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
 1733 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
 1734 at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
 1735 at 

[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-24 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297745#comment-15297745
 ] 

Greg Fodor commented on KAFKA-3745:
---

Actually this seems like a good enough solution; I forgot to consider that the 
stream still has the join key as its key, so I can grab it in a subsequent map. 
All things considered this seems like a small price to pay for this rare case 
vs. introducing the parameter into the joiner interface, so I'm fine with 
closing this ticket.

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-23 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297733#comment-15297733
 ] 

Greg Fodor commented on KAFKA-3745:
---

Yes, the join key needs to be added to the final joined record. As you mention, 
a nicer approach may be to just emit the joined record with a null in place of 
where the key will go, and then do a simple .map after the join to fill in the 
key; this seems better than what I am doing now.
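
As a minimal sketch of the approach described above, assuming a KStream-KTable 
left join on the 0.10.0-era API; JoinedValue and its withKey() helper are 
hypothetical stand-ins for whatever the joined value class is:

{code}
// The ValueJoiner leaves the key slot null, and a map immediately after the
// join copies the stream's key into the joined value.
KStream<String, JoinedValue> joined = leftStream
    .leftJoin(rightTable, (left, right) -> new JoinedValue(null, left, right))
    .map((key, value) -> KeyValue.pair(key, value.withKey(key)));
{code}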

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-23 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297477#comment-15297477
 ] 

Greg Fodor commented on KAFKA-3745:
---

Yep, I admit this is definitely not the most common case. But when it happens, 
the key is basically lost, so the workaround results in passing additional 
state through the system which seems undesirable.

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-23 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297250#comment-15297250
 ] 

Greg Fodor commented on KAFKA-3745:
---

Sure. We are left joining a KTable against a KStream. The entry in the KTable 
may be null since it's an outer join, so it can't be relied upon to provide the 
key, and the join key for the KStream is derived from one of the fields (in 
particular, it's a parsed substring of one of the columns). The workaround 
right now is that we do that parsing in a previous step and then emit a record 
with the parsed key threaded into the value.

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2016-05-22 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-3745:
-

 Summary: Consider adding join key to ValueJoiner interface
 Key: KAFKA-3745
 URL: https://issues.apache.org/jira/browse/KAFKA-3745
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.0.0
Reporter: Greg Fodor
Assignee: Guozhang Wang
Priority: Minor


In working with Kafka Stream joining, it's sometimes the case that a join key 
is not actually present in the values of the joins themselves (if, for example, 
a previous transform generated an ephemeral join key.) In such cases, the 
actual key of the join is not available in the ValueJoiner implementation to be 
used to construct the final joined value. This can be worked around by 
explicitly threading the join key into the value if needed, but it seems like 
extending the interface to pass the join key along as well would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3619) State lock file handle leaks

2016-04-25 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3619:
--
Description: 
The .lock files in the state store directories do not seem to be having their 
file handles freed (despite the locks being freed), so on a complex job the 
number of file handles in use goes up rapidly as the locks are taken for the 
cleanup routine at the end of the thread run loop.  Running lsof shows the 
number of open filehandles on the .lock file increasing rapidly over time. In a 
separate test project, I reproduced the issue and determined that in order for 
the filehandle to be relinquished the FileChannel instance must be properly 
closed. 

PR:

https://github.com/apache/kafka/pull/1267

  was:
The .lock files in the state store directories do not seem to be having their 
file handles freed (despite the locks being freed), so on a complex job the
number of file handles in use goes up rapidly as the locks are taken
for the cleanup routine at the end of the thread run loop.  Running lsof shows 
the number of open filehandles on the .lock file increasing rapidly over time. 
In a separate test project, I reproduced the issue and determined that in order 
for the filehandle to be relinquished the FileChannel instance must be properly 
closed. 

PR:

https://github.com/apache/kafka/pull/1267


> State lock file handle leaks
> 
>
> Key: KAFKA-3619
> URL: https://issues.apache.org/jira/browse/KAFKA-3619
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Critical
>
> The .lock files in the state store directories do not seem to be having their 
> file handles freed (despite the locks being freed), so on a complex job the 
> number of file handles in use goes up rapidly as the locks are taken for the 
> cleanup routine at the end of the thread run loop.  Running lsof shows the 
> number of open filehandles on the .lock file increasing rapidly over time. In 
> a separate test project, I reproduced the issue and determined that in order 
> for the filehandle to be relinquished the FileChannel instance must be 
> properly closed. 
> PR:
> https://github.com/apache/kafka/pull/1267



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3619) State lock file handle leaks

2016-04-25 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-3619:
-

 Summary: State lock file handle leaks
 Key: KAFKA-3619
 URL: https://issues.apache.org/jira/browse/KAFKA-3619
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.0.0
Reporter: Greg Fodor
Assignee: Guozhang Wang
Priority: Critical


The .lock files in the state store directories do not seem to be having their 
file handles freed (despite the locks being freed), so on a complex job the
number of file handles in use goes up rapidly as the locks are taken
for the cleanup routine at the end of the thread run loop.  Running lsof shows 
the number of open filehandles on the .lock file increasing rapidly over time. 
In a separate test project, I reproduced the issue and determined that in order 
for the filehandle to be relinquished the FileChannel instance must be properly 
closed. 

PR:

https://github.com/apache/kafka/pull/1267
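
For illustration only (a rough sketch of the pattern described above, not the 
actual Streams cleanup code; stateDir is a placeholder):

{code}
// Releasing the FileLock alone does not free the descriptor; the FileChannel
// itself must be closed, otherwise lsof keeps reporting the open .lock handle.
FileChannel channel = new RandomAccessFile(new File(stateDir, ".lock"), "rw").getChannel();
FileLock lock = channel.tryLock();
try {
    if (lock != null) {
        // ... cleanup work guarded by the lock ...
    }
} finally {
    if (lock != null) {
        lock.release();  // frees the lock, but not the underlying file handle
    }
    channel.close();     // closing the channel is what actually releases the handle
}
{code}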



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3544) Missing topics on startup

2016-04-14 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor resolved KAFKA-3544.
---
Resolution: Not A Problem

> Missing topics on startup
> -
>
> Key: KAFKA-3544
> URL: https://issues.apache.org/jira/browse/KAFKA-3544
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>  Labels: semantics
>
> When running a relatively complex job with multiple tasks and state stores, 
> on the first run I get errors due to some of the intermediate topics not 
> existing. Subsequent runs work OK. My assumption is streams may be creating 
> topics lazily, so if downstream tasks are initializing before their parents 
> have had a chance to create their necessary topics then the children will 
> attempt to start consuming from topics that do not exist yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3544) Missing topics on startup

2016-04-14 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15240723#comment-15240723
 ] 

Greg Fodor commented on KAFKA-3544:
---

Ah, this makes sense. Fortunately, thanks to the other responses you've given, 
I've been able to refactor my job to use through() in this case, so it will end 
up not having this problem once the relevant updates are made to through(). 
I'll close the ticket.
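
Roughly what such a refactor looks like on the 0.10.0-era API; the serde and 
topic names follow the snippets elsewhere in this issue, and the 
RoomOperationMessage value type is an assumed name:

{code}
// A single through() call replaces the earlier .to(...) on the intermediate
// topic plus a second builder.stream(...) sourced from that same topic.
KStream<String, RoomOperationMessage> roomOperationMessagesByUserId = builder
    .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming")
    .map((k, v) -> KeyValue.pair(v.getUserId(), v))
    .through(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");
{code}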

> Missing topics on startup
> -
>
> Key: KAFKA-3544
> URL: https://issues.apache.org/jira/browse/KAFKA-3544
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>  Labels: semantics
>
> When running a relatively complex job with multiple tasks and state stores, 
> on the first run I get errors due to some of the intermediate topics not 
> existing. Subsequent runs work OK. My assumption is streams may be creating 
> topics lazily, so if downstream tasks are initializing before their parents 
> have had a chance to create their necessary topics then the children will 
> attempt to start consuming from topics that do not exist yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3544) Missing topics on startup

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236512#comment-15236512
 ] 

Greg Fodor edited comment on KAFKA-3544 at 4/12/16 3:05 AM:


Not sure of the best way to share the topology. Here's the relevant part of the 
code:
{code}
builder
    .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming")
    .map((k, v) -> KeyValue.pair(v.getUserId(), v))
    .to(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");

KStream roomOperationMessagesByUserId = builder
    .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");

KStream userBroadcastsMessagesByUserId =
    roomOperationMessagesByUserId.leftJoin(userSpaceBroadcastsByUserId, UserBroadcastsMessage::new);

{code}

In this example roomOperationSerde is a Serde for a custom avro type. I'm 
basically pivoting the first stream onto a foreign key and then creating 
another KStream off of that output for a join downstream.

The topology is failing to build on the room_operation_message_incoming-user_id 
topic:

{code}
Exception in thread "StreamThread-1" 
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
building: External source topic not found: 
room_operation_message_incoming-user_id
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
2016-04-12 02:57:36 StreamThread [INFO] Stream thread 

[jira] [Comment Edited] (KAFKA-3544) Missing topics on startup

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236512#comment-15236512
 ] 

Greg Fodor edited comment on KAFKA-3544 at 4/12/16 3:04 AM:


Not sure of the best way to share the topology. Here's the relevant part of the 
code:
{code}
builder
    .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming")
    .map((k, v) -> KeyValue.pair(v.getUserId(), v))
    .to(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");

KStream roomOperationMessagesByUserId = builder
    .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");

KStream userBroadcastsMessagesByUserId =
    roomOperationMessagesByUserId.leftJoin(userSpaceBroadcastsByUserId, UserBroadcastsMessage::new);

{code}

In this example roomOperationSerde is a Serde for a custom avro type. I'm 
basically pivoting the first stream onto a foreign key and then creating 
another KStream off of that output for a join downstream.

The topology is failing to build on the user_space_broadcasts-user_id topic:

{code}
Exception in thread "StreamThread-1" 
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
building: External source topic not found: 
room_operation_message_incoming-user_id
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
2016-04-12 02:57:36 StreamThread [INFO] Stream thread shutdown 

[jira] [Comment Edited] (KAFKA-3544) Missing topics on startup

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236512#comment-15236512
 ] 

Greg Fodor edited comment on KAFKA-3544 at 4/12/16 3:04 AM:


Not sure of the best way to share the topology. Here's the relevant part of the 
code:
{code}
builder
    .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming")
    .map((k, v) -> KeyValue.pair(v.getUserId(), v))
    .to(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");

KStream roomOperationMessagesByUserId = builder
    .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");

KStream userBroadcastsMessagesByUserId =
    roomOperationMessagesByUserId.leftJoin(userSpaceBroadcastsByUserId, UserBroadcastsMessage::new);

{code}

In this example roomOperationSerde is a Serde for a custom avro type. I'm 
basically pivoting the first stream onto a foreign key and then creating 
another KStream off of that output for a join downstream.

The topology is failing to build on the user_space_broadcasts-user_id topic:

{code}
Exception in thread "StreamThread-1" 
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
building: External source topic not found: 
room_operation_message_incoming-user_id
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
2016-04-12 02:57:36 StreamThread [INFO] Stream thread 

[jira] [Commented] (KAFKA-3544) Missing topics on startup

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236512#comment-15236512
 ] 

Greg Fodor commented on KAFKA-3544:
---

Not sure of the best way to share the topology. Here's the relevant part of the 
code:
{code}
builder
    .stream(Serdes.Long(), userSpaceBroadcastSerde, "positron-db-user_space_broadcasts")
    .map((id, broadcast) -> KeyValue.pair(broadcast.getUserId().toString(), broadcast))
    .to(Serdes.String(), userSpaceBroadcastSerde, "user_space_broadcasts-user_id");

KTable userSpaceBroadcastsByUserId = builder
    .stream(Serdes.String(), userSpaceBroadcastSerde, "user_space_broadcasts-user_id")
    .aggregateByKey(...);
{code}

In this example userSpaceBroadcastSerde is a Serde for a custom avro type. I'm 
basically pivoting the first stream onto a foreign key and then creating a 
KTable off of that output by tapping it and then aggregating. (Given our 
discussions on other tickets there may be a way to simplify this, but I wanted 
to capture it as-is for this report.)

The topology is failing to build on the user_space_broadcasts-user_id topic:

{code}
Exception in thread "StreamThread-1" 
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
building: External source topic not found: 
room_operation_message_incoming-user_id
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
2016-04-12 02:57:36 StreamThread [INFO] Stream thread shutdown complete 
[StreamThread-2]
Exception in thread 

[jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236427#comment-15236427
 ] 

Greg Fodor commented on KAFKA-3543:
---

Just something akin to a flatTransform, where the output of the transformer is 
assumed to have some kind of enumerable as its value. It would probably be 
pretty convoluted, though, since it would impact a lot of method signatures.

I guess at a higher level, I wonder if there's any way to make your points 
about stateful vs. stateless operators requiring Serdes more intuitive. It 
might come down to just docs.

> Allow a variant of transform() which can emit multiple values
> -
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Right now it seems that if you want to apply an arbitrary stateful 
> transformation to a stream, you either have to use a TransformerSupplier or 
> ProcessorSupplier sent to transform() or process(). The custom processor will 
> allow you to emit multiple new values, but the process() method currently 
> terminates that branch of the topology so you can't apply additional data 
> flow. transform() lets you continue the data flow, but forces you to emit a 
> single value for every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the 
> ProcessorContext and emit multiple, but that's probably not the ideal way to 
> do it :))
> It seems desirable to somehow allow a transformation that emits multiple 
> values per input value. I'm not sure of the best way to factor this inside of 
> the current TransformerSupplier/Transformer architecture in a way that is 
> clean and efficient -- currently I'm doing the workaround above of just 
> calling forward() myself on the context and actually emitting dummy values 
> which are filtered out downstream.
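
The forward()-based workaround mentioned above looks roughly like the following 
sketch; Event, expand(), DUMMY, and ExpandingTransformer are made-up names, not 
the actual job code:

{code}
// Inside the Transformer: push each derived record through the ProcessorContext
// saved in init(), and return a placeholder that a downstream filter drops.
@Override
public KeyValue<String, Event> transform(String key, Event value) {
    for (Event derived : expand(value)) {
        context.forward(key, derived);       // the real output records
    }
    return KeyValue.pair(key, Event.DUMMY);  // dummy value, filtered out below
}

// In the topology:
stream.transform(ExpandingTransformer::new)
      .filter((k, v) -> v != Event.DUMMY);
{code}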



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3542) Add "repartition (+ join)" operations to streams

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236417#comment-15236417
 ] 

Greg Fodor commented on KAFKA-3542:
---

Great, feel free to close.

> Add "repartition (+ join)" operations to streams
> 
>
> Key: KAFKA-3542
> URL: https://issues.apache.org/jira/browse/KAFKA-3542
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Minor
>
> A common operation in Kafka Streams seems to be to repartition the stream 
> onto a different column, usually for joining. The current way I've been doing 
> this:
> - Perform a map on the stream to the same value with a new key (the key we're 
> going to join on, usually a foreign key)
> - Sink the stream into a new topic
> - Create a new stream sourcing that topic
> - Perform the join
> Note that without explicitly sinking the intermediate topic, the topology 
> will fail to build because of the assertion that both sides of a join are 
> connected to source nodes. When you perform a map, the link between the 
> source nodes and the tail node of the topology is broken (by setting the 
> source nodes to null) so you are forced to sink to use that output in a join.
> It seems that this pattern could possibly be rolled into much simpler 
> operation(s). For example, the map could be changed into a "repartition" 
> method where you just return the new key. And the join itself could be 
> simplified by letting you specify a re-partition function on either side of 
> the join and create the intermediate topic implicitly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236375#comment-15236375
 ] 

Greg Fodor commented on KAFKA-3543:
---

That makes sense, thanks! It may be useful to provide a helper method here of 
some kind, but the idiom above seems reasonable. Happy to consider this issue 
closed either way.

> Allow a variant of transform() which can emit multiple values
> -
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Right now it seems that if you want to apply an arbitrary stateful 
> transformation to a stream, you either have to use a TransformerSupplier or 
> ProcessorSupplier sent to transform() or process(). The custom processor will 
> allow you to emit multiple new values, but the process() method currently 
> terminates that branch of the topology so you can't apply additional data 
> flow. transform() lets you continue the data flow, but forces you to emit a 
> single value for every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the 
> ProcessorContext and emit multiple, but that's probably not the ideal way to 
> do it :))
> It seems desirable to somehow allow a transformation that emits multiple 
> values per input value. I'm not sure of the best way to factor this inside of 
> the current TransformerSupplier/Transformer architecture in a way that is 
> clean and efficient -- currently I'm doing the workaround above of just 
> calling forward() myself on the context and actually emitting dummy values 
> which are filtered out downstream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3542) Add "repartition (+ join)" operations to streams

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236374#comment-15236374
 ] 

Greg Fodor commented on KAFKA-3542:
---

Right, this map approach is what I am doing right now before all of my joins, 
though I didn't realize I could use through() to generate a joinable stream 
without sourcing it explicitly from the new topic. I will see if some of my 
joins can be satisfied with the aggregator-first approach. The thing that 
bothers me about the current map -> sink approach is that the map is not really 
DRY (I should just need to specify the selector to re-partition on) and the 
intermediate topic name should just be generated. I agree an implicit through() 
call could be useful in place of the assertion currently being made to 
determine whether two streams are joinable.
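
As a sketch of that pattern (the type names here are assumptions), the 
map-then-through repartition can feed a join directly, without declaring a 
separate source stream on the intermediate topic:

{code}
// Repartition onto the foreign key, then join, in one chain; through() writes
// the intermediate topic and returns a stream that is valid as a join input.
KStream<String, Record> byForeignKey = records
    .map((key, value) -> KeyValue.pair(value.getForeignKey(), value))
    .through(Serdes.String(), recordSerde, "records-by-foreign-key");

KStream<String, JoinedRecord> joined =
    byForeignKey.leftJoin(otherTable, JoinedRecord::new);
{code}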

> Add "repartition (+ join)" operations to streams
> 
>
> Key: KAFKA-3542
> URL: https://issues.apache.org/jira/browse/KAFKA-3542
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Minor
>
> A common operation in Kafka Streams seems to be to repartition the stream 
> onto a different column, usually for joining. The current way I've been doing 
> this:
> - Perform a map on the stream to the same value with a new key (the key we're 
> going to join on, usually a foreign key)
> - Sink the stream into a new topic
> - Create a new stream sourcing that topic
> - Perform the join
> Note that without explicitly sinking the intermediate topic, the topology 
> will fail to build because of the assertion that both sides of a join are 
> connected to source nodes. When you perform a map, the link between the 
> source nodes and the tail node of the topology is broken (by setting the 
> source nodes to null) so you are forced to sink to use that output in a join.
> It seems that this pattern could possibly be rolled into much simpler 
> operation(s). For example, the map could be changed into a "repartition" 
> method where you just return the new key. And the join itself could be 
> simplified by letting you specify a re-partition function on either side of 
> the join and create the intermediate topic implicitly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3542) Add "repartition (+ join)" operations to streams

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236343#comment-15236343
 ] 

Greg Fodor commented on KAFKA-3542:
---

Ah, I may understand what you're getting at here -- to do the operation I have 
in mind, you would first perform an aggregation to pivot the streams onto the 
proper keys (via the selector), and then join those streams. Is that correct?

> Add "repartition (+ join)" operations to streams
> 
>
> Key: KAFKA-3542
> URL: https://issues.apache.org/jira/browse/KAFKA-3542
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Minor
>
> A common operation in Kafka Streams seems to be to repartition the stream 
> onto a different column, usually for joining. The current way I've been doing 
> this:
> - Perform a map on the stream to the same value with a new key (the key we're 
> going to join on, usually a foreign key)
> - Sink the stream into a new topic
> - Create a new stream sourcing that topic
> - Perform the join
> Note that without explicitly sinking the intermediate topic, the topology 
> will fail to build because of the assertion that both sides of a join are 
> connected to source nodes. When you perform a map, the link between the 
> source nodes and the tail node of the topology is broken (by setting the 
> source nodes to null) so you are forced to sink to use that output in a join.
> It seems that this pattern could possibly be rolled into much simpler 
> operation(s). For example, the map could be changed into a "repartition" 
> method where you just return the new key. And the join itself could be 
> simplified by letting you specify a re-partition function on either side of 
> the join and create the intermediate topic implicitly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236339#comment-15236339
 ] 

Greg Fodor commented on KAFKA-3543:
---

Ah interesting, my assumption was that the items emitted from the transform 
need to be serializable. Is the pattern you describe above basically avoiding 
this because the flatMap unrolls the Iterables into serializable values?
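
For context, a sketch of the pattern being discussed, assuming "input" is a 
KStream<String, String> and using the 0.10-era Transformer interface; the 
splitting logic is purely illustrative:

{code}
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// transform() emits one List per input record; the List is only handed to the
// next operator in memory, so it never needs a serde unless something between
// transform() and flatMapValues() writes to a topic.
KStream<String, String> exploded = input
    .transform(() -> new Transformer<String, String, KeyValue<String, List<String>>>() {
        @Override
        public void init(ProcessorContext context) {}

        @Override
        public KeyValue<String, List<String>> transform(String key, String value) {
            // Hypothetical expansion of one value into many.
            return new KeyValue<>(key, Arrays.asList(value.split(",")));
        }

        @Override
        public KeyValue<String, List<String>> punctuate(long timestamp) {
            return null; // nothing to emit on punctuation
        }

        @Override
        public void close() {}
    })
    .flatMapValues(parts -> parts); // unroll each List into individual records
{code}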

> Allow a variant of transform() which can emit multiple values
> -
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Right now it seems that if you want to apply an arbitrary stateful 
> transformation to a stream, you either have to use a TransformerSupplier or 
> ProcessorSupplier sent to transform() or process(). The custom processor will 
> allow you to emit multiple new values, but the process() method currently 
> terminates that branch of the topology so you can't apply additional data 
> flow. transform() lets you continue the data flow, but forces you to emit a 
> single value for every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the 
> ProcessorContext and emit multiple, but that's probably not the ideal way to 
> do it :))
> It seems desirable to somehow allow a transformation that emits multiple 
> values per input value. I'm not sure of the best way to factor this inside of 
> the current TransformerSupplier/Transformer architecture in a way that is 
> clean and efficient -- currently I'm doing the workaround above of just 
> calling forward() myself on the context and actually emitting dummy values 
> which are filtered out downstream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3542) Add "repartition (+ join)" operations to streams

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236331#comment-15236331
 ] 

Greg Fodor commented on KAFKA-3542:
---

I might just not be understanding, but here is a simple example: if I have a 
database table being streamed in from Kafka Connect and I'd like to join it 
with another table on a foreign key, my understanding is that I need to ensure 
both streams are partitioned on the same key before they are fed into the join. 
The current join implementation doesn't seem to let me specify something to 
extract the proper key to join on; I believe it assumes both inputs are already 
pre-partitioned on the proper keys.

> Add "repartition (+ join)" operations to streams
> 
>
> Key: KAFKA-3542
> URL: https://issues.apache.org/jira/browse/KAFKA-3542
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Minor
>
> A common operation in Kafka Streams seems to be to repartition the stream 
> onto a different column, usually for joining. The current way I've been doing 
> this:
> - Perform a map on the stream to the same value with a new key (the key we're 
> going to join on, usually a foreign key)
> - Sink the stream into a new topic
> - Create a new stream sourcing that topic
> - Perform the join
> Note that without explicitly sinking the intermediate topic, the topology 
> will fail to build because of the assertion that both sides of a join are 
> connected to source nodes. When you perform a map, the link between the 
> source nodes and the tail node of the topology is broken (by setting the 
> source nodes to null) so you are forced to sink to use that output in a join.
> It seems that this pattern could possibly be rolled into much simpler 
> operation(s). For example, the map could be changed into a "repartition" 
> method where you just return the new key. And the join itself could be 
> simplified by letting you specify a re-partition function on either side of 
> the join and create the intermediate topic implicitly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3545) Generalized Serdes for List/Map

2016-04-11 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-3545:
-

 Summary: Generalized Serdes for List/Map
 Key: KAFKA-3545
 URL: https://issues.apache.org/jira/browse/KAFKA-3545
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Greg Fodor
Assignee: Guozhang Wang
Priority: Minor


In working with Kafka Streams I've found it's often the case that I want to 
perform a "group by" operation: I repartition a stream based on a foreign key 
and then aggregate all the values for each key into a single collection, so the 
stream becomes one where each entry's value is a serialized list of the values 
that belonged to that key. (This seems unrelated to the 'group by' operation 
talked about in KAFKA-3544.) This is basically the typical group-by operation 
found in systems like Cascading.

In order to create these intermediate list values I needed to define custom 
Avro schemas that simply wrap the elements of interest in a list. It seems 
desirable for there to be some basic facility for constructing simple Serdes 
for Lists/Maps/Sets of other types, potentially using Avro's serialization 
under the hood. If this existed in the core library it would also enable 
higher-level operations on streams that use these Serdes to perform simple 
operations like the "group by" example above.
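
As a rough sketch of the kind of facility being asked for here (this is not an 
existing Kafka API, just an illustration), a generic List serde can be built by 
length-prefixing each element and delegating to an inner serde:

{code}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical helper: builds a Serde<List<T>> from a Serde<T> by writing an
// element count followed by a length-prefixed encoding of each element.
// Error handling and null checks are omitted for brevity.
public class ListSerde {

    public static <T> Serde<List<T>> of(final Serde<T> inner) {
        Serializer<List<T>> serializer = new Serializer<List<T>>() {
            @Override public void configure(Map<String, ?> configs, boolean isKey) {}
            @Override public void close() {}

            @Override
            public byte[] serialize(String topic, List<T> data) {
                try {
                    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                    DataOutputStream out = new DataOutputStream(bytes);
                    out.writeInt(data.size());
                    for (T element : data) {
                        byte[] encoded = inner.serializer().serialize(topic, element);
                        out.writeInt(encoded.length);
                        out.write(encoded);
                    }
                    return bytes.toByteArray();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };

        Deserializer<List<T>> deserializer = new Deserializer<List<T>>() {
            @Override public void configure(Map<String, ?> configs, boolean isKey) {}
            @Override public void close() {}

            @Override
            public List<T> deserialize(String topic, byte[] data) {
                ByteBuffer buffer = ByteBuffer.wrap(data);
                int size = buffer.getInt();
                List<T> result = new ArrayList<>(size);
                for (int i = 0; i < size; i++) {
                    byte[] encoded = new byte[buffer.getInt()];
                    buffer.get(encoded);
                    result.add(inner.deserializer().deserialize(topic, encoded));
                }
                return result;
            }
        };

        return Serdes.serdeFrom(serializer, deserializer);
    }
}
{code}

Usage would then look like Serde<List<Long>> longs = ListSerde.of(Serdes.Long()), 
which is the sort of one-liner the description above is asking the library to 
provide.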



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3544) Missing topics on startup

2016-04-11 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-3544:
-

 Summary: Missing topics on startup
 Key: KAFKA-3544
 URL: https://issues.apache.org/jira/browse/KAFKA-3544
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.0.0
Reporter: Greg Fodor
Assignee: Guozhang Wang


When running a relatively complex job with multiple tasks and state stores, the 
first run fails with errors because some of the intermediate topics do not 
exist. Subsequent runs work fine. My assumption is that Streams creates topics 
lazily, so if downstream tasks initialize before their parents have had a 
chance to create the topics they need, the children attempt to start consuming 
from topics that do not exist yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3543) Allow a variant of transform() which can emit multiple values

2016-04-11 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236121#comment-15236121
 ] 

Greg Fodor edited comment on KAFKA-3543 at 4/11/16 10:40 PM:
-

Also note that in my case flatMap() is not sufficient for my needs, since I 
need to be able to use state stores.


was (Author: gfodor):
Also note that in my case the iflatMap() is not sufficient for my needs, since 
I need to able to use state stores.

> Allow a variant of transform() which can emit multiple values
> -
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Right now it seems that if you want to apply an arbitrary transformation to a 
> stream, you either have to use a TransformerSupplier or ProcessorSupplier 
> sent to transform() or process(). The custom processor will allow you to emit 
> multiple new values, but the process() method currently terminates that 
> branch of the topology so you can't apply additional data flow. transform() 
> lets you continue the data flow, but forces you to emit a single value for 
> every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the 
> ProcessorContext and emit multiple, but that's probably not the ideal way to 
> do it :))
> It seems desirable to somehow allow a transformation that emits multiple 
> values per input value. I'm not sure of the best way to factor this inside of 
> the current TransformerSupplier/Transformer architecture in a way that is 
> clean and efficient -- currently I'm doing the workaround above of just 
> calling forward() myself on the context and actually emitting dummy values 
> which are filtered out downstream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3543) Allow a variant of transform() which allows emitting multiple values

2016-04-11 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-3543:
-

 Summary: Allow a variant of transform() which allows emitting 
multiple values
 Key: KAFKA-3543
 URL: https://issues.apache.org/jira/browse/KAFKA-3543
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.0.0
Reporter: Greg Fodor
Assignee: Guozhang Wang


Right now it seems that if you want to apply an arbitrary transformation to a 
stream, you either have to use a TransformerSupplier or ProcessorSupplier sent 
to transform() or process(). The custom processor will allow you to emit 
multiple new values, but the process() method currently terminates that branch 
of the topology so you can't apply additional data flow. transform() lets you 
continue the data flow, but forces you to emit a single value for every input 
value.

(It actually doesn't quite force you to do this, since you can hold onto the 
ProcessorContext and emit multiple, but that's probably not the ideal way to do 
it :))

It seems desirable to somehow allow a transformation that emits multiple values 
per input value. I'm not sure of the best way to factor this inside of the 
current TransformerSupplier/Transformer architecture in a way that is clean and 
efficient -- currently I'm doing the workaround above of just calling forward() 
myself on the context and actually emitting dummy values which are filtered out 
downstream.
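
For reference, a minimal sketch of the workaround described above, against the 
0.10-era API; "input" is assumed to be a KStream<String, String> and the 
splitting logic is illustrative only. The Transformer holds onto the 
ProcessorContext, calls forward() once per output record, and returns a dummy 
value that a downstream filter drops:

{code}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

KStream<String, String> output = input
    .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            // Emit as many records as needed through the held context...
            for (String part : value.split(",")) {
                context.forward(key, part);
            }
            // ...and return a sentinel record that is filtered out below.
            return new KeyValue<>(key, null);
        }

        @Override
        public KeyValue<String, String> punctuate(long timestamp) {
            return null;
        }

        @Override
        public void close() {}
    })
    .filter((key, value) -> value != null); // drop the dummy values
{code}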



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3543) Allow a variant of transform() which can emit multiple values

2016-04-11 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3543:
--
Summary: Allow a variant of transform() which can emit multiple values  
(was: Allow a variant of transform() which allows emitting multiple values)

> Allow a variant of transform() which can emit multiple values
> -
>
> Key: KAFKA-3543
> URL: https://issues.apache.org/jira/browse/KAFKA-3543
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Right now it seems that if you want to apply an arbitrary transformation to a 
> stream, you either have to use a TransformerSupplier or ProcessorSupplier 
> sent to transform() or process(). The custom processor will allow you to emit 
> multiple new values, but the process() method currently terminates that 
> branch of the topology so you can't apply additional data flow. transform() 
> lets you continue the data flow, but forces you to emit a single value for 
> every input value.
> (It actually doesn't quite force you to do this, since you can hold onto the 
> ProcessorContext and emit multiple, but that's probably not the ideal way to 
> do it :))
> It seems desirable to somehow allow a transformation that emits multiple 
> values per input value. I'm not sure of the best way to factor this inside of 
> the current TransformerSupplier/Transformer architecture in a way that is 
> clean and efficient -- currently I'm doing the workaround above of just 
> calling forward() myself on the context and actually emitting dummy values 
> which are filtered out downstream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3542) Add "repartition (+ join)" operations to streams

2016-04-11 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-3542:
-

 Summary: Add "repartition (+ join)" operations to streams
 Key: KAFKA-3542
 URL: https://issues.apache.org/jira/browse/KAFKA-3542
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.0.0
Reporter: Greg Fodor
Assignee: Guozhang Wang
Priority: Minor


A common operation in Kafka Streams seems to be to repartition the stream onto 
a different column, usually for joining. The current way I've been doing this:

- Perform a map on the stream to the same value with a new key (the key we're 
going to join on, usually a foreign key)
- Sink the stream into a new topic
- Create a new stream sourcing that topic
- Perform the join

Note that without explicitly sinking the intermediate topic, the topology will 
fail to build because of the assertion that both sides of a join are connected 
to source nodes. When you perform a map, the link between the source nodes and 
the tail node of the topology is broken (by setting the source nodes to null) 
so you are forced to sink to use that output in a join.

It seems that this pattern could possibly be rolled into much simpler 
operation(s). For example, the map could be changed into a "repartition" method 
where you just return the new key. And the join itself could be simplified by 
letting you specify a re-partition function on either side of the join and 
create the intermediate topic implicitly.
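
For reference, a minimal sketch of the current four-step pattern described 
above, against the 0.10-era DSL; the topics and value types are made up, and a 
stream-table join is used in the last step to keep the example short:

{code}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

// Hypothetical inputs: "page-views" keyed by viewId with the userId as value,
// and "users" keyed by userId with the user name as value.
KStream<Long, Long> views = builder.stream(Serdes.Long(), Serdes.Long(), "page-views");
KTable<Long, String> users = builder.table(Serdes.Long(), Serdes.String(), "users");

// 1. Map to the same value with the foreign key (userId) as the new key.
KStream<Long, Long> rekeyed =
    views.map((viewId, userId) -> new KeyValue<>(userId, viewId));

// 2. Sink the stream into a new topic.
rekeyed.to(Serdes.Long(), Serdes.Long(), "views-by-user");

// 3. Create a new stream sourcing that topic.
KStream<Long, Long> viewsByUser =
    builder.stream(Serdes.Long(), Serdes.Long(), "views-by-user");

// 4. Perform the join.
KStream<Long, String> enriched =
    viewsByUser.leftJoin(users, (viewId, userName) -> viewId + " viewed by " + userName);
{code}

Steps 2 and 3 are exactly the pair that an explicit or implicit through() call 
would collapse, which is what the discussion above is getting at.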



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3360) Add a protocol page/section to the official Kafka documentation

2016-03-19 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15198422#comment-15198422
 ] 

Greg Fodor commented on KAFKA-3360:
---

Apologies as I did not really know the best place to put this, and wasn't sure 
of the proper way to correct things myself, but I wanted to report an error in 
the documentation at:

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

The v1 FetchResponse protocol states that the ThrottleTime field is reported 
*after* the messages, but in fact the ThrottleTime precedes the message set. So 
the line:

```
[TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] 
ThrottleTime
```

Should instead read:

```
ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize 
MessageSet]]
```

The main Kafka protocol docs reflect this properly.

> Add a protocol page/section to the official Kafka documentation
> ---
>
> Key: KAFKA-3360
> URL: https://issues.apache.org/jira/browse/KAFKA-3360
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Grant Henke
>Assignee: Grant Henke
> Fix For: 0.10.0.0
>
>
> This is an umbrella jira to track adding a protocol page/section to the 
> official Kafka documentation. It lays out subtasks for initial content and 
> follow up improvements and fixes. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3360) Add a protocol page/section to the official Kafka documentation

2016-03-19 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15198422#comment-15198422
 ] 

Greg Fodor edited comment on KAFKA-3360 at 3/16/16 11:39 PM:
-

Apologies as I did not really know the best place to put this, and wasn't sure 
of the proper way to correct things myself, but I wanted to report an error in 
the documentation at:

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

The v1 FetchResponse protocol states that the ThrottleTime field is reported 
*after* the messages, but in fact the ThrottleTime precedes the message set. So 
the line:

{code}
[TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] 
ThrottleTime
{code}

Should instead read:

{code}
ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize 
MessageSet]]
{code}

The main Kafka protocol docs reflect this properly.


was (Author: gfodor):
Apologies as I did not really know the best place to put this, and wasn't sure 
of the proper way to correct things myself, but I wanted to report an error in 
the documentation at:

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

The v1 FetchResponse protocol states that the ThrottleTime field is reported 
*after* the messages, but in fact the ThrottleTime precedes the message set. So 
the line:

```
[TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] 
ThrottleTime
```

Should instead read:

```
ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize 
MessageSet]]
```

The main Kafka protocol docs reflect this properly.

> Add a protocol page/section to the official Kafka documentation
> ---
>
> Key: KAFKA-3360
> URL: https://issues.apache.org/jira/browse/KAFKA-3360
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Grant Henke
>Assignee: Grant Henke
> Fix For: 0.10.0.0
>
>
> This is an umbrella jira to track adding a protocol page/section to the 
> official Kafka documentation. It lays out subtasks for initial content and 
> follow up improvements and fixes. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)