[ https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287372#comment-17287372 ]
fml2 edited comment on KAFKA-12328 at 2/19/21, 9:50 PM: -------------------------------------------------------- {quote}ie, the supplier must return a new object each time it's invoked {quote} Yes, that must be the error! I return the same instance in each supplier invokation. The docs does not express it clearly enough IMO that a NEW instance must be returned each time. Could you please elaborate on this? Why is it required that the supplier must return a new instance on each invokation? If I return the same instance which is thread safe (it gets all the thread specific informations from the context) then a single instance would suffice. The context should be a proxy object in this case. I.e. it would be a "static" (not chaning) value within the transformer/processor but would internally return the data specific to the particular task in each call. Similar to proxy objects in Spring. But, apparently, this is not how kafka is implemented now so that I have to do this myself, right? was (Author: fml2): {quote}ie, the supplier must return a new object each time it's invoked {quote} Yes, that must be the error! I return the same instance in each supplier invokation. The docs does not express it clearly enough IMO that a NEW instance must be returned each time. > Expose TaskId partition number > ------------------------------ > > Key: KAFKA-12328 > URL: https://issues.apache.org/jira/browse/KAFKA-12328 > Project: Kafka > Issue Type: Wish > Components: streams > Reporter: fml2 > Priority: Major > Labels: needs-kip > > This question was posted [on > stakoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over] > and got an answer but the solution is quite complicated hence this ticket. > > In my Kafka Streams application, I have a task that sets up a scheduled (by > the wall time) punctuator. The punctuator iterates over the entries of a > store and does something with them. Like this: > {code:java} > var store = context().getStateStore("MyStore"); > var iter = store.all(); > while (iter.hasNext()) { > var entry = iter.next(); > // ... do something with the entry > } > // Print a summary (now): N entries processed > // Print a summary (wish): N entries processed in partition P > {code} > Is it possible to find out which partition the punctuator operates on? The > java docs for {{ProcessorContext.partition()}} states that this method > returns {{-1}} within punctuators. > I've read [Kafka Streams: Punctuate vs > Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process] > and the answers there. I can understand that a task is, in general, not tied > to a particular partition. But an iterator should be tied IMO. > How can I find out the partition? > Or is my assumption that a particular instance of a store iterator is tied to > a partion wrong? > What I need it for: I'd like to include the partition number in some log > messages. For now, I have several nearly identical log messages stating that > the punctuator does this and that. In order to make those messages "unique" > I'd like to include the partition number into them. > Since I'm working with a single store here (which might be partitioned), I > assume that every single execution of the punctuator is bound to a single > partition of that store. > > It would be cool if there were a method {{iterator.partition}} (or similar) > to get this information. -- This message was sent by Atlassian Jira (v8.3.4#803005)