[jira] [Created] (KAFKA-5571) Possible deadlock during shutdown in setState in kafka streams 10.2
Greg Fodor created KAFKA-5571:
-

Summary: Possible deadlock during shutdown in setState in kafka streams 10.2
Key: KAFKA-5571
URL: https://issues.apache.org/jira/browse/KAFKA-5571
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.2.1
Reporter: Greg Fodor
Attachments: kafka-streams.deadlock.log

I'm running a 10.2 job across 5 nodes with 32 stream threads on each node, and find that when we gracefully shut all of them down at once via an Ansible script, some of the nodes end up freezing. At a glance, the attached thread dump implies a deadlock between stream threads trying to update their state via setState. We haven't had this problem before, but it may or may not be related to changes in 10.2 (we are upgrading from 10.0 to 10.2).

When we gracefully shut down all nodes simultaneously, what typically happens is that some subset of the nodes do not shut down completely but go through a rebalance first. It seems this deadlock requires the rebalance to occur simultaneously with the graceful shutdown; if we happen to shut them down and no rebalance happens, I don't believe the deadlock is triggered. The deadlock appears related to the state change handlers being subscribed across threads, and the fact that StreamThread#setState and StreamStateListener#onChange are both synchronized methods.

Another thing worth mentioning is that one of the transformers used in the job has a close() method that can take 10-15 seconds to finish, since it needs to flush some data to a database. Having a long close() method combined with a rebalance during a shutdown across many threads may be necessary for reproduction.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
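The cross-thread synchronized cycle described above can be reduced to a small standalone sketch (all names here are invented stand-ins, not the actual StreamThread code): two objects whose synchronized methods call into each other from different threads, which the JVM's ThreadMXBean then reports as a monitor deadlock.

```java
import java.lang.management.ManagementFactory;

public class SetStateDeadlock {
    // Stand-in for a StreamThread whose state listener lives on another
    // thread's monitor; both methods are synchronized, as in the report.
    static class StreamThreadSim {
        StreamThreadSim peerListener;

        synchronized void setState() {   // locks this...
            pause(100);                  // widen the race window
            peerListener.onChange();     // ...then needs the peer's monitor
        }

        synchronized void onChange() { }
    }

    static void pause(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException ignored) { }
    }

    // Returns true if the JVM reports a monitor deadlock between the two
    // simulated stream threads.
    static boolean deadlocks() {
        StreamThreadSim a = new StreamThreadSim();
        StreamThreadSim b = new StreamThreadSim();
        a.peerListener = b;              // listeners subscribed across threads
        b.peerListener = a;
        Thread t1 = new Thread(a::setState);
        Thread t2 = new Thread(b::setState);
        t1.setDaemon(true);              // let the JVM exit despite the hang
        t2.setDaemon(true);
        t1.start();
        t2.start();
        pause(500);                      // give both threads time to block
        return ManagementFactory.getThreadMXBean().findDeadlockedThreads() != null;
    }

    public static void main(String[] args) {
        System.out.println(deadlocks() ? "deadlock detected" : "no deadlock");
    }
}
```

This is the classic lock-ordering cycle: each thread holds its own monitor while waiting for the other's, which matches why the hang only appears when a rebalance drives state changes on several threads at once during shutdown.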
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15627097#comment-15627097 ]

Greg Fodor commented on KAFKA-4113:
---

Hey [~guozhang], I have been able to reproduce a bootstrapping issue on a fresh local node, and I think there might be some stuff I either need clarity on, or that may even be a bug. The root cause seems to be here:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L137

For a completely new node/topology with a KTable topic with existing state, there is no consumer metadata, so this initializes the offset limit to 0, which results in the state restoration loop basically not consuming any records. I've only reproduced this in a local case where I was sinking data to a KTable topic and then initialized the topology for the first time, which is a one-time event, but I'm wondering if this offset limit default of zero could be causing issues later in the lifecycle of the topology as well.

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Guozhang Wang
>
> On the mailing list, there are multiple requests about the possibility to "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the data. Only after this topic got read completely and the KTable is ready should the application start processing. This would indicate that on startup, the current partition sizes must be fetched and stored, and after the KTable got populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable is done for a certain time, we consider it as populated
> 3) a timestamp cut-off point, ie, all records with an older timestamp belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA. One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without reading any other topics until seeing one record with timestamp 1000.
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
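The zero-default failure mode described in the comment can be sketched in a few lines (a simplified model, not the actual AbstractTask code): if no committed offset exists for the KTable source partition, the offset limit defaults to 0, and a restore loop bounded by that limit consumes nothing even though state exists in the topic.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetLimitSketch {
    // Mirrors the default described above: no consumer metadata -> limit 0.
    static long offsetLimit(Long committedOffset) {
        return committedOffset == null ? 0L : committedOffset;
    }

    // Restore changelog records strictly below the offset limit.
    static List<Long> restore(List<Long> changelogOffsets, long limit) {
        List<Long> restored = new ArrayList<>();
        for (long offset : changelogOffsets) {
            if (offset >= limit) break;  // a limit of 0 stops immediately
            restored.add(offset);
        }
        return restored;
    }

    public static void main(String[] args) {
        List<Long> changelog = List.of(0L, 1L, 2L, 3L);
        // Fresh node: nothing is restored even though state exists.
        System.out.println(restore(changelog, offsetLimit(null)));
        // Node with a committed offset of 3 restores records 0..2.
        System.out.println(restore(changelog, offsetLimit(3L)));
    }
}
```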
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15593018#comment-15593018 ]

Greg Fodor commented on KAFKA-4113:
---

Oh, so it should be doing exactly what makes sense to me -- I am on 0.10.0. Let me verify that there isn't something else going on! Thanks for the info.
[jira] [Created] (KAFKA-4317) RocksDB checkpoint files lost on kill -9
Greg Fodor created KAFKA-4317:
-

Summary: RocksDB checkpoint files lost on kill -9
Key: KAFKA-4317
URL: https://issues.apache.org/jira/browse/KAFKA-4317
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 0.10.0.1
Reporter: Greg Fodor
Assignee: Guozhang Wang

Right now, the checkpoint files for logged RocksDB stores are written during a graceful shutdown, and removed upon restoration. Unfortunately, this means that in a scenario where the process is forcibly killed, the checkpoint files are not there, so all RocksDB stores are rematerialized from scratch on the next launch. In a way this is good, because it simulates bootstrapping a new node (for example, it's a good way to see how much I/O is used to rematerialize the stores); however, it leads to longer recovery times when a non-graceful shutdown occurs and we want to get the job up and running again.

There seem to be two possible things to consider:
- Simply do not remove checkpoint files on restoring. This way a kill -9 will result in only repeating the restoration of the data generated in the source topics since the last graceful shutdown.
- Continually update the checkpoint files (perhaps on commit) -- this would result in the least amount of overhead/latency in restarting, but the additional complexity may not be worth it.
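The second option (continually updating the checkpoint on commit) hinges on the write being crash-safe. A hedged sketch of just that piece, with illustrative file names and layout, using the usual write-temp-then-atomic-rename pattern so a kill -9 at any instant leaves either the old checkpoint or the new one, never a torn file:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class CheckpointWriter {
    // Write the checkpoint to a temp file, then atomically rename it into
    // place; called on every commit in this scheme.
    static void writeCheckpoint(Path storeDir, long offset) {
        try {
            Path tmp = storeDir.resolve(".checkpoint.tmp");
            Files.writeString(tmp, Long.toString(offset));
            Files.move(tmp, storeDir.resolve("checkpoint"),
                       StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // A missing checkpoint means "restore from scratch", as today.
    static long readCheckpoint(Path storeDir) {
        try {
            Path checkpoint = storeDir.resolve("checkpoint");
            return Files.exists(checkpoint)
                ? Long.parseLong(Files.readString(checkpoint).trim())
                : 0L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Path newTempDir() {
        try {
            return Files.createTempDirectory("rocksdb-store");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = newTempDir();
        writeCheckpoint(dir, 1234L);
        System.out.println(readCheckpoint(dir));
    }
}
```

The overhead concern in the second bullet is then mostly the extra fsync/rename per commit interval rather than per record.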
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15587991#comment-15587991 ]

Greg Fodor commented on KAFKA-4113:
---

I guess what I would argue is that KStreamBuilder#table should have identical semantics to a KTable backed by a logged state store, except that you are specifying the topic and (obviously) it's not mutable from the job's point of view. It should first check if it has a local, checkpointed RocksDB store, and if so, it should just read from the checkpoint forward. If not, it should rematerialize from offset 0 and block the start of the job until it does. On shutdown, it should write the checkpoint file.

It seems to me that this might boil down to just having it be "use this topic for the logged state store backing this KTableImpl." I'm sure there are cases I'm missing, but having that be the behavior for KStreamBuilder#table would effectively solve all of our problems as far as I can tell. The semantics and I/O impact of this approach back out to exactly the ones you have when you use a normal user-created persistent state store, except that you are managing the topic writes yourself.
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15587966#comment-15587966 ]

Greg Fodor commented on KAFKA-4113:
---

Having played around with Kafka Streams for a while now, I am still confused (and we still get burned) by this. Let me walk through a case, and see if you guys can find out where I am misunderstanding.

Say we have a topic that's a user table changelog that has user id keys and user records. And we have a clickstream topic that is just user id to url. For the sake of this example, let's assume our Kafka Streams job has been running from t = 0 where both topics were empty, so there's no bootstrapping problem.

In the Kafka Streams DSL, I would tap the user table topic via KStreamBuilder#table. As best I can tell, this creates a KTable with:
- An unlogged RocksDB state store (which is going to land on disk)
- A new source that is the table topic

After this, I'm going to tap + inner join the click stream as a KStream on user id, and just for this example let's assume I'll sink it all out to a new topic too. As my node is humming along, it is writing the user id -> user record k/vs to the local RocksDB store but is *not* storing the changes to the store in a topic, because it is not marked as logged. When it reads a record from the KStream, the join is performed by looking for the key in the state store. As mentioned, my understanding is that the join against the stream will wait until the records for the KTable which have earlier timestamps have been consumed. This makes sense.

If I terminate and restart the Java process, the Kafka consumer for the KTable will pick up at the last committed offset for the user table topic. It may re-consume a few seconds' worth of records, and re-write a few keys in the RocksDB store, but after that it's still got the full historical state of the topic. So joins against any user id will continue to work.

Where things completely stop making sense for me is if I lose the node. If I lose the node, I lose my RocksDB store, which is not logged, so it is not backed by a changelog topic. When I bring up a new node, my understanding is that the consumer will *not* start at the beginning of the topic used for the KTable; it will just pick up at the last commit. So what I end up with is a RocksDB store that only contains the last couple of records from the user table topic. This is obviously really broken, because now my joins will start failing. (And it seems I was lulled into complacency here since I was robust across JVM restarts, but not across node failures.)

I believe this problem also happens in a more nefarious way upon rebalances, since if a partition of the KTable gets reassigned, the new node will also have a partially complete RocksDB store for that partition, since it will just consume from the last committed offset. Similarly, and even scarier, if the partition gets assigned back to the original node, that node now has a RocksDB store with a very small gap, for the key changes that happened during the period where it was assigned to another node. I am not sure if I am missing something here, but this has been the behavior we have seen.

The workarounds we have done for this problem are:
- Write a routine to let us reset the KTable topics' consumer offsets to zero (still doesn't help with a rebalance)
- Perform a "touch" to the database records we are flushing to Kafka, so new copies of all of the records are appended to the topic via Kafka Connect and are forced into the RocksDB stores (this works well, but obviously is terrible)
- Put a dummy aggregation/reduce after the tap of the KTable topic, which forces things into a logged state store that will be fully materialized on startup if it is missing

Thoughts?
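The node-loss scenario in the comment above can be simulated without any Kafka APIs (a toy model, not the real consumer logic): materializing the table topic into a map from different starting offsets shows why a replacement node that resumes at the committed offset serves broken joins.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UnloggedStoreSim {
    // Materialize the user-table topic into a local store, consuming the
    // changelog from offset 'from' onward (each entry is {key, value}).
    static Map<String, String> materialize(List<String[]> topic, int from) {
        Map<String, String> store = new HashMap<>();
        for (int i = from; i < topic.size(); i++) {
            store.put(topic.get(i)[0], topic.get(i)[1]);
        }
        return store;
    }

    public static void main(String[] args) {
        List<String[]> userTopic = List.of(
            new String[]{"u1", "alice"},
            new String[]{"u2", "bob"},
            new String[]{"u3", "carol"});
        int committedOffset = 2; // group already committed past u1/u2

        // Original node consumed from 0: joins against any user id work.
        Map<String, String> original = materialize(userTopic, 0);
        // Replacement node resumes at the committed offset with an empty
        // local store: u1/u2 never arrive, so their joins fail.
        Map<String, String> replacement = materialize(userTopic, committedOffset);

        System.out.println(original.containsKey("u1"));    // true
        System.out.println(replacement.containsKey("u1")); // false
    }
}
```

The rebalance variant is the same model with a nonzero `from` on a node that kept a stale copy of the store: the gap is the records between its old position and the committed offset.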
[jira] [Commented] (KAFKA-3545) Generalized Serdes for List/Map
[ https://issues.apache.org/jira/browse/KAFKA-3545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15583149#comment-15583149 ]

Greg Fodor commented on KAFKA-3545:
---

Yes

> Generalized Serdes for List/Map
> ---
>
> Key: KAFKA-3545
> URL: https://issues.apache.org/jira/browse/KAFKA-3545
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Greg Fodor
> Priority: Minor
> Labels: api, newbie
> Fix For: 0.10.1.0
>
> In working with Kafka Streams I've found it's often the case I want to perform a "group by" operation, where I repartition a stream based on a foreign key and then do an aggregation of all the values into a single collection, so the stream becomes one where each entry has a value that is a serialized list of values that belonged to the key. (This seems unrelated to the 'group by' operation talked about in KAFKA-3544.) Basically the same typical group by operation found in systems like Cascading.
> In order to create these intermediate list values I needed to define custom avro schemas that simply wrap the elements of interest into a list. It seems desirable that there be some basic facility for constructing simple Serdes of Lists/Maps/Sets of other types, potentially using avro's serialization under the hood. If this existed in the core library it would also enable the addition of higher level operations on streams that can use these Serdes to perform simple operations like the "group by" example I mention.
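As a sketch of what such a facility might look like (hypothetical, not a proposed Kafka API): a List serde can be composed from any element serializer/deserializer pair using length-prefixed framing. Plain `Function` types stand in for Kafka's Serializer/Deserializer interfaces so the example is self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ListSerdeSketch {
    // Encode the element count, then each element as a length-prefixed blob.
    static <T> byte[] serialize(List<T> items, Function<T, byte[]> inner) {
        try (ByteArrayOutputStream buf = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(buf)) {
            out.writeInt(items.size());
            for (T item : items) {
                byte[] bytes = inner.apply(item);
                out.writeInt(bytes.length);
                out.write(bytes);
            }
            out.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Inverse: read the count, then each length-prefixed element.
    static <T> List<T> deserialize(byte[] data, Function<byte[], T> inner) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int n = in.readInt();
            List<T> items = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                byte[] bytes = new byte[in.readInt()];
                in.readFully(bytes);
                items.add(inner.apply(bytes));
            }
            return items;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = serialize(List.of("a", "bb", "ccc"), String::getBytes);
        System.out.println(deserialize(wire, String::new));
    }
}
```

A Map or Set variant is the same framing with key/value pairs or a set collector; an Avro-backed inner serde slots into the same `Function` seam.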
[jira] [Resolved] (KAFKA-3545) Generalized Serdes for List/Map
[ https://issues.apache.org/jira/browse/KAFKA-3545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Fodor resolved KAFKA-3545.
---

Resolution: Fixed
Fix Version/s: 0.10.1.0
[jira] [Commented] (KAFKA-4281) Should be able to forward aggregation values immediately
[ https://issues.apache.org/jira/browse/KAFKA-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15560765#comment-15560765 ]

Greg Fodor commented on KAFKA-4281:
---

PR: https://github.com/apache/kafka/pull/1998

If this approach seems sane, please take a look especially at the window variants -- I am not too familiar with those APIs.
[jira] [Created] (KAFKA-4281) Should be able to forward aggregation values immediately
Greg Fodor created KAFKA-4281:
-

Summary: Should be able to forward aggregation values immediately
Key: KAFKA-4281
URL: https://issues.apache.org/jira/browse/KAFKA-4281
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.1.0
Reporter: Greg Fodor
Assignee: Guozhang Wang

KIP-63 introduced changes to the behavior of aggregations such that the result of aggregations will not appear to subsequent processors until a state store flush occurs. This is problematic for latency-sensitive aggregations, since flushes generally occur at commit.interval.ms, which is usually a few seconds. Combined with several aggregations, this can result in several seconds of latency through a topology for steps dependent upon aggregations.

Two potential solutions:
- Allow finer control over the state store flushing intervals
- Allow users to change the behavior so that certain aggregations will immediately forward records to the next step (as was the case pre-KIP-63)

A PR is attached that takes the second approach. Unfortunately, to add this a large number of files needed to be touched, and it effectively doubles the number of method signatures around grouping on KTable and KStream. I tried an alternative approach that let the user opt in to immediate forwarding via an additional builder method on KGroupedStream/Table, but this didn't work as expected, because in order for the latency to go away, the KTableImpl itself must also mark its source as forward-immediate (otherwise we will still see latency due to the materialization of the KTableSource still relying upon state store flushes to propagate.)
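The first proposed solution maps onto existing configuration knobs (the property keys below are the standard StreamsConfig names; the values and ids are only illustrative): commit.interval.ms controls the flush/commit cadence, and on 0.10.1+ the KIP-63 record cache can be disabled outright via cache.max.bytes.buffering so aggregation updates are forwarded without waiting for a flush.

```java
import java.util.Properties;

public class LowLatencyConfig {
    static Properties config() {
        Properties props = new Properties();
        props.put("application.id", "low-latency-agg");   // illustrative job id
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker
        // Flush/commit more often, trading commit overhead for latency.
        props.put("commit.interval.ms", "100");
        // 0.10.1+: a cache size of 0 disables record caching, so aggregation
        // results are forwarded downstream immediately.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(config());
    }
}
```

Both settings apply globally to the topology, which is why the per-aggregation opt-in discussed above still requires API changes.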
[jira] [Resolved] (KAFKA-4043) User-defined handler for topology restart
[ https://issues.apache.org/jira/browse/KAFKA-4043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Fodor resolved KAFKA-4043.
---

Resolution: Not A Problem

> User-defined handler for topology restart
> -
>
> Key: KAFKA-4043
> URL: https://issues.apache.org/jira/browse/KAFKA-4043
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.0.1
> Reporter: Greg Fodor
> Assignee: Guozhang Wang
>
> Since Kafka Streams is just a library, there's a lot of cool stuff we've been able to do that would be trickier if it were part of a larger cluster-oriented job execution system that had assumptions about the semantics of a job. One of the jobs we have uses Kafka Streams to do top-level data flow, and then one of our processors will actually kick off background threads to do work based upon the data flow state. Happy to fill in more details of our use case, but fundamentally the model is that we have a Kafka Streams data flow that is reading state from upstream, and that state dictates that work needs to be done, which results in a dedicated work thread being spawned by our job.
> This works great, but we're running into an issue when there is partition reassignment, since we have no way to detect this and cleanly shut down these threads. In our case, we'd like to shut down the background worker threads if there is a partition rebalance or if the job raises an exception and attempts to restart. In practice, what is happening is that we are getting duplicate threads for the same work on a partition rebalance.
> Implementation-wise, this seems like some type of event handler that can be attached to the topology at build time and that will be called when the data flow needs to rebalance or rebuild its task threads in general (ideally passing as much information about the reason along.) I could imagine this being factored similarly to KafkaStreams#setUncaughtExceptionHandler.
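A hypothetical shape for the requested hook, modeled on the KafkaStreams#setUncaughtExceptionHandler factoring the report mentions (every name below is invented for illustration, not an existing Kafka API): the job registers a callback, and the library would invoke it before rebuilding tasks so background workers are stopped instead of duplicated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class RestartHandlerSketch {
    // Invented callback interface; mirrors the factoring of
    // setUncaughtExceptionHandler described in the report.
    interface TopologyRestartHandler {
        void onRestart(String reason);
    }

    // Stand-in for the job's background worker threads.
    static class WorkerPool {
        final List<Thread> workers = new ArrayList<>();
        final AtomicBoolean running = new AtomicBoolean(true);

        void shutdown() {
            running.set(false);
            for (Thread t : workers) t.interrupt();
            workers.clear();
        }
    }

    // Simulates the library firing the handler when tasks are rebuilt,
    // so no duplicate workers survive the rebalance.
    static boolean demo() {
        WorkerPool pool = new WorkerPool();
        TopologyRestartHandler handler = reason -> pool.shutdown();
        handler.onRestart("partition rebalance");
        return !pool.running.get() && pool.workers.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(demo() ? "workers stopped" : "workers leaked");
    }
}
```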
[jira] [Commented] (KAFKA-4043) User-defined handler for topology restart
[ https://issues.apache.org/jira/browse/KAFKA-4043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15472271#comment-15472271 ]

Greg Fodor commented on KAFKA-4043:
---

Ah, that should work for us. Thanks!
[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics
[ https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15468374#comment-15468374 ]

Greg Fodor commented on KAFKA-3769:
---

It's just the sensor calls inside of Selector, not Kafka Streams specific. I'll verify as much as I can from the profiler snapshot that it's the same issue and will open a jira.

> KStream job spending 60% of time writing metrics
> 
>
> Key: KAFKA-3769
> URL: https://issues.apache.org/jira/browse/KAFKA-3769
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.0.0
> Reporter: Greg Fodor
> Assignee: Guozhang Wang
> Priority: Critical
> Fix For: 0.10.1.0
>
> I've been profiling a complex streams job, and found two major hotspots when writing metrics, which take up about 60% of the CPU time of the job. (!) A PR is attached.
[jira] [Comment Edited] (KAFKA-4120) byte[] keys in RocksDB state stores do not work as expected
[ https://issues.apache.org/jira/browse/KAFKA-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464171#comment-15464171 ]

Greg Fodor edited comment on KAFKA-4120 at 9/5/16 5:34 PM:
---

We were able to work around it by just creating a proper Avro class for the byte data, but I think it would probably be helpful to future users if there were a way to prevent this from happening -- an exception doesn't seem unreasonable.

was (Author: gfodor):
We were able to work around it by just creating a proper Avro class for the byte data, but I think it would probably be helpful to future if there were a way to prevent this from happening -- an exception doesn't seem unreasonable.

> byte[] keys in RocksDB state stores do not work as expected
> ---
>
> Key: KAFKA-4120
> URL: https://issues.apache.org/jira/browse/KAFKA-4120
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.0.1
> Reporter: Greg Fodor
> Assignee: Guozhang Wang
>
> We ran into an issue using a byte[] key in a RocksDB state store (with the byte array serde.) Internally, the RocksDB store keeps an LRUCache that is backed by a LinkedHashMap that sits between the callers and the actual db. The problem is that while the underlying RocksDB will persist byte arrays with equal data as equivalent keys, the LinkedHashMap uses byte[] reference equality from Object.equals/hashcode. So, this can result in multiple entries in the cache for two different byte arrays that have the same contents and are backed by the same key in the db, resulting in unexpected behavior.
> One such behavior that manifests from this: if you store a value in the state store with a specific key, then re-read that key with the same byte array, you will get the new value, but if you re-read that key with a different byte array with the same bytes, you will get a stale value until the db is flushed. (This made it particularly tricky to track down what was happening :))
> The workaround for us is to convert the keys from raw byte arrays to a deserialized avro structure that provides proper hashcode/equals semantics for the intermediate cache. In general this seems like good practice, so one of the proposed solutions is to simply emit a warning or exception if a key type with breaking semantics like this is provided.
> A few proposed solutions:
> - When the state store is defined on array keys, ensure that the cache map does proper comparisons on array values, not array references. This would fix this problem, but seems a bit strange to special-case. However, I have a hard time thinking of other examples where this behavior would burn users.
> - Change the LRU cache to serialize and deserialize all keys to bytes and use a value-based comparison for the map. This would be the most correct, as it would ensure that both the RocksDB store and the cache have identical key spaces and equality/hashing semantics. However, this is probably slow, and since the general case of using avro record types as keys works fine, it would largely be unnecessary overhead.
> - Don't change anything about the behavior, but trigger a warning in the log or fail to start if a state store is defined on array keys (or possibly any key type that fails to properly override Object.equals/hashcode.)
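The root cause is plain Java array semantics and is reproducible in a few lines (a HashMap stands in for the LinkedHashMap behind the LRU cache): arrays inherit reference-based equals/hashCode from Object, so two byte[] keys with identical contents do not collide in the map.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ByteArrayKeyDemo {
    public static void main(String[] args) {
        byte[] k1 = {1, 2, 3};
        byte[] k2 = {1, 2, 3};

        System.out.println(Arrays.equals(k1, k2)); // true: same contents
        System.out.println(k1.equals(k2));         // false: Object identity

        // Same trap as the cache in front of RocksDB: the value written
        // under k1 is invisible when the caller re-reads with k2.
        Map<byte[], String> cache = new HashMap<>();
        cache.put(k1, "v1");
        System.out.println(cache.get(k1)); // v1
        System.out.println(cache.get(k2)); // null: stale read until flush
    }
}
```

Wrapping the raw bytes in any type with value-based equals/hashCode (the Avro workaround above, or java.nio.ByteBuffer's content-based equality) makes the two keys collide as expected.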
[jira] [Commented] (KAFKA-4120) byte[] keys in RocksDB state stores do not work as expected
[ https://issues.apache.org/jira/browse/KAFKA-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464171#comment-15464171 ]

Greg Fodor commented on KAFKA-4120:
---

We were able to work around it by just creating a proper Avro class for the byte data, but I think it would probably be helpful to future if there were a way to prevent this from happening -- an exception doesn't seem unreasonable.
[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics
[ https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15463861#comment-15463861 ] Greg Fodor commented on KAFKA-3769: --- I've done some additional profiling and I have found that this problem also seems to crop up in complex kafka streams jobs within the Kafka core Selector class. Should I open another JIRA? > KStream job spending 60% of time writing metrics > > > Key: KAFKA-3769 > URL: https://issues.apache.org/jira/browse/KAFKA-3769 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang >Priority: Critical > Fix For: 0.10.1.0 > > > I've been profiling a complex streams job, and found two major hotspots when > writing metrics, which take up about 60% of the CPU time of the job. (!) A PR > is attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4120) byte[] keys in RocksDB state stores do not work as expected
Greg Fodor created KAFKA-4120: - Summary: byte[] keys in RocksDB state stores do not work as expected Key: KAFKA-4120 URL: https://issues.apache.org/jira/browse/KAFKA-4120 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.0.1 Reporter: Greg Fodor Assignee: Guozhang Wang We ran into an issue using a byte[] key in a RocksDB state store (with the byte array serde.) Internally, the RocksDB store keeps an LRUCache that is backed by a LinkedHashMap that sits between the callers and the actual db. The problem is that while the underlying rocks db will persist byte arrays with equal data as equivalent keys, the LinkedHashMap uses byte[] reference equality from Object.equals/hashcode. So, this can result in multiple entries in the cache for two different byte arrays that have the same contents and are backed by the same key in the db, resulting in unexpected behavior. One such behavior that manifests from this is if you store a value in the state store with a specific key, if you re-read that key with the same byte array you will get the new value, but if you re-read that key with a different byte array with the same bytes, you will get a stale value until the db is flushed. (This made it particularly tricky to track down what was happening :)) The workaround for us is to convert the keys from raw byte arrays to a deserialized avro structure that provides proper hashcode/equals semantics for the intermediate cache. In general this seems like good practice, so one of the proposed solutions is to simply emit a warning or exception if a key type with breaking semantics like this is provided. A few proposed solutions: - When the state store is defined on array keys, ensure that the cache map does proper comparisons on array values, not array references. This would fix this problem, but seems a bit strange to special case. However, I have a hard time thinking of other examples where this behavior would burn users. 
- Change the LRU cache to deserialize and serialize all keys to bytes and use a value based comparison for the map. This would be the most correct, as it would ensure that both the rocks db and the cache have identical key spaces and equality/hashing semantics. However, this is probably slow, and since the general case of using avro record types as keys works fine, it will largely be unnecessary overhead. - Don't change anything about the behavior, but trigger a warning in the log or fail to start if a state store is defined on array keys (or possibly any key type that fails to properly override Object.equals/hashcode.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
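The reference-equality pitfall described above can be demonstrated with plain JDK collections, with no Kafka involved; this is only an illustration of the underlying Java semantics, not the actual RocksDB store code:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class ByteArrayKeyDemo {
    public static void main(String[] args) {
        // A LinkedHashMap keyed on byte[] compares keys via Object identity,
        // because arrays do not override equals()/hashCode().
        Map<byte[], String> cache = new LinkedHashMap<>();
        byte[] k1 = new byte[] {1, 2, 3};
        byte[] k2 = new byte[] {1, 2, 3};

        cache.put(k1, "value");

        // Same contents, different reference: the lookup misses, so the cache
        // can hold multiple entries for what RocksDB treats as one key.
        System.out.println(cache.containsKey(k1)); // true
        System.out.println(cache.containsKey(k2)); // false

        // Content-based comparison, matching how the underlying db keys bytes:
        System.out.println(Arrays.equals(k1, k2)); // true
    }
}
```

This is why wrapping the bytes in a type with value-based equals/hashCode (such as a generated Avro class) makes the intermediate cache behave consistently with RocksDB.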
[jira] [Commented] (KAFKA-4043) User-defined handler for topology restart
[ https://issues.apache.org/jira/browse/KAFKA-4043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15443912#comment-15443912 ] Greg Fodor commented on KAFKA-4043: --- I suppose it looks like there's also not a handler for when the topology shuts down normally either. This would be good to have too. > User-defined handler for topology restart > - > > Key: KAFKA-4043 > URL: https://issues.apache.org/jira/browse/KAFKA-4043 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Greg Fodor >Assignee: Guozhang Wang > > Since Kafka Streams is just a library, there's a lot of cool stuff we've been > able to do that would be trickier if it were part of a larger > cluster-oriented job execution system that had assumptions about the > semantics of a job. One of the jobs we have uses Kafka Streams to do top > level data flow, and then one of our processors actually will kick off > background threads to do work based upon the data flow state. Happy to fill > in more details of our use-case, but fundamentally the model is that we have > a Kafka Streams data flow that is reading state from upstream, and that state > dictates that work needs to be done, which results in a dedicated work thread > to be spawned by our job. > This works great, but we're running into an issue when there is partition > reassignment, since we have no way to detect this and cleanly shut down these > threads. In our case, we'd like to shut down the background worker threads if > there is a partition rebalance or if the job raises an exception and attempts > to restart. In practice what is happening is we are getting duplicate threads > for the same work on a partition rebalance. 
> Implementation-wise, this seems like some type of event handler that can be > attached to the topology at build time and that will be called when the data > flow needs to rebalance or rebuild its task threads in general (ideally > passing as much information about the reason along.) I could imagine this > being factored similarly to the KafkaStreams#setUncaughtExceptionHandler. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15433738#comment-15433738 ] Greg Fodor commented on KAFKA-3758: --- Excited to try this; as our state stores have grown, it's become more and more difficult to get jobs to start up without timeouts. > KStream job fails to recover after Kafka broker stopped > --- > > Key: KAFKA-3758 > URL: https://issues.apache.org/jira/browse/KAFKA-3758 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang > Attachments: muon.log.1.gz > > > We've been doing some testing of a fairly complex KStreams job and under load > it seems the job fails to rebalance + recover if we shut down one of the > kafka brokers. The test we were running had a 3-node kafka cluster where each > topic had at least a replication factor of 2, and we terminated one of the > nodes. > Attached is the full log; the root exception seems to be contention on the > lock on the state directory. The job continues to try to recover but throws > errors relating to locks over and over. Restarting the job itself resolves > the problem. 
> 1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while > creating the state manager > 1703 at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71) > 1704 at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86) > 1705 at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550) > 1706 at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577) > 1707 at > org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68) > 1708 at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123) > 1709 at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222) > 1710 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232) > 1711 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227) > 1712 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1713 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1714 at > org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) > 1715 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1716 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1717 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436) > 1718 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422) > 1719 at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > 1720 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > 1721 at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > 1722 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1723 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1724 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > 1725 at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > 1726 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > 1727 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > 1728 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > 1729 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > 1730 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > 1731
[jira] [Created] (KAFKA-4043) User-defined handler for topology restart
Greg Fodor created KAFKA-4043: - Summary: User-defined handler for topology restart Key: KAFKA-4043 URL: https://issues.apache.org/jira/browse/KAFKA-4043 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.0.1 Reporter: Greg Fodor Assignee: Guozhang Wang Since Kafka Streams is just a library, there's a lot of cool stuff we've been able to do that would be trickier if it were part of a larger cluster-oriented job execution system that had assumptions about the semantics of a job. One of the jobs we have uses Kafka Streams to do top level data flow, and then one of our processors actually will kick off background threads to do work based upon the data flow state. Happy to fill in more details of our use-case, but fundamentally the model is that we have a Kafka Streams data flow that is reading state from upstream, and that state dictates that work needs to be done, which results in a dedicated work thread being spawned by our job. This works great, but we're running into an issue when there is partition reassignment, since we have no way to detect this and cleanly shut down these threads. In our case, we'd like to shut down the background worker threads if there is a partition rebalance or if the job raises an exception and attempts to restart. In practice what is happening is we are getting duplicate threads for the same work on a partition rebalance. Implementation-wise, this seems like some type of event handler that can be attached to the topology at build time and that will be called when the data flow needs to rebalance or rebuild its task threads in general (ideally passing as much information about the reason along.) I could imagine this being factored similarly to the KafkaStreams#setUncaughtExceptionHandler. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
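A sketch of what such a hook might look like. To be clear, this is hypothetical: neither `TopologyLifecycleHandler` nor `onTasksStopping` exists in Kafka Streams; the shape is simply modeled on the existing KafkaStreams#setUncaughtExceptionHandler mentioned in the ticket:

```java
public class TopologyHandlerSketch {
    // Hypothetical callback interface (not a real Kafka Streams API): invoked
    // before the library tears down or rebuilds its task threads, so the
    // application can stop any background worker threads it spawned.
    interface TopologyLifecycleHandler {
        enum Reason { REBALANCE, RESTART_AFTER_EXCEPTION, SHUTDOWN }
        void onTasksStopping(Reason reason);
    }

    public static void main(String[] args) {
        // The job would install this once at topology build time, analogous
        // to how an uncaught-exception handler is registered today.
        TopologyLifecycleHandler handler = reason ->
                System.out.println("stopping background workers, reason=" + reason);

        // The framework would call it when partitions are reassigned:
        handler.onTasksStopping(TopologyLifecycleHandler.Reason.REBALANCE);
    }
}
```

With a `Reason` passed along, the application can distinguish a rebalance (stop workers, they may be respawned elsewhere) from a normal shutdown.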
[jira] [Commented] (KAFKA-3752) Provide a way for KStreams to recover from unclean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-3752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15416351#comment-15416351 ] Greg Fodor commented on KAFKA-3752: --- Oh apologies for mis-reading the ticket, but in our case it's a recoverable condition. > Provide a way for KStreams to recover from unclean shutdown > --- > > Key: KAFKA-3752 > URL: https://issues.apache.org/jira/browse/KAFKA-3752 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Roger Hoover > Labels: architecture > > If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM > Killer), it may leave behind lock files and fail to recover. > It would be useful to have an options (say --force) to tell KStreams to > proceed even if it finds old LOCK files. > {noformat} > [2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in > thread [StreamThread-1]: > (org.apache.kafka.streams.processor.internals.StreamThread:583) > org.apache.kafka.streams.errors.ProcessorStateException: Error while creating > the state manager > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232) > at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295) > at >
[jira] [Commented] (KAFKA-3752) Provide a way for KStreams to recover from unclean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-3752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415664#comment-15415664 ] Greg Fodor commented on KAFKA-3752: --- Hey [~guozhang], we're hitting this issue as well during a rebalance and I think also during failures during startup due to KAFKA-3559. The job eventually recovers. We are set to running 32 threads per instance and have 2 instances (though the issue happens more often when we run on a single instance.) Log here: https://gist.github.com/gfodor/bac65bff38233193b70836b78c701e7b > Provide a way for KStreams to recover from unclean shutdown > --- > > Key: KAFKA-3752 > URL: https://issues.apache.org/jira/browse/KAFKA-3752 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Roger Hoover > Labels: architecture > > If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM > Killer), it may leave behind lock files and fail to recover. > It would be useful to have an options (say --force) to tell KStreams to > proceed even if it finds old LOCK files. 
> {noformat} > [2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in > thread [StreamThread-1]: > (org.apache.kafka.streams.processor.internals.StreamThread:583) > org.apache.kafka.streams.errors.ProcessorStateException: Error while creating > the state manager > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422) > at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > at >
[jira] [Resolved] (KAFKA-3770) KStream job should be able to specify linger.ms
[ https://issues.apache.org/jira/browse/KAFKA-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Fodor resolved KAFKA-3770. --- Resolution: Fixed Fixed via KAFKA-3786 > KStream job should be able to specify linger.ms > --- > > Key: KAFKA-3770 > URL: https://issues.apache.org/jira/browse/KAFKA-3770 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Greg Fodor >Assignee: Greg Fodor > > The default linger.ms of 100ms hardcoded into the StreamsConfig class is > problematic for jobs that have lots of tasks, since this latency can accrue. > It seems useful to be able to override the linger.ms in the StreamsConfig. > Attached is a PR which allows this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
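A minimal sketch of the override, assuming (per the fix referenced above) that a linger.ms supplied in the streams configuration is forwarded to the internal producers instead of being replaced by the hardcoded 100 ms default; the application id and broker address are placeholders:

```java
import java.util.Properties;

public class LingerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-job");   // hypothetical job id
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        // Lower the producer batching delay; with many tasks, each task's
        // linger window can otherwise accrue into noticeable end-to-end latency.
        props.put("linger.ms", "5");

        System.out.println(props.getProperty("linger.ms"));
        // Passing these props to StreamsConfig / KafkaStreams would then
        // construct the job's producers with linger.ms=5.
    }
}
```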
[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics
[ https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326778#comment-15326778 ] Greg Fodor commented on KAFKA-3769: --- Discussion/resolution moved to: https://issues.apache.org/jira/browse/KAFKA-3811 > KStream job spending 60% of time writing metrics > > > Key: KAFKA-3769 > URL: https://issues.apache.org/jira/browse/KAFKA-3769 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang >Priority: Critical > > I've been profiling a complex streams job, and found two major hotspots when > writing metrics, which take up about 60% of the CPU time of the job. (!) A PR > is attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels
[ https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Fodor updated KAFKA-3811: -- Attachment: screenshot-latency.png > Introduce Kafka Streams metrics recording levels > > > Key: KAFKA-3811 > URL: https://issues.apache.org/jira/browse/KAFKA-3811 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Greg Fodor >Assignee: aarti gupta > Attachments: Muon-Snapshot.zip, Muon-latency.zip, screenshot-1.png, > screenshot-latency.png > > > Follow-up from the discussions here: > https://github.com/apache/kafka/pull/1447 > https://issues.apache.org/jira/browse/KAFKA-3769 > The proposal is to introduce configuration to control the granularity/volumes > of metrics emitted by Kafka Streams jobs, since the per-record level metrics > introduce non-trivial overhead and are possibly less useful once a job has > been optimized. > Proposal from guozhangwang: > level0 (stream thread global): per-record process / punctuate latency, commit > latency, poll latency, etc > level1 (per processor node, and per state store): IO latency, per-record .. > latency, forward throughput, etc. > And by default we only turn on level0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
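The two-level scheme quoted above could be gated by a single configuration key. The following is purely an illustrative sketch of that idea; the property name "metrics.record.level" and the enum are invented for this example, since the actual mechanism was still under discussion in the ticket:

```java
import java.util.Properties;

public class MetricsLevelSketch {
    // LEVEL0: stream-thread-global sensors; LEVEL1 adds per-processor-node
    // and per-state-store sensors (the expensive per-record ones).
    enum MetricsLevel { LEVEL0, LEVEL1 }

    // Hypothetical config key; defaults to LEVEL0 as the proposal suggests.
    static MetricsLevel configuredLevel(Properties props) {
        return MetricsLevel.valueOf(
                props.getProperty("metrics.record.level", "LEVEL0").toUpperCase());
    }

    // A sensor is only recorded if its level is enabled, so an optimized job
    // pays no per-record metrics overhead unless it opts in.
    static boolean shouldRecord(MetricsLevel sensorLevel, MetricsLevel configured) {
        return sensorLevel.ordinal() <= configured.ordinal();
    }

    public static void main(String[] args) {
        MetricsLevel level = configuredLevel(new Properties()); // defaults
        System.out.println(shouldRecord(MetricsLevel.LEVEL0, level)); // true
        System.out.println(shouldRecord(MetricsLevel.LEVEL1, level)); // false
    }
}
```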
[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels
[ https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324869#comment-15324869 ] Greg Fodor commented on KAFKA-3811: --- Also, I've attached a screenshot + snapshot of a second run where I started sending data deeper in the pipeline, which caused the latency metrics to take up a few % of time since we're using state stores. To me, a lot of this looks mostly like lock contention. > Introduce Kafka Streams metrics recording levels > > > Key: KAFKA-3811 > URL: https://issues.apache.org/jira/browse/KAFKA-3811 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Greg Fodor >Assignee: aarti gupta > Attachments: Muon-Snapshot.zip, Muon-latency.zip, screenshot-1.png, > screenshot-latency.png > > > Follow-up from the discussions here: > https://github.com/apache/kafka/pull/1447 > https://issues.apache.org/jira/browse/KAFKA-3769 > The proposal is to introduce configuration to control the granularity/volumes > of metrics emitted by Kafka Streams jobs, since the per-record level metrics > introduce non-trivial overhead and are possibly less useful once a job has > been optimized. > Proposal from guozhangwang: > level0 (stream thread global): per-record process / punctuate latency, commit > latency, poll latency, etc > level1 (per processor node, and per state store): IO latency, per-record .. > latency, forward throughput, etc. > And by default we only turn on level0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels
[ https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Fodor updated KAFKA-3811: -- Attachment: Muon-latency.zip > Introduce Kafka Streams metrics recording levels > > > Key: KAFKA-3811 > URL: https://issues.apache.org/jira/browse/KAFKA-3811 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Greg Fodor >Assignee: aarti gupta > Attachments: Muon-Snapshot.zip, Muon-latency.zip, screenshot-1.png > > > Follow-up from the discussions here: > https://github.com/apache/kafka/pull/1447 > https://issues.apache.org/jira/browse/KAFKA-3769 > The proposal is to introduce configuration to control the granularity/volumes > of metrics emitted by Kafka Streams jobs, since the per-record level metrics > introduce non-trivial overhead and are possibly less useful once a job has > been optimized. > Proposal from guozhangwang: > level0 (stream thread global): per-record process / punctuate latency, commit > latency, poll latency, etc > level1 (per processor node, and per state store): IO latency, per-record .. > latency, forward throughput, etc. > And by default we only turn on level0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels
[ https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324847#comment-15324847 ] Greg Fodor commented on KAFKA-3811: --- I've also attached a screenshot of YourKit of the relevant call stacks > Introduce Kafka Streams metrics recording levels > > > Key: KAFKA-3811 > URL: https://issues.apache.org/jira/browse/KAFKA-3811 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Greg Fodor >Assignee: aarti gupta > Attachments: Muon-Snapshot.zip, screenshot-1.png > > > Follow-up from the discussions here: > https://github.com/apache/kafka/pull/1447 > https://issues.apache.org/jira/browse/KAFKA-3769 > The proposal is to introduce configuration to control the granularity/volumes > of metrics emitted by Kafka Streams jobs, since the per-record level metrics > introduce non-trivial overhead and are possibly less useful once a job has > been optimized. > Proposal from guozhangwang: > level0 (stream thread global): per-record process / punctuate latency, commit > latency, poll latency, etc > level1 (per processor node, and per state store): IO latency, per-record .. > latency, forward throughput, etc. > And by default we only turn on level0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels
[ https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Fodor updated KAFKA-3811: -- Attachment: screenshot-1.png > Introduce Kafka Streams metrics recording levels > > > Key: KAFKA-3811 > URL: https://issues.apache.org/jira/browse/KAFKA-3811 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Greg Fodor >Assignee: aarti gupta > Attachments: Muon-Snapshot.zip, screenshot-1.png > > > Follow-up from the discussions here: > https://github.com/apache/kafka/pull/1447 > https://issues.apache.org/jira/browse/KAFKA-3769 > The proposal is to introduce configuration to control the granularity/volumes > of metrics emitted by Kafka Streams jobs, since the per-record level metrics > introduce non-trivial overhead and are possibly less useful once a job has > been optimized. > Proposal from guozhangwang: > level0 (stream thread global): per-record process / punctuate latency, commit > latency, poll latency, etc > level1 (per processor node, and per state store): IO latency, per-record .. > latency, forward throughput, etc. > And by default we only turn on level0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels
[ https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Fodor updated KAFKA-3811: -- Attachment: Muon-Snapshot.zip > Introduce Kafka Streams metrics recording levels > > > Key: KAFKA-3811 > URL: https://issues.apache.org/jira/browse/KAFKA-3811 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Greg Fodor >Assignee: aarti gupta > Attachments: Muon-Snapshot.zip > > > Follow-up from the discussions here: > https://github.com/apache/kafka/pull/1447 > https://issues.apache.org/jira/browse/KAFKA-3769 > The proposal is to introduce configuration to control the granularity/volumes > of metrics emitted by Kafka Streams jobs, since the per-record level metrics > introduce non-trivial overhead and are possibly less useful once a job has > been optimized. > Proposal from guozhangwang: > level0 (stream thread global): per-record process / punctuate latency, commit > latency, poll latency, etc > level1 (per processor node, and per state store): IO latency, per-record .. > latency, forward throughput, etc. > And by default we only turn on level0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels
[ https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324838#comment-15324838 ] Greg Fodor commented on KAFKA-3811: --- Hey [~aartigupta], I attached a YourKit profiler to one of our jobs running dark against production data. The job has 200-300 topic-partition pairs and generally discards most messages early in the pipeline, and was processing a few thousand tps from the top level topics. Unfortunately, since this issue came up, we implemented changes to reduce the amount of data running through the system (discarding it earlier) so we didn't have to worry about this performance problem. In my tests a majority of the CPU time of the job was spent inside the code walking and emitting to the Sensors for the per-message process metrics and the per-k/v read/write latency metrics. I also found 6-7% of the time was spent in the fetcher metrics, which was addressed here: https://github.com/apache/kafka/pull/1464. Good news: I managed to find the snapshot data :) I will attach it here. The majority of the time is *not* the milliseconds() call but the actual (synchronized?) walk of Sensors in Sensor.record. > Introduce Kafka Streams metrics recording levels > > > Key: KAFKA-3811 > URL: https://issues.apache.org/jira/browse/KAFKA-3811 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Greg Fodor >Assignee: aarti gupta > > Follow-up from the discussions here: > https://github.com/apache/kafka/pull/1447 > https://issues.apache.org/jira/browse/KAFKA-3769 > The proposal is to introduce configuration to control the granularity/volumes > of metrics emitted by Kafka Streams jobs, since the per-record level metrics > introduce non-trivial overhead and are possibly less useful once a job has > been optimized. 
> Proposal from guozhangwang: > level0 (stream thread global): per-record process / punctuate latency, commit > latency, poll latency, etc > level1 (per processor node, and per state store): IO latency, per-record .. > latency, forward throughput, etc. > And by default we only turn on level0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
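The hotspot described in the profiling comment above -- a synchronized walk over a sensor's stats on every processed record -- can be sketched in isolation. This toy class is illustrative only (it is not Kafka's actual Sensor implementation); it just shows why the cost scales with records/sec times stats-per-sensor, and why the `synchronized` keyword serializes concurrent stream threads recording to a shared sensor:

```java
import java.util.ArrayList;
import java.util.List;

public class SensorSketch {
    interface Stat { void record(double value, long nowMs); }

    static class Max implements Stat {
        double max = Double.NEGATIVE_INFINITY;
        public void record(double v, long nowMs) { if (v > max) max = v; }
    }

    static class Count implements Stat {
        long count = 0;
        public void record(double v, long nowMs) { count++; }
    }

    static class Sensor {
        private final List<Stat> stats = new ArrayList<>();
        void add(Stat s) { stats.add(s); }
        // The synchronized walk is the hotspot: every processed record pays
        // for one lock acquisition plus one pass over all registered stats.
        synchronized void record(double value) {
            long now = System.currentTimeMillis();
            for (Stat s : stats) s.record(value, now);
        }
    }

    public static void main(String[] args) {
        Sensor latency = new Sensor();
        Count count = new Count();
        latency.add(new Max());
        latency.add(count);
        for (int i = 0; i < 100_000; i++)
            latency.record(i % 10);   // one call per "record processed"
        System.out.println(count.count);
    }
}
```

With many stream threads per node (the jobs discussed here run 16-32), contention on such shared sensors compounds the per-call cost, which is consistent with the profiler attributing the time to the walk rather than to `milliseconds()`.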
[jira] [Created] (KAFKA-3811) Introduce KStream metrics recording levels
Greg Fodor created KAFKA-3811: - Summary: Introduce KStream metrics recording levels Key: KAFKA-3811 URL: https://issues.apache.org/jira/browse/KAFKA-3811 Project: Kafka Issue Type: Improvement Components: streams Reporter: Greg Fodor Assignee: Guozhang Wang Follow-up from the discussions here: https://github.com/apache/kafka/pull/1447 https://issues.apache.org/jira/browse/KAFKA-3769 The proposal is to introduce configuration to control the granularity/volumes of metrics emitted by Kafka Streams jobs, since the per-record level metrics introduce non-trivial overhead and are possibly less useful once a job has been optimized. Proposal from guozhangwang: level0 (stream thread global): per-record process / punctuate latency, commit latency, poll latency, etc level1 (per processor node, and per state store): IO latency, per-record .. latency, forward throughput, etc. And by default we only turn on level0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels
[ https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Fodor updated KAFKA-3811: -- Summary: Introduce Kafka Streams metrics recording levels (was: Introduce KStream metrics recording levels) > Introduce Kafka Streams metrics recording levels > > > Key: KAFKA-3811 > URL: https://issues.apache.org/jira/browse/KAFKA-3811 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Greg Fodor >Assignee: Guozhang Wang > > Follow-up from the discussions here: > https://github.com/apache/kafka/pull/1447 > https://issues.apache.org/jira/browse/KAFKA-3769 > The proposal is to introduce configuration to control the granularity/volumes > of metrics emitted by Kafka Streams jobs, since the per-record level metrics > introduce non-trivial overhead and are possibly less useful once a job has > been optimized. > Proposal from guozhangwang: > level0 (stream thread global): per-record process / punctuate latency, commit > latency, poll latency, etc > level1 (per processor node, and per state store): IO latency, per-record .. > latency, forward throughput, etc. > And by default we only turn on level0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
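The level0/level1 proposal quoted above amounts to gating each sensor's `record()` behind a configured recording level (later Kafka releases added a `metrics.recording.level` config along these lines). A minimal sketch of the gating idea, with illustrative names that do not match Kafka's internal classes:

```java
public class LevelGate {
    enum RecordingLevel {
        INFO(0),   // "level0": thread-global process/commit/poll latencies, on by default
        DEBUG(1);  // "level1": per-processor-node and per-state-store metrics
        final int id;
        RecordingLevel(int id) { this.id = id; }
        // A sensor records only if its level is at or above the configured one.
        boolean shouldRecord(RecordingLevel configured) {
            return id <= configured.id;
        }
    }

    static class Sensor {
        final RecordingLevel level;
        final RecordingLevel configured;
        long recorded = 0;
        Sensor(RecordingLevel level, RecordingLevel configured) {
            this.level = level;
            this.configured = configured;
        }
        void record(double value) {
            if (!level.shouldRecord(configured))
                return;              // cheap early-out: no stat walk at all
            recorded++;              // ...a real implementation updates stats here
        }
    }

    public static void main(String[] args) {
        // With the default (INFO) configured, DEBUG-level sensors become no-ops.
        Sensor commitLatency = new Sensor(RecordingLevel.INFO, RecordingLevel.INFO);
        Sensor storeGetLatency = new Sensor(RecordingLevel.DEBUG, RecordingLevel.INFO);
        commitLatency.record(3.2);
        storeGetLatency.record(0.4);
        System.out.println(commitLatency.recorded + " " + storeGetLatency.recorded);
    }
}
```

The early return is the whole point: in production the fine-grained sensors cost a branch per record instead of a synchronized walk, while flipping the level back to DEBUG re-enables them for tuning sessions.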
[jira] [Commented] (KAFKA-3785) Fetcher spending unnecessary time during metrics recording
[ https://issues.apache.org/jira/browse/KAFKA-3785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313461#comment-15313461 ] Greg Fodor commented on KAFKA-3785: --- https://github.com/apache/kafka/pull/1464 > Fetcher spending unnecessary time during metrics recording > -- > > Key: KAFKA-3785 > URL: https://issues.apache.org/jira/browse/KAFKA-3785 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Greg Fodor > > Profiling a Kafka Streams job revealed some hotspots in the Fetcher during > metrics flushing. Previous discussion here: > https://issues.apache.org/jira/browse/KAFKA-3769 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3785) Fetcher spending unnecessary time during metrics recording
[ https://issues.apache.org/jira/browse/KAFKA-3785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Fodor updated KAFKA-3785: -- Component/s: consumer > Fetcher spending unnecessary time during metrics recording > -- > > Key: KAFKA-3785 > URL: https://issues.apache.org/jira/browse/KAFKA-3785 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Greg Fodor > > Profiling a Kafka Streams job revealed some hotspots in the Fetcher during > metrics flushing. Previous discussion here: > https://issues.apache.org/jira/browse/KAFKA-3769 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3785) Fetcher spending unnecessary time during metrics recording
Greg Fodor created KAFKA-3785: - Summary: Fetcher spending unnecessary time during metrics recording Key: KAFKA-3785 URL: https://issues.apache.org/jira/browse/KAFKA-3785 Project: Kafka Issue Type: Improvement Reporter: Greg Fodor Profiling a Kafka Streams job revealed some hotspots in the Fetcher during metrics flushing. Previous discussion here: https://issues.apache.org/jira/browse/KAFKA-3769 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3770) KStream job should be able to specify linger.ms
[ https://issues.apache.org/jira/browse/KAFKA-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313434#comment-15313434 ] Greg Fodor commented on KAFKA-3770: --- Cut a new PR since I screwed up the rebase on the previous one. This one adds a test, etc. https://github.com/apache/kafka/pull/1463 > KStream job should be able to specify linger.ms > --- > > Key: KAFKA-3770 > URL: https://issues.apache.org/jira/browse/KAFKA-3770 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Greg Fodor >Assignee: Guozhang Wang > > The default linger.ms hardcoded into the StreamsConfig class of 100ms is > problematic for jobs that have lots of tasks, since this latency can accrue. > It seems useful to be able to override the linger.ms in the StreamsConfig. > Attached is a PR which allows this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312913#comment-15312913 ] Greg Fodor commented on KAFKA-3758: --- also, if we did not run at an elevated number of threads, we were hitting that issue due to the timeout happening before all tasks had initialized. > KStream job fails to recover after Kafka broker stopped > --- > > Key: KAFKA-3758 > URL: https://issues.apache.org/jira/browse/KAFKA-3758 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang > Attachments: muon.log.1.gz > > > We've been doing some testing of a fairly complex KStreams job and under load > it seems the job fails to rebalance + recover if we shut down one of the > kafka brokers. The test we were running had a 3-node kafka cluster where each > topic had at least a replication factor of 2, and we terminated one of the > nodes. > Attached is the full log, the root exception seems to be contention on the > lock on the state directory. The job continues to try to recover but throws > errors relating to locks over and over. Restarting the job itself resolves > the problem. 
> 1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while > creating the state manager > 1703 at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71) > 1704 at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86) > 1705 at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550) > 1706 at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577) > 1707 at > org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68) > 1708 at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123) > 1709 at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222) > 1710 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232) > 1711 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227) > 1712 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1713 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1714 at > org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) > 1715 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1716 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1717 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436) > 1718 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422) > 1719 at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > 1720 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > 1721 at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > 1722 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1723 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1724 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > 1725 at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > 1726 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > 1727 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > 1728 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > 1729 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > 1730 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) >
[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312912#comment-15312912 ] Greg Fodor commented on KAFKA-3758: --- Hey, we're running 16 threads -- for this job we have 25 topics, approx ~350 topic-partitions involved, but for most of the job there isn't much I/O against most of these. Basically we are taking in a very small % of the incoming data at the top of the job and processing it, and discarding most of it early. > KStream job fails to recover after Kafka broker stopped > --- > > Key: KAFKA-3758 > URL: https://issues.apache.org/jira/browse/KAFKA-3758 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang > Attachments: muon.log.1.gz > > > We've been doing some testing of a fairly complex KStreams job and under load > it seems the job fails to rebalance + recover if we shut down one of the > kafka brokers. The test we were running had a 3-node kafka cluster where each > topic had at least a replication factor of 2, and we terminated one of the > nodes. > Attached is the full log, the root exception seems to be contention on the > lock on the state directory. The job continues to try to recover but throws > errors relating to locks over and over. Restarting the job itself resolves > the problem. 
> 1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while > creating the state manager > 1703 at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71) > 1704 at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86) > 1705 at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550) > 1706 at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577) > 1707 at > org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68) > 1708 at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123) > 1709 at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222) > 1710 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232) > 1711 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227) > 1712 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1713 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1714 at > org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) > 1715 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1716 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1717 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436) > 1718 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422) > 1719 at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > 1720 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > 1721 at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > 1722 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1723 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1724 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > 1725 at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > 1726 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > 1727 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > 1728 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > 1729 at >
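The root exception in the attached log is contention on the state-directory lock: Kafka Streams holds a file lock per task state directory, and a rebalance can hand a task to a new thread before the previous owner has released it. The failure mode and one mitigation (a bounded retry with backoff) can be demonstrated standalone with `java.nio` file locks; the paths and retry policy here are illustrative, not Kafka's actual code:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class StateDirLockDemo {
    // Retry a few times instead of failing immediately; return null if the
    // lock never frees so the caller can surface a retriable error.
    static FileLock tryLockWithRetry(FileChannel ch, int attempts, long backoffMs)
            throws IOException, InterruptedException {
        for (int i = 0; i < attempts; i++) {
            try {
                FileLock lock = ch.tryLock();
                if (lock != null) return lock;
            } catch (OverlappingFileLockException e) {
                // lock held by another thread in this JVM; back off and retry
            }
            Thread.sleep(backoffMs);
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        Path lockFile = Files.createTempFile("state-dir", ".lock");
        try (FileChannel a = FileChannel.open(lockFile, StandardOpenOption.WRITE);
             FileChannel b = FileChannel.open(lockFile, StandardOpenOption.WRITE)) {
            FileLock first = a.tryLock();
            // Second claimant cannot get the lock while the first holds it.
            FileLock second = tryLockWithRetry(b, 3, 10);
            System.out.println(first != null && second == null);
            first.release();
            // After release, the retry loop succeeds.
            FileLock third = tryLockWithRetry(b, 3, 10);
            System.out.println(third != null);
            third.release();
        } finally {
            Files.deleteIfExists(lockFile);
        }
    }
}
```

This matches the symptom reported: the job "continues to try to recover but throws errors relating to locks over and over" until a restart releases every lock at once.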
[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics
[ https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311179#comment-15311179 ] Greg Fodor commented on KAFKA-3769: --- Thanks Jay! Guozhang, what are your thoughts on instead of trying to reduce the granularity of the metrics, potentially having a way to just disable the process/latency metrics collection? I'm still pretty new to KStreams, and haven't used these metrics, but I'm guessing they will be used for occasionally tuning the job against production data but not necessarily for operational monitoring. (I could be wrong about this.) As such, it seems that you may want to just have a switch you flip when you are running in production that will disable the metrics and maximize the throughput of the job, and then turn it on selectively when you want to perform performance measurement. > KStream job spending 60% of time writing metrics > > > Key: KAFKA-3769 > URL: https://issues.apache.org/jira/browse/KAFKA-3769 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang >Priority: Critical > > I've been profiling a complex streams job, and found two major hotspots when > writing metrics, which take up about 60% of the CPU time of the job. (!) A PR > is attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308306#comment-15308306 ] Greg Fodor commented on KAFKA-3758: --- No, the kstream job was running across 2 servers, and the kafka cluster was a 3 node cluster running on separate machines. > KStream job fails to recover after Kafka broker stopped > --- > > Key: KAFKA-3758 > URL: https://issues.apache.org/jira/browse/KAFKA-3758 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang > Attachments: muon.log.1.gz > > > We've been doing some testing of a fairly complex KStreams job and under load > it seems the job fails to rebalance + recover if we shut down one of the > kafka brokers. The test we were running had a 3-node kafka cluster where each > topic had at least a replication factor of 2, and we terminated one of the > nodes. > Attached is the full log, the root exception seems to be contention on the > lock on the state directory. The job continues to try to recover but throws > errors relating to locks over and over. Restarting the job itself resolves > the problem. 
> 1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while > creating the state manager > 1703 at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71) > 1704 at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86) > 1705 at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550) > 1706 at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577) > 1707 at > org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68) > 1708 at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123) > 1709 at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222) > 1710 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232) > 1711 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227) > 1712 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1713 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1714 at > org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) > 1715 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1716 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1717 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436) > 1718 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422) > 1719 at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > 1720 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > 1721 at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > 1722 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1723 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1724 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > 1725 at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > 1726 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > 1727 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > 1728 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > 1729 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > 1730 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > 1731 at >
[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics
[ https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306381#comment-15306381 ] Greg Fodor commented on KAFKA-3769: --- Consider the PR a first pass, please advise on how we may want to deal with the fact that for KStream jobs with lots of tasks, etc, the overhead of writing the various process/poll/latency metrics is immense. > KStream job spending 60% of time writing metrics > > > Key: KAFKA-3769 > URL: https://issues.apache.org/jira/browse/KAFKA-3769 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang >Priority: Critical > > I've been profiling a complex streams job, and found two major hotspots when > writing metrics, which take up about 60% of the CPU time of the job. (!) A PR > is attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3770) KStream job should be able to specify linger.ms
[ https://issues.apache.org/jira/browse/KAFKA-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306371#comment-15306371 ] Greg Fodor commented on KAFKA-3770: --- https://github.com/apache/kafka/pull/1448 > KStream job should be able to specify linger.ms > --- > > Key: KAFKA-3770 > URL: https://issues.apache.org/jira/browse/KAFKA-3770 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Greg Fodor >Assignee: Guozhang Wang > > The default linger.ms hardcoded into the StreamsConfig class of 100ms is > problematic for jobs that have lots of tasks, since this latency can accrue. > It seems useful to be able to override the linger.ms in the StreamsConfig. > Attached is a PR which allows this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3770) KStream job should be able to specify linger.ms
Greg Fodor created KAFKA-3770: - Summary: KStream job should be able to specify linger.ms Key: KAFKA-3770 URL: https://issues.apache.org/jira/browse/KAFKA-3770 Project: Kafka Issue Type: Improvement Components: streams Reporter: Greg Fodor Assignee: Guozhang Wang The default linger.ms hardcoded into the StreamsConfig class of 100ms is problematic for jobs that have lots of tasks, since this latency can accrue. It seems useful to be able to override the linger.ms in the StreamsConfig. Attached is a PR which allows this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
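Assuming the PR's behavior of passing a user-supplied `linger.ms` through to the underlying producer, the override is just another entry in the `Properties` handed to the streams job. This sketch has no Kafka dependency; the config keys are the real producer/streams key names, but `resolveLingerMs` is a hypothetical helper mimicking the layered lookup, not a Kafka API:

```java
import java.util.Properties;

public class LingerConfigSketch {
    static final String LINGER_MS_DEFAULT = "100"; // the value hardcoded in StreamsConfig

    // Mimics layered config resolution: a user-supplied value wins over the default.
    static String resolveLingerMs(Properties userProps) {
        return userProps.getProperty("linger.ms", LINGER_MS_DEFAULT);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-job");     // illustrative values
        props.put("bootstrap.servers", "broker1:9092");
        props.put("linger.ms", "5"); // lower batching latency for many-task jobs
        System.out.println(resolveLingerMs(props));
        System.out.println(resolveLingerMs(new Properties()));
    }
}
```

The motivation above is that a 100ms linger accrues per producer batch hop, so a topology with many tasks and repartition topics can see the latency stack up; letting the job choose its own `linger.ms` trades batching efficiency for end-to-end latency.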
[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics
[ https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306321#comment-15306321 ] Greg Fodor commented on KAFKA-3769: --- It seems it might be desirable to have a way to just flip off some or all of the metrics. > KStream job spending 60% of time writing metrics > > > Key: KAFKA-3769 > URL: https://issues.apache.org/jira/browse/KAFKA-3769 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang >Priority: Critical > > I've been profiling a complex streams job, and found two major hotspots when > writing metrics, which take up about 60% of the CPU time of the job. (!) A PR > is attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics
[ https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306318#comment-15306318 ] Greg Fodor commented on KAFKA-3769: --- Actually, additionally it looks like the code path for fetching from RocksDB spends most of its time recording the latency metrics :( > KStream job spending 60% of time writing metrics > > > Key: KAFKA-3769 > URL: https://issues.apache.org/jira/browse/KAFKA-3769 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang >Priority: Critical > > I've been profiling a complex streams job, and found two major hotspots when > writing metrics, which take up about 60% of the CPU time of the job. (!) A PR > is attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics
[ https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306306#comment-15306306 ] Greg Fodor commented on KAFKA-3769: --- https://github.com/apache/kafka/pull/1447 > KStream job spending 60% of time writing metrics > > > Key: KAFKA-3769 > URL: https://issues.apache.org/jira/browse/KAFKA-3769 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang >Priority: Critical > > I've been profiling a complex streams job, and found two major hotspots when > writing metrics, which take up about 60% of the CPU time of the job. (!) A PR > is attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3769) KStream job spending 60% of time writing metrics
Greg Fodor created KAFKA-3769: - Summary: KStream job spending 60% of time writing metrics Key: KAFKA-3769 URL: https://issues.apache.org/jira/browse/KAFKA-3769 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.0.0 Reporter: Greg Fodor Assignee: Guozhang Wang Priority: Critical I've been profiling a complex streams job, and found two major hotspots when writing metrics, which take up about 60% of the CPU time of the job. (!) A PR is attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Reopened] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Fodor reopened KAFKA-3758: --- > KStream job fails to recover after Kafka broker stopped > --- > > Key: KAFKA-3758 > URL: https://issues.apache.org/jira/browse/KAFKA-3758 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang > Attachments: muon.log.1.gz > > > We've been doing some testing of a fairly complex KStreams job and under load > it seems the job fails to rebalance + recover if we shut down one of the > kafka brokers. The test we were running had a 3-node kafka cluster where each > topic had at least a replication factor of 2, and we terminated one of the > nodes. > Attached is the full log, the root exception seems to be contention on the > lock on the state directory. The job continues to try to recover but throws > errors relating to locks over and over. Restarting the job itself resolves > the problem. 
> 1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while > creating the state manager > 1703 at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71) > 1704 at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86) > 1705 at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550) > 1706 at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577) > 1707 at > org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68) > 1708 at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123) > 1709 at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222) > 1710 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232) > 1711 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227) > 1712 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1713 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1714 at > org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) > 1715 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1716 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1717 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436) > 1718 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422) > 1719 at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > 1720 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > 1721 at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > 1722 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > 1723 at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > 1724 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > 1725 at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > 1726 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > 1727 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > 1728 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > 1729 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > 1730 at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > 1731 at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > 1732 at >
[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305503#comment-15305503 ] Greg Fodor commented on KAFKA-3758: --- Oh, actually, I'm not so sure. This was not during an unclean shutdown, but during a broker rebalance. > KStream job fails to recover after Kafka broker stopped > --- > > Key: KAFKA-3758 > URL: https://issues.apache.org/jira/browse/KAFKA-3758 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang > Attachments: muon.log.1.gz > > > We've been doing some testing of a fairly complex KStreams job and under load > it seems the job fails to rebalance + recover if we shut down one of the > kafka brokers. The test we were running had a 3-node kafka cluster where each > topic had at least a replication factor of 2, and we terminated one of the > nodes. > Attached is the full log, the root exception seems to be contention on the > lock on the state directory. The job continues to try to recover but throws > errors relating to locks over and over. Restarting the job itself resolves > the problem. 
[jira] [Resolved] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Fodor resolved KAFKA-3758. --- Resolution: Duplicate
[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305501#comment-15305501 ] Greg Fodor commented on KAFKA-3758: --- Ah yes this looks like the same issue, thanks!
[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302542#comment-15302542 ] Greg Fodor commented on KAFKA-3758: --- Also, the log is truncated at the top to the point where we shut the broker off. If there's additional useful information before that point that you think we could share, happy to attach it.
[jira] [Updated] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Fodor updated KAFKA-3758: -- Attachment: muon.log.1.gz
[jira] [Created] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped
Greg Fodor created KAFKA-3758:
---------------------------------

             Summary: KStream job fails to recover after Kafka broker stopped
                 Key: KAFKA-3758
                 URL: https://issues.apache.org/jira/browse/KAFKA-3758
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.0.0
            Reporter: Greg Fodor
            Assignee: Guozhang Wang

We've been doing some testing of a fairly complex KStreams job and under load it seems the job fails to rebalance + recover if we shut down one of the kafka brokers. The test we were running had a 3-node kafka cluster where each topic had at least a replication factor of 2, and we terminated one of the nodes. Attached is the full log; the root exception seems to be contention on the lock on the state directory. The job continues to try to recover but throws errors relating to locks over and over. Restarting the job itself resolves the problem.

org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
    ...
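The "contention on the lock on the state directory" described above can be seen in miniature with plain java.nio file locking. The sketch below is illustrative only (the class and method names are made up, not Kafka's implementation): a second attempt to lock the same state-directory lock file from the same JVM is rejected until the first holder releases the lock and closes its channel, which mirrors how a rebalancing task can fail to acquire a state directory another thread still holds.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class StateDirLockDemo {
    // Try to lock a task's state directory lock file; returns null when the
    // lock is already held (by another process, or another thread in this JVM).
    static FileLock tryLockStateDir(Path lockFile) throws IOException {
        FileChannel channel = FileChannel.open(
                lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            FileLock lock = channel.tryLock();
            if (lock == null) {       // held by another process
                channel.close();
            }
            return lock;
        } catch (OverlappingFileLockException e) {
            channel.close();          // held by another thread in this JVM
            return null;
        }
    }

    public static void main(String[] args) throws IOException {
        Path lockFile = Files.createTempDirectory("state-0_1").resolve(".lock");
        FileLock first = tryLockStateDir(lockFile);   // acquired
        FileLock second = tryLockStateDir(lockFile);  // contended: expect null
        System.out.println("first acquired:  " + (first != null));
        System.out.println("second acquired: " + (second != null));
        first.release();
        first.channel().close();                      // frees lock and handle
        FileLock third = tryLockStateDir(lockFile);   // succeeds after release
        System.out.println("third acquired:  " + (third != null));
        third.release();
        third.channel().close();
    }
}
```

Restarting the job resolves the reported problem for the same reason the third attempt succeeds here: process exit releases all held locks and handles.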
[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface
[ https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297745#comment-15297745 ] Greg Fodor commented on KAFKA-3745: --- Actually, this seems like a good enough solution; I forgot to consider that the stream still has the join key as its key, so I can grab it in a subsequent map. All things considered, this seems like a small price to pay for this rare case vs. introducing the parameter into the joiner interface, so I'm fine with closing this ticket. > Consider adding join key to ValueJoiner interface > - > > Key: KAFKA-3745 > URL: https://issues.apache.org/jira/browse/KAFKA-3745 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Priority: Minor > Labels: api, newbie > > In working with Kafka Stream joining, it's sometimes the case that a join key > is not actually present in the values of the joins themselves (if, for > example, a previous transform generated an ephemeral join key.) In such > cases, the actual key of the join is not available in the ValueJoiner > implementation to be used to construct the final joined value. This can be > worked around by explicitly threading the join key into the value if needed, > but it seems like extending the interface to pass the join key along as well > would be helpful. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface
[ https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297733#comment-15297733 ] Greg Fodor commented on KAFKA-3745: --- Yes, the join key needs to be added to the final joined record. I suppose, as you mention, a nicer approach may be to emit the joined record with a null in place of where the key will go, and then do a simple .map after the join to fill in the key; this seems better than what I am doing now.
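The two-step workaround discussed in this thread (join with a null placeholder where the key will go, then a map that fills the key back in from the record's own key) can be sketched with plain Java collections standing in for the KStream and KTable. The `Enriched` record and `leftJoin` helper below are hypothetical illustrations of the pattern, not Kafka Streams API:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class JoinKeyWorkaround {
    // A joined value with a slot for the join key, initially left null.
    record Enriched(String key, String left, String right) {}

    // Stand-in for stream.leftJoin(table, joiner): the value joiner only sees
    // the two values, so it leaves the key slot null...
    static List<Enriched> leftJoin(List<Map.Entry<String, String>> stream,
                                   Map<String, String> table) {
        return stream.stream()
                .map(e -> new SimpleEntry<>(e.getKey(),
                        new Enriched(null, e.getValue(), table.get(e.getKey()))))
                // ...and a subsequent map() fills the key in, since the join
                // key is still the record's key after the join.
                .map(e -> new Enriched(e.getKey(),
                        e.getValue().left(), e.getValue().right()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> stream =
                List.of(Map.entry("k1", "a"), Map.entry("k2", "b"));
        Map<String, String> table = Map.of("k1", "X");
        leftJoin(stream, table).forEach(System.out::println);
    }
}
```

Because it is a left join, the table side may be absent (`right` is null for "k2" here), which is exactly why the value alone cannot be trusted to carry the key.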
[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface
[ https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297477#comment-15297477 ] Greg Fodor commented on KAFKA-3745: --- Yep, I admit this is definitely not the most common case. But when it happens, the key is basically lost, so the workaround results in passing additional state through the system which seems undesirable.
[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface
[ https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297250#comment-15297250 ] Greg Fodor commented on KAFKA-3745: --- Sure. We are left joining a KTable against a KStream. The entry from the KTable may be null since it's an outer join, so it can't be relied upon to provide the key, and the join key for the KStream is derived from one of the fields (in particular, it's a parsed substring of one of the columns). The workaround right now is that we do that parsing in a previous step and then emit a record with the value.
[jira] [Created] (KAFKA-3745) Consider adding join key to ValueJoiner interface
Greg Fodor created KAFKA-3745: - Summary: Consider adding join key to ValueJoiner interface Key: KAFKA-3745 URL: https://issues.apache.org/jira/browse/KAFKA-3745 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.0.0 Reporter: Greg Fodor Assignee: Guozhang Wang Priority: Minor In working with Kafka Stream joining, it's sometimes the case that a join key is not actually present in the values of the joins themselves (if, for example, a previous transform generated an ephemeral join key.) In such cases, the actual key of the join is not available in the ValueJoiner implementation to be used to construct the final joined value. This can be worked around by explicitly threading the join key into the value if needed, but it seems like extending the interface to pass the join key along as well would be helpful. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
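The workaround described in the ticket — explicitly threading the join key into the value so it survives into the joiner — can be sketched in plain Java with no Kafka dependency. Everything here (the mirrored ValueJoiner interface, the Keyed wrapper, the sample values) is illustrative, not the actual Streams API:

```java
public class JoinKeyWorkaround {
    // Mirrors the shape of Kafka Streams' ValueJoiner: it sees only the two
    // values being joined, never the join key itself.
    interface ValueJoiner<V1, V2, R> {
        R apply(V1 left, V2 right);
    }

    // Hypothetical wrapper that carries the join key alongside the payload,
    // so the key is recoverable inside the joiner.
    static final class Keyed<V> {
        final String key;
        final V value;
        Keyed(String key, V value) { this.key = key; this.value = value; }
    }

    // Upstream, wrap the value so the key rides along; the joiner can then
    // use it to construct the final joined value.
    public static String demo() {
        Keyed<String> left = new Keyed<>("user-42", "room_op");
        String right = "broadcast";
        ValueJoiner<Keyed<String>, String, String> joiner =
            (l, r) -> l.key + ":" + l.value + "+" + r;
        return joiner.apply(left, right);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // user-42:room_op+broadcast
    }
}
```

The cost the ticket objects to is visible here: the extra Keyed wrapper is state threaded through the topology purely to recover something the framework already knows.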
[jira] [Updated] (KAFKA-3619) State lock file handle leaks
[ https://issues.apache.org/jira/browse/KAFKA-3619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Fodor updated KAFKA-3619: -- Description: The .lock files in the state store directories do not seem to be having their file handles freed (despite the locks being freed), so on a complex job the number of file handles in use goes up rapidly as the locks are taken for the cleanup routine at the end of the thread run loop. Running lsof shows the number of open filehandles on the .lock file increasing rapidly over time. In a separate test project, I reproduced the issue and determined that in order for the filehandle to be relinquished the FileChannel instance must be properly closed. PR: https://github.com/apache/kafka/pull/1267 was: The .lock files in the state store directories do not seem to be having their file handles freed (despite the locks being freed), so on a complex job the number of file handles in use goes up rapidly as the locks are taken for the cleanup routine at the end of the thread run loop. Running lsof shows the number of open filehandles on the .lock file increasing rapidly over time. In a separate test project, I reproduced the issue and determined that in order for the filehandle to be relinquished the FileChannel instance must be properly closed. PR: https://github.com/apache/kafka/pull/1267 > State lock file handle leaks > > > Key: KAFKA-3619 > URL: https://issues.apache.org/jira/browse/KAFKA-3619 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang >Priority: Critical > > The .lock files in the state store directories do not seem to be having their > file handles freed (despite the locks being freed), so on a complex job the > number of file handles in use goes up rapidly as the locks are taken for the > cleanup routine at the end of the thread run loop. 
Running lsof shows the > number of open filehandles on the .lock file increasing rapidly over time. In > a separate test project, I reproduced the issue and determined that in order > for the filehandle to be relinquished the FileChannel instance must be > properly closed. > PR: > https://github.com/apache/kafka/pull/1267 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3619) State lock file handle leaks
Greg Fodor created KAFKA-3619: - Summary: State lock file handle leaks Key: KAFKA-3619 URL: https://issues.apache.org/jira/browse/KAFKA-3619 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.0.0 Reporter: Greg Fodor Assignee: Guozhang Wang Priority: Critical The .lock files in the state store directories do not seem to be having their file handles freed (despite the locks being freed), so on a complex job the number of file handles in use goes up rapidly as the locks are taken for the cleanup routine at the end of the thread run loop. Running lsof shows the number of open filehandles on the .lock file increasing rapidly over time. In a separate test project, I reproduced the issue and determined that in order for the filehandle to be relinquished the FileChannel instance must be properly closed. PR: https://github.com/apache/kafka/pull/1267 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
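The fix referenced in the PR comes down to closing the FileChannel itself rather than only releasing the FileLock. A minimal standalone sketch of the correct pattern, using only java.nio (class name and temp-file naming are arbitrary):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockFileDemo {
    // Acquire and release a lock, then close the channel. Releasing only the
    // lock leaves the underlying OS file handle open, which is the leak
    // lsof was showing; try-with-resources guarantees the channel is closed.
    public static boolean lockAndRelease(Path lockFile) throws IOException {
        try (FileChannel channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            FileLock lock = channel.tryLock();
            if (lock == null) {
                return false; // another process holds the lock
            }
            lock.release();
            return true;
        } // channel closed here, relinquishing the file handle
    }

    public static boolean demo() {
        try {
            Path p = Files.createTempFile("state", ".lock");
            try {
                return lockAndRelease(p);
            } finally {
                Files.deleteIfExists(p);
            }
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Run in a loop without the try-with-resources, this pattern reproduces the climbing open-handle count described in the report.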
[jira] [Resolved] (KAFKA-3544) Missing topics on startup
[ https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Fodor resolved KAFKA-3544. --- Resolution: Not A Problem > Missing topics on startup > - > > Key: KAFKA-3544 > URL: https://issues.apache.org/jira/browse/KAFKA-3544 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang > Labels: semantics > > When running a relatively complex job with multiple tasks and state stores, > on the first run I get errors due to some of the intermediate topics not > existing. Subsequent runs work OK. My assumption is streams may be creating > topics lazily, so if downstream tasks are initializing before their parents > have had a chance to create their necessary topics then the children will > attempt to start consuming from topics that do not exist yet. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3544) Missing topics on startup
[ https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15240723#comment-15240723 ] Greg Fodor commented on KAFKA-3544: --- Ah this makes sense. Fortunately thanks to the other responses you've given I've been able to refactor my job to use through() in this case, so it will end up not having this problem once the relevant updates are made to through(). I'll close the ticket. > Missing topics on startup > - > > Key: KAFKA-3544 > URL: https://issues.apache.org/jira/browse/KAFKA-3544 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang > Labels: semantics > > When running a relatively complex job with multiple tasks and state stores, > on the first run I get errors due to some of the intermediate topics not > existing. Subsequent runs work OK. My assumption is streams may be creating > topics lazily, so if downstream tasks are initializing before their parents > have had a chance to create their necessary topics then the children will > attempt to start consuming from topics that do not exist yet. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-3544) Missing topics on startup
[ https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236512#comment-15236512 ] Greg Fodor edited comment on KAFKA-3544 at 4/12/16 3:05 AM:

Not sure of the best way to share the topology. Here's the relevant part of the code:

{code}
builder
    .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming")
    .map((k, v) -> KeyValue.pair(v.getUserId(), v))
    .to(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");

KStream roomOperationMessagesByUserId = builder
    .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");

KStream userBroadcastsMessagesByUserId =
    roomOperationMessagesByUserId.leftJoin(userSpaceBroadcastsByUserId, UserBroadcastsMessage::new);
{code}

In this example roomOperationSerde is a Serde for a custom avro type. I'm basically pivoting the first stream onto a foreign key and then creating another KStream off of that output for a join downstream.
The topology is failing to build on the room_operation_message_incoming-user_id topic:

{code}
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
{code}

2016-04-12 02:57:36 StreamThread [INFO] Stream thread
[jira] [Comment Edited] (KAFKA-3544) Missing topics on startup
[ https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236512#comment-15236512 ] Greg Fodor edited comment on KAFKA-3544 at 4/12/16 3:04 AM:

Not sure of the best way to share the topology. Here's the relevant part of the code:

{code}
builder
    .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming")
    .map((k, v) -> KeyValue.pair(v.getUserId(), v))
    .to(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");

KStream roomOperationMessagesByUserId = builder
    .stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");

KStream userBroadcastsMessagesByUserId =
    roomOperationMessagesByUserId.leftJoin(userSpaceBroadcastsByUserId, UserBroadcastsMessage::new);
{code}

In this example roomOperationSerde is a Serde for a custom avro type. I'm basically pivoting the first stream onto a foreign key and then creating another KStream off of that output for a join downstream.
The topology is failing to build on the user_space_broadcasts-user_id topic:

{code}
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
{code}

2016-04-12 02:57:36 StreamThread [INFO] Stream thread shutdown
[jira] [Commented] (KAFKA-3544) Missing topics on startup
[ https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236512#comment-15236512 ] Greg Fodor commented on KAFKA-3544: ---

Not sure of the best way to share the topology. Here's the relevant part of the code:

{code}
builder
    .stream(Serdes.Long(), userSpaceBroadcastSerde, "positron-db-user_space_broadcasts")
    .map((id, broadcast) -> KeyValue.pair(broadcast.getUserId().toString(), broadcast))
    .to(Serdes.String(), userSpaceBroadcastSerde, "user_space_broadcasts-user_id");

KTable userSpaceBroadcastsByUserId = builder
    .stream(Serdes.String(), userSpaceBroadcastSerde, "user_space_broadcasts-user_id")
    .aggregateByKey(...);
{code}

In this example userSpaceBroadcastSerde is a Serde for a custom avro type. I'm basically pivoting the first stream onto a foreign key and then creating a KTable off of that output by tapping it and then aggregating. (Given our discussions on other tickets there may be a way to simplify this, but I wanted to capture it as-is for this report.)
The topology is failing to build on the user_space_broadcasts-user_id topic:

{code}
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
{code}

2016-04-12 02:57:36 StreamThread [INFO] Stream thread shutdown complete [StreamThread-2] Exception in thread
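The pivot-then-aggregate shape in the snippet above (re-key each record on a foreign key, then aggregate per key into a table) can be modeled with plain collections. This only illustrates the dataflow of map(...).to(topic) followed by aggregateByKey(...); the record type and field names are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RekeyAggregateDemo {
    // Stands in for the avro broadcast type: a primary id plus the
    // foreign key (userId) we want to pivot onto.
    record Broadcast(long id, String userId) {}

    // Re-key the stream by userId and aggregate per key (here a count,
    // standing in for whatever aggregateByKey(...) computes).
    public static Map<String, Long> countByUser(List<Broadcast> stream) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (Broadcast b : stream) {
            table.merge(b.userId(), 1L, Long::sum); // KTable-like upsert per key
        }
        return table;
    }

    public static Map<String, Long> demo() {
        return countByUser(List.of(
            new Broadcast(1, "u1"), new Broadcast(2, "u1"), new Broadcast(3, "u2")));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {u1=2, u2=1}
    }
}
```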
[jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values
[ https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236427#comment-15236427 ] Greg Fodor commented on KAFKA-3543: --- Just something akin to a flatTransform where the output of the transformer is assumed to have a value of some kind of enumerable. Probably would be pretty convoluted since it would impact a lot of method signatures. I guess at a higher level, I wonder if there's any way to make your points on stateless vs non-stateless operators requiring Serdes more intuitive. It might back out to just docs. > Allow a variant of transform() which can emit multiple values > - > > Key: KAFKA-3543 > URL: https://issues.apache.org/jira/browse/KAFKA-3543 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang > > Right now it seems that if you want to apply an arbitrary stateful > transformation to a stream, you either have to use a TransformerSupplier or > ProcessorSupplier sent to transform() or process(). The custom processor will > allow you to emit multiple new values, but the process() method currently > terminates that branch of the topology so you can't apply additional data > flow. transform() lets you continue the data flow, but forces you to emit a > single value for every input value. > (It actually doesn't quite force you to do this, since you can hold onto the > ProcessorContext and emit multiple, but that's probably not the ideal way to > do it :)) > It seems desirable to somehow allow a transformation that emits multiple > values per input value. I'm not sure of the best way to factor this inside of > the current TransformerSupplier/Transformer architecture in a way that is > clean and efficient -- currently I'm doing the workaround above of just > calling forward() myself on the context and actually emitting dummy values > which are filtered out downstream. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
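The forward()-based workaround discussed in this ticket can be modeled without Kafka: instead of returning exactly one value, the transformer hands each output to a downstream callback, analogous to calling ProcessorContext.forward() multiple times. Names here are invented for the sketch, not the Streams API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class MultiEmitDemo {
    // A "flat transform": zero or more outputs per input, pushed downstream
    // via a callback rather than returned as a single value.
    static void splitWords(String input, Consumer<String> forward) {
        for (String word : input.split("\\s+")) {
            forward.accept(word); // analogous to context.forward(key, value)
        }
    }

    public static List<String> run(List<String> inputs) {
        List<String> downstream = new ArrayList<>();
        for (String in : inputs) {
            splitWords(in, downstream::add); // each input may emit many records
        }
        return downstream;
    }

    public static List<String> demo() {
        return run(List.of("a b", "c"));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a, b, c]
    }
}
```

This avoids the dummy-value-plus-filter hack from the ticket: outputs exist only when the transformer actually emits them.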
[jira] [Commented] (KAFKA-3542) Add "repartition (+ join)" operations to streams
[ https://issues.apache.org/jira/browse/KAFKA-3542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236417#comment-15236417 ] Greg Fodor commented on KAFKA-3542: --- great. feel free to close. > Add "repartition (+ join)" operations to streams > > > Key: KAFKA-3542 > URL: https://issues.apache.org/jira/browse/KAFKA-3542 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang >Priority: Minor > > A common operation in Kafka Streams seems to be to repartition the stream > onto a different column, usually for joining. The current way I've been doing > this: > - Perform a map on the stream to the same value with a new key (the key we're > going to join on, usually a foreign key) > - Sink the stream into a new topic > - Create a new stream sourcing that topic > - Perform the join > Note that without explicitly sinking the intermediate topic, the topology > will fail to build because of the assertion that both sides of a join are > connected to source nodes. When you perform a map, the link between the > source nodes and the tail node of the topology is broken (by setting the > source nodes to null) so you are forced to sink to use that output in a join. > It seems that this pattern could possibly be rolled into much simpler > operation(s). For example, the map could be changed into a "repartition" > method where you just return the new key. And the join itself could be > simplified by letting you specify a re-partition function on either side of > the join and create the intermediate topic implicitly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values
[ https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236375#comment-15236375 ] Greg Fodor commented on KAFKA-3543: --- That makes sense, thanks! It may be useful to provide a helper method here of some kind, but the idiom above seems reasonable. Happy to consider this issue closed either way. > Allow a variant of transform() which can emit multiple values > - > > Key: KAFKA-3543 > URL: https://issues.apache.org/jira/browse/KAFKA-3543 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang > > Right now it seems that if you want to apply an arbitrary stateful > transformation to a stream, you either have to use a TransformerSupplier or > ProcessorSupplier sent to transform() or process(). The custom processor will > allow you to emit multiple new values, but the process() method currently > terminates that branch of the topology so you can't apply additional data > flow. transform() lets you continue the data flow, but forces you to emit a > single value for every input value. > (It actually doesn't quite force you to do this, since you can hold onto the > ProcessorContext and emit multiple, but that's probably not the ideal way to > do it :)) > It seems desirable to somehow allow a transformation that emits multiple > values per input value. I'm not sure of the best way to factor this inside of > the current TransformerSupplier/Transformer architecture in a way that is > clean and efficient -- currently I'm doing the workaround above of just > calling forward() myself on the context and actually emitting dummy values > which are filtered out downstream. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3542) Add "repartition (+ join)" operations to streams
[ https://issues.apache.org/jira/browse/KAFKA-3542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236374#comment-15236374 ] Greg Fodor commented on KAFKA-3542: --- Right, this map approach is what I am doing right now before all of my joins, though I didn't realize I could use through() to generate a joinable stream without sourcing it explicitly from the new topic. I will see if some of my joins can be satisfied with the aggregator-first approach. The thing that bothers me about the current map -> sink approach is that the map is not really DRY (I should just need to specify the selector to re-partition on) and the intermediate topic name should just be generated. I agree an implicit through() call could be useful in place of the assertion currently being made to determine if two streams are joinable. > Add "repartition (+ join)" operations to streams > > > Key: KAFKA-3542 > URL: https://issues.apache.org/jira/browse/KAFKA-3542 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang >Priority: Minor > > A common operation in Kafka Streams seems to be to repartition the stream > onto a different column, usually for joining. The current way I've been doing > this: > - Perform a map on the stream to the same value with a new key (the key we're > going to join on, usually a foreign key) > - Sink the stream into a new topic > - Create a new stream sourcing that topic > - Perform the join > Note that without explicitly sinking the intermediate topic, the topology > will fail to build because of the assertion that both sides of a join are > connected to source nodes. When you perform a map, the link between the > source nodes and the tail node of the topology is broken (by setting the > source nodes to null) so you are forced to sink to use that output in a join. > It seems that this pattern could possibly be rolled into much simpler > operation(s). For example, the map could be changed into a "repartition" > method where you just return the new key. And the join itself could be > simplified by letting you specify a re-partition function on either side of > the join and create the intermediate topic implicitly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
For example, the map could be changed into a "repartition" > method where you just return the new key. And the join itself could be > simplified by letting you specify a re-partition function on either side of > the join and create the intermediate topic implicitly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
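[Editor's note] The map -> sink -> source -> join pattern discussed in this thread can be sketched abstractly. This is a minimal Python simulation of the dataflow, not the Kafka Streams Java API; all names (`repartition`, `join`, the record shapes) are hypothetical, and the grouping dict plays the role of the intermediate topic partitioned on the new key.

```python
from collections import defaultdict

def repartition(records, selector):
    """Re-key (key, value) records by selector(value), grouping by the new
    key -- the role the intermediate topic plays in the map -> sink pattern."""
    parts = defaultdict(list)
    for _, value in records:
        parts[selector(value)].append(value)
    return parts

def join(left, right):
    """Inner-join two repartitioned streams on their now-shared key."""
    return {k: [(l, r) for l in left[k] for r in right[k]]
            for k in left.keys() & right.keys()}

# Example: join orders to users on the order's user_id (a foreign key).
users = repartition([(None, {"id": "u1", "name": "ann"})], lambda u: u["id"])
orders = repartition([(None, {"oid": "o1", "user_id": "u1"}),
                      (None, {"oid": "o2", "user_id": "u2"})],
                     lambda o: o["user_id"])
joined = join(orders, users)
```

The proposed "repartition" method would collapse the first two steps into a single call taking only the selector, exactly as the `repartition` helper above does.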
[jira] [Commented] (KAFKA-3542) Add "repartition (+ join)" operations to streams
[ https://issues.apache.org/jira/browse/KAFKA-3542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236343#comment-15236343 ] Greg Fodor commented on KAFKA-3542: --- Ah, I may understand what you're getting at here -- to do the operation I have in mind, you would first perform an aggregation to pivot the streams onto the proper keys (via the selector), and then join those streams. Is that correct? > Add "repartition (+ join)" operations to streams > > > Key: KAFKA-3542 > URL: https://issues.apache.org/jira/browse/KAFKA-3542 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang >Priority: Minor > > A common operation in Kafka Streams seems to be to repartition the stream > onto a different column, usually for joining. The current way I've been doing > this: > - Perform a map on the stream to the same value with a new key (the key we're > going to join on, usually a foreign key) > - Sink the stream into a new topic > - Create a new stream sourcing that topic > - Perform the join > Note that without explicitly sinking the intermediate topic, the topology > will fail to build because of the assertion that both sides of a join are > connected to source nodes. When you perform a map, the link between the > source nodes and the tail node of the topology is broken (by setting the > source nodes to null) so you are forced to sink to use that output in a join. > It seems that this pattern could possibly be rolled into much simpler > operation(s). For example, the map could be changed into a "repartition" > method where you just return the new key. And the join itself could be > simplified by letting you specify a re-partition function on either side of > the join and create the intermediate topic implicitly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3543) Allow a variant of transform() which can emit multiple values
[ https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236339#comment-15236339 ] Greg Fodor commented on KAFKA-3543: --- Ah interesting, my assumption was that the items emitted from the transform need to be serializable. Is the pattern you describe above basically avoiding this because the flatMap unrolls the Iterables into serializable values? > Allow a variant of transform() which can emit multiple values > - > > Key: KAFKA-3543 > URL: https://issues.apache.org/jira/browse/KAFKA-3543 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang > > Right now it seems that if you want to apply an arbitrary stateful > transformation to a stream, you either have to use a TransformerSupplier or > ProcessorSupplier sent to transform() or process(). The custom processor will > allow you to emit multiple new values, but the process() method currently > terminates that branch of the topology so you can't apply additional data > flow. transform() lets you continue the data flow, but forces you to emit a > single value for every input value. > (It actually doesn't quite force you to do this, since you can hold onto the > ProcessorContext and emit multiple, but that's probably not the ideal way to > do it :)) > It seems desirable to somehow allow a transformation that emits multiple > values per input value. I'm not sure of the best way to factor this inside of > the current TransformerSupplier/Transformer architecture in a way that is > clean and efficient -- currently I'm doing the workaround above of just > calling forward() myself on the context and actually emitting dummy values > which are filtered out downstream. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
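[Editor's note] The question above — whether flatMap unrolls the Iterables returned by a transform into individual serializable values — can be illustrated with a language-agnostic sketch. This is plain Python with hypothetical names, not the Streams API: the transform returns an Iterable per input, and the flat-map stage flattens each Iterable into separate downstream records.

```python
def transform(record):
    """Hypothetical transformer that wants to emit several values per input.
    It returns an Iterable rather than a single value."""
    return [record * 10 + i for i in range(2)]

def flat_map(stream, fn):
    """Unroll each returned Iterable into individual downstream records, so
    only the elements (not the container) need to be serializable."""
    return [out for rec in stream for out in fn(rec)]

flattened = flat_map([1, 2], transform)
```

Under this reading, the container never crosses a topic boundary — only its elements do, which is why no serde for the Iterable itself is needed.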
[jira] [Commented] (KAFKA-3542) Add "repartition (+ join)" operations to streams
[ https://issues.apache.org/jira/browse/KAFKA-3542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236331#comment-15236331 ] Greg Fodor commented on KAFKA-3542: --- I might just not be understanding, but a simple example is if I have a database table being streamed in from Kafka connect and I'd like to join it on another table based on a foreign key, my understanding is I need to ensure both streams are partitioned on the same key before they are fed into the join. The current join implementation doesn't seem to allow me to specify something to extract the proper key to join on, I believe it assumes both inputs are already pre-partitioned on the proper keys. > Add "repartition (+ join)" operations to streams > > > Key: KAFKA-3542 > URL: https://issues.apache.org/jira/browse/KAFKA-3542 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang >Priority: Minor > > A common operation in Kafka Streams seems to be to repartition the stream > onto a different column, usually for joining. The current way I've been doing > this: > - Perform a map on the stream to the same value with a new key (the key we're > going to join on, usually a foreign key) > - Sink the stream into a new topic > - Create a new stream sourcing that topic > - Perform the join > Note that without explicitly sinking the intermediate topic, the topology > will fail to build because of the assertion that both sides of a join are > connected to source nodes. When you perform a map, the link between the > source nodes and the tail node of the topology is broken (by setting the > source nodes to null) so you are forced to sink to use that output in a join. > It seems that this pattern could possibly be rolled into much simpler > operation(s). For example, the map could be changed into a "repartition" > method where you just return the new key. 
And the join itself could be > simplified by letting you specify a re-partition function on either side of > the join and create the intermediate topic implicitly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3545) Generalized Serdes for List/Map
Greg Fodor created KAFKA-3545: - Summary: Generalized Serdes for List/Map Key: KAFKA-3545 URL: https://issues.apache.org/jira/browse/KAFKA-3545 Project: Kafka Issue Type: Improvement Components: streams Reporter: Greg Fodor Assignee: Guozhang Wang Priority: Minor In working with Kafka Streams I've found it's often the case I want to perform a "group by" operation, where I repartition a stream based on a foreign key and then do an aggregation of all the values into a single collection, so the stream becomes one where each entry has a value that is a serialized list of values that belonged to the key. (This seems unrelated to the 'group by' operation talked about in KAFKA-3544.) Basically the same typical group by operation found in systems like Cascading. In order to create these intermediate list values I needed to define custom avro schemas that simply wrap the elements of interest into a list. It seems desirable that there be some basic facility for constructing simple Serdes of Lists/Maps/Sets of other types, potentially using avro's serialization under the hood. If this existed in the core library it would also enable the addition of higher level operations on streams that can use these Serdes to perform simple operations like the "group by" example I mention. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
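[Editor's note] The kind of generic List serde this issue asks for can be sketched with simple length-prefix framing. This is a hypothetical illustration, not the eventual Kafka Serdes API (which the issue suggests might use Avro under the hood): each element is framed with a 4-byte length so the list round-trips without a schema for the container.

```python
import struct

def serialize_list(values, serialize_item):
    """Write an element count, then each element as a length-prefixed blob."""
    out = struct.pack(">i", len(values))
    for v in values:
        b = serialize_item(v)
        out += struct.pack(">i", len(b)) + b
    return out

def deserialize_list(data, deserialize_item):
    """Invert serialize_list using the same framing."""
    (count,) = struct.unpack_from(">i", data, 0)
    pos, values = 4, []
    for _ in range(count):
        (size,) = struct.unpack_from(">i", data, pos)
        pos += 4
        values.append(deserialize_item(data[pos:pos + size]))
        pos += size
    return values

# Round-trip a list of strings, using UTF-8 as the element serde.
blob = serialize_list(["a", "bc"], lambda s: s.encode("utf-8"))
restored = deserialize_list(blob, bytes.decode)
```

Composing a container serde from an element serde this way is exactly what would let higher-level "group by" operations emit collection-valued streams without per-job Avro wrapper schemas.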
[jira] [Created] (KAFKA-3544) Missing topics on startup
Greg Fodor created KAFKA-3544: - Summary: Missing topics on startup Key: KAFKA-3544 URL: https://issues.apache.org/jira/browse/KAFKA-3544 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.0.0 Reporter: Greg Fodor Assignee: Guozhang Wang When running a relatively complex job with multiple tasks and state stores, on the first run I get errors due to some of the intermediate topics not existing. Subsequent runs work OK. My assumption is streams may be creating topics lazily, so if downstream tasks are initializing before their parents have had a chance to create their necessary topics then the children will attempt to start consuming from topics that do not exist yet. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-3543) Allow a variant of transform() which can emit multiple values
[ https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236121#comment-15236121 ] Greg Fodor edited comment on KAFKA-3543 at 4/11/16 10:40 PM: - Also note that in my case flatMap() is not sufficient for my needs, since I need to be able to use state stores. was (Author: gfodor): Also note that in my case the iflatMap() is not sufficient for my needs, since I need to able to use state stores. > Allow a variant of transform() which can emit multiple values > - > > Key: KAFKA-3543 > URL: https://issues.apache.org/jira/browse/KAFKA-3543 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang > > Right now it seems that if you want to apply an arbitrary transformation to a > stream, you either have to use a TransformerSupplier or ProcessorSupplier > sent to transform() or process(). The custom processor will allow you to emit > multiple new values, but the process() method currently terminates that > branch of the topology so you can't apply additional data flow. transform() > lets you continue the data flow, but forces you to emit a single value for > every input value. > (It actually doesn't quite force you to do this, since you can hold onto the > ProcessorContext and emit multiple, but that's probably not the ideal way to > do it :)) > It seems desirable to somehow allow a transformation that emits multiple > values per input value. I'm not sure of the best way to factor this inside of > the current TransformerSupplier/Transformer architecture in a way that is > clean and efficient -- currently I'm doing the workaround above of just > calling forward() myself on the context and actually emitting dummy values > which are filtered out downstream. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3543) Allow a variant of transform() which allows emitting multiple values
Greg Fodor created KAFKA-3543: - Summary: Allow a variant of transform() which allows emitting multiple values Key: KAFKA-3543 URL: https://issues.apache.org/jira/browse/KAFKA-3543 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.0.0 Reporter: Greg Fodor Assignee: Guozhang Wang Right now it seems that if you want to apply an arbitrary transformation to a stream, you either have to use a TransformerSupplier or ProcessorSupplier sent to transform() or process(). The custom processor will allow you to emit multiple new values, but the process() method currently terminates that branch of the topology so you can't apply additional data flow. transform() lets you continue the data flow, but forces you to emit a single value for every input value. (It actually doesn't quite force you to do this, since you can hold onto the ProcessorContext and emit multiple, but that's probably not the ideal way to do it :)) It seems desirable to somehow allow a transformation that emits multiple values per input value. I'm not sure of the best way to factor this inside of the current TransformerSupplier/Transformer architecture in a way that is clean and efficient -- currently I'm doing the workaround above of just calling forward() myself on the context and actually emitting dummy values which are filtered out downstream. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
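[Editor's note] The workaround described above — calling forward() on the context and returning dummy values that are filtered downstream — can be sketched with a minimal stand-in for the processor context. All names here are hypothetical; this simulates the shape of the pattern, not the actual ProcessorContext API.

```python
DUMMY = object()  # sentinel return value, filtered out downstream

class Context:
    """Minimal stand-in for a processor context: collects forwarded records."""
    def __init__(self):
        self.forwarded = []
    def forward(self, value):
        self.forwarded.append(value)

def transform(value, context):
    """Emit several records via forward(), then return a dummy so the
    one-value-per-input contract of transform() is still satisfied."""
    for part in value.split(","):
        context.forward(part)
    return DUMMY

ctx = Context()
outputs = [transform(v, ctx) for v in ["a,b", "c"]]
real = [v for v in outputs if v is not DUMMY]  # dummies filtered: nothing left
```

The awkwardness is visible in the sketch: the useful records travel out-of-band through the context while the declared return channel carries only sentinels — which is why a flatMap-style transform variant returning an Iterable is the cleaner factoring.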
[jira] [Updated] (KAFKA-3543) Allow a variant of transform() which can emit multiple values
[ https://issues.apache.org/jira/browse/KAFKA-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Fodor updated KAFKA-3543: -- Summary: Allow a variant of transform() which can emit multiple values (was: Allow a variant of transform() which allows emitting multiple values) > Allow a variant of transform() which can emit multiple values > - > > Key: KAFKA-3543 > URL: https://issues.apache.org/jira/browse/KAFKA-3543 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Guozhang Wang > > Right now it seems that if you want to apply an arbitrary transformation to a > stream, you either have to use a TransformerSupplier or ProcessorSupplier > sent to transform() or process(). The custom processor will allow you to emit > multiple new values, but the process() method currently terminates that > branch of the topology so you can't apply additional data flow. transform() > lets you continue the data flow, but forces you to emit a single value for > every input value. > (It actually doesn't quite force you to do this, since you can hold onto the > ProcessorContext and emit multiple, but that's probably not the ideal way to > do it :)) > It seems desirable to somehow allow a transformation that emits multiple > values per input value. I'm not sure of the best way to factor this inside of > the current TransformerSupplier/Transformer architecture in a way that is > clean and efficient -- currently I'm doing the workaround above of just > calling forward() myself on the context and actually emitting dummy values > which are filtered out downstream. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3542) Add "repartition (+ join)" operations to streams
Greg Fodor created KAFKA-3542: - Summary: Add "repartition (+ join)" operations to streams Key: KAFKA-3542 URL: https://issues.apache.org/jira/browse/KAFKA-3542 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.0.0 Reporter: Greg Fodor Assignee: Guozhang Wang Priority: Minor A common operation in Kafka Streams seems to be to repartition the stream onto a different column, usually for joining. The current way I've been doing this: - Perform a map on the stream to the same value with a new key (the key we're going to join on, usually a foreign key) - Sink the stream into a new topic - Create a new stream sourcing that topic - Perform the join Note that without explicitly sinking the intermediate topic, the topology will fail to build because of the assertion that both sides of a join are connected to source nodes. When you perform a map, the link between the source nodes and the tail node of the topology is broken (by setting the source nodes to null) so you are forced to sink to use that output in a join. It seems that this pattern could possibly be rolled into much simpler operation(s). For example, the map could be changed into a "repartition" method where you just return the new key. And the join itself could be simplified by letting you specify a re-partition function on either side of the join and create the intermediate topic implicitly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3360) Add a protocol page/section to the official Kafka documentation
[ https://issues.apache.org/jira/browse/KAFKA-3360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15198422#comment-15198422 ] Greg Fodor commented on KAFKA-3360: --- Apologies as I did not really know the best place to put this, and wasn't sure of the proper way to correct things myself, but I wanted to report an error in the documentation at: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol The v1 FetchResponse protocol states that the ThrottleTime field is reported *after* the messages, but in fact the ThrottleTime precedes the message set. So the line: ``` [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] ThrottleTime ``` Should instead read: ``` ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] ``` The main Kafka protocol docs reflect this properly. > Add a protocol page/section to the official Kafka documentation > --- > > Key: KAFKA-3360 > URL: https://issues.apache.org/jira/browse/KAFKA-3360 > Project: Kafka > Issue Type: Improvement >Reporter: Grant Henke >Assignee: Grant Henke > Fix For: 0.10.0.0 > > > This is an umbrella jira to track adding a protocol page/section to the > official Kafka documentation. It lays out subtasks for initial content and > follow up improvements and fixes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
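[Editor's note] The corrected field order matters to anyone hand-rolling a parser: in a v1 FetchResponse, ThrottleTime (int32, milliseconds) must be read before the topic array. A simplified sketch (one topic name only, big-endian as in the Kafka protocol, per-partition fields omitted; function name hypothetical):

```python
import struct

def parse_fetch_response_v1(buf):
    """Read the leading fields of a simplified v1 FetchResponse body.
    ThrottleTime comes FIRST, before the topic array -- the corrected order."""
    pos = 0
    (throttle_ms,) = struct.unpack_from(">i", buf, pos); pos += 4   # int32 ThrottleTime
    (topic_count,) = struct.unpack_from(">i", buf, pos); pos += 4   # array length
    (name_len,) = struct.unpack_from(">h", buf, pos); pos += 2      # int16 string length
    topic = buf[pos:pos + name_len].decode("utf-8"); pos += name_len
    return {"throttle_time_ms": throttle_ms, "topics": topic_count, "first_topic": topic}

# Build a matching buffer: ThrottleTime=100, one topic named "logs".
buf = struct.pack(">iih", 100, 1, 4) + b"logs"
parsed = parse_fetch_response_v1(buf)
```

A parser built against the erroneous wiki ordering would read the topic-array length where ThrottleTime sits and desynchronize on every subsequent field, which is why getting this line right in the docs is worth a ticket.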
[jira] [Comment Edited] (KAFKA-3360) Add a protocol page/section to the official Kafka documentation
[ https://issues.apache.org/jira/browse/KAFKA-3360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15198422#comment-15198422 ] Greg Fodor edited comment on KAFKA-3360 at 3/16/16 11:39 PM: - Apologies as I did not really know the best place to put this, and wasn't sure of the proper way to correct things myself, but I wanted to report an error in the documentation at: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol The v1 FetchResponse protocol states that the ThrottleTime field is reported *after* the messages, but in fact the ThrottleTime precedes the message set. So the line: {code} [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] ThrottleTime {code} Should instead read: {code} ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] {code} The main Kafka protocol docs reflect this properly. was (Author: gfodor): Apologies as I did not really know the best place to put this, and wasn't sure of the proper way to correct things myself, but I wanted to report an error in the documentation at: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol The v1 FetchResponse protocol states that the ThrottleTime field is reported *after* the messages, but in fact the ThrottleTime precedes the message set. So the line: ``` [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] ThrottleTime ``` Should instead read: ``` ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] ``` The main Kafka protocol docs reflect this properly. 
> Add a protocol page/section to the official Kafka documentation > --- > > Key: KAFKA-3360 > URL: https://issues.apache.org/jira/browse/KAFKA-3360 > Project: Kafka > Issue Type: Improvement >Reporter: Grant Henke >Assignee: Grant Henke > Fix For: 0.10.0.0 > > > This is an umbrella jira to track adding a protocol page/section to the > official Kafka documentation. It lays out subtasks for initial content and > follow up improvements and fixes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)