Hi -
I have implemented a custom state store named BFStore with a change logger
as follows:
class BFStoreChangeLogger[K, V](val storeName: String,
val context: ProcessorContext,
val partition: Int,
val serialization: StateSerdes[K, V]) {
private val topic =
ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
private val collector =
context.asInstanceOf[RecordCollector.Supplier].recordCollector
def this(storeName: String, context: ProcessorContext, serialization:
StateSerdes[K, V]) {
this(storeName, context, context.taskId.partition, serialization)
}
def logChange(key: K, value: V): Unit = {
if (collector != null) {
val keySerializer = serialization.keySerializer
val valueSerializer = serialization.valueSerializer
collector.send(this.topic, key, value, this.partition,
context.timestamp, keySerializer, valueSerializer) //**//
}
}
}
In my driver program I build the topology and start the streams as follows:
val builder: TopologyBuilder = new TopologyBuilder()
builder.addSource("Source", config.fromTopic)
.addProcessor("Process", () => new WeblogProcessor(), "Source")
.addStateStore(new BFStoreSupplier[String](LOG_COUNT_STATE_STORE,
stringSerde, true, changelogConfig), "Process")
.addSink("Sink", "weblog-count-topic", "Process")
val streams = new KafkaStreams(builder, streamingConfig)
streams.start()
When I run the program, immediately I get the following exception ..
Exception in thread "StreamThread-1"
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
to flush state store log-counts
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
at
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
at
org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
at
org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
at
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
*Caused by: java.lang.IllegalStateException: This should not happen as
timestamp() should only be called while a record is processed*
at
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
at
com.lightbend.fdp.sample.kstream.processor.BFStoreChangeLogger.logChange(BFStoreChangeLogger.scala:24)
at
com.lightbend.fdp.sample.kstream.processor.BFStore.flush(BFStore.scala:86)
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
... 8 more
Not sure I understand the whole trace but looks like this may be related to
https://issues.apache.org/jira/browse/KAFKA-5528. It comes from the class
BFStoreChangeLogger in the line I marked above with //**//.
Any help / workaround will be appreciated ..
regards.
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg