Hi

This is with my streaming app on Kafka 0.10.1.0.

My flow looks something like this:

source topic stream -> filter for null values -> map to make it keyed by id
-> custom processor to mystore -> to another topic -> KTable

I am hitting the exception below in a custom processor class when I
try to access offset(), partition(), or timestamp() from the
ProcessorContext in the process() method. Based on the API docs, I was
hoping it would return the partition and offset of the enclosing topic
(in this case the source topic) it is consuming from, or -1.

It looks like they are accessible only in certain cases. Is the
information getting lost in the transformation phases?

The same issue happens if I try to access them in the punctuate() method,
but I saw somewhere that it might not work in punctuate(). Any reason for
this, or a link describing it, would be helpful.
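
For what it's worth, a guard along the following lines would at least keep the processor from crashing while I figure this out. It is only a minimal sketch: RecordMetadataGuard and orUnavailable are hypothetical names of my own, not part of Kafka Streams, and it assumes the IllegalStateException shown in the trace below is the only failure to handle. It does not explain why the record metadata is missing.

```java
import java.util.function.LongSupplier;

// Hypothetical helper (not a Kafka Streams class): wraps a metadata accessor
// such as context::offset and falls back to -1 when no record is in flight.
final class RecordMetadataGuard {
    static final long UNAVAILABLE = -1L;

    private RecordMetadataGuard() {}

    // Calls the accessor and returns its value, or UNAVAILABLE if it throws
    // IllegalStateException (the exception type seen in the trace below).
    static long orUnavailable(LongSupplier accessor) {
        try {
            return accessor.getAsLong();
        } catch (IllegalStateException e) {
            return UNAVAILABLE;
        }
    }
}
```

Inside process() this would be used as, for example, long offset = RecordMetadataGuard.orUnavailable(context::offset); where context is the ProcessorContext passed to init().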


====================================================================

java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
=====================================================================


Regards
Sai
