[ https://issues.apache.org/jira/browse/KAFKA-7470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sam updated KAFKA-7470:
-----------------------
Description:
Spark has a useful API for accumulating data in a thread-safe way
[https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.util.AccumulatorV2]
and comes with some useful out-of-the-box accumulators, e.g. for Longs
[https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.util.LongAccumulator]
I usually use accumulators to wire debugging, profiling, monitoring and
diagnostics into Spark jobs. Typically I fire off a Future before running a Spark
job to periodically print the stats (e.g. TPS, histograms, counts, timings, etc.).
So far I cannot find anything similar for Kafka Streams. Does anything
exist? I imagine this is possible at least within each instance of a Kafka Streams
app, but making it work across several instances would require an
intermediate topic.
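One way to reason about the cross-instance case: if each instance periodically publishes its own running total to an intermediate topic, keyed by an instance ID, a reader can keep the latest total per instance and sum them. The sketch below simulates only that merge step in plain Scala, with no Kafka dependency; {{PartialCount}} and {{globalCount}} are hypothetical names, and the intermediate topic is assumed rather than shown.

```scala
// Hypothetical message an instance would publish to the intermediate topic:
// its own running total (not a delta), keyed by instance ID.
final case class PartialCount(instanceId: String, total: Long)

// Keep only the latest total per instance (the way a compacted topic or a
// KTable keeps the latest value per key), then sum across instances.
def globalCount(messages: Seq[PartialCount]): Long =
  messages
    .foldLeft(Map.empty[String, Long]) { (latest, m) =>
      latest.updated(m.instanceId, m.total)
    }
    .values
    .sum

// Instance "a" reports twice; instance "b"'s single report is delivered twice.
val published = Seq(
  PartialCount("a", 10L),
  PartialCount("b", 5L),
  PartialCount("a", 25L),
  PartialCount("b", 5L)
)
println(globalCount(published)) // 30
```

Publishing absolute totals rather than deltas keeps the merge idempotent: a redelivered message overwrites the same value instead of being counted twice. This assumes each instance's messages arrive in order, which Kafka provides within a partition when all of an instance's messages share a key. A restarted instance would report from zero unless its total is persisted, so this alone does not give the guarantees the request asks for.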
Of course, we want to be able to call this accumulator in a similar way to the
Spark Accumulator while preserving Kafka Streams' processing guarantees. Example usage:
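{code:scala}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Accumulator and every(...) are the hypothetical API this issue asks for
val countAccumulator: Accumulator[Long] = ...

// Periodically report progress from a background Future
Future {
  every(1.minute) {
    logger.info("Processed " + countAccumulator.value + " records")
  }
}

// Count each record as it passes through; the value itself is unchanged
stream.mapValues { v =>
  countAccumulator.add(1)
  v
}
{code}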
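Within a single instance, something close to the example above can be approximated with plain JVM concurrency utilities. A minimal sketch, assuming a per-JVM counter is acceptable: {{LocalLongAccumulator}} and {{reportEvery}} are hypothetical helpers (not Kafka Streams or Spark APIs) built on {{java.util.concurrent.atomic.LongAdder}} and a {{ScheduledExecutorService}}, and plain threads stand in for stream threads so it runs without Kafka.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.LongAdder

// Hypothetical per-instance accumulator. LongAdder makes add/value safe to
// call from several stream threads, but it only sees this JVM's records and
// its value is lost when the instance stops.
final class LocalLongAccumulator {
  private val adder = new LongAdder
  def add(n: Long): Unit = adder.add(n)
  def value: Long = adder.sum()
  def reset(): Unit = adder.reset()
}

// Stand-in for the every(...) idea: log the value at a fixed rate on a
// background thread. The caller is responsible for shutting it down.
def reportEvery(acc: LocalLongAccumulator, periodMillis: Long)(log: String => Unit): ScheduledExecutorService = {
  val scheduler = Executors.newSingleThreadScheduledExecutor()
  scheduler.scheduleAtFixedRate(
    new Runnable { def run(): Unit = log(s"Processed ${acc.value} records") },
    periodMillis,
    periodMillis,
    TimeUnit.MILLISECONDS
  )
  scheduler
}

// Simulate four stream threads, each counting 10,000 records.
val countAccumulator = new LocalLongAccumulator
val reporter = reportEvery(countAccumulator, periodMillis = 100L)(msg => println(msg))
val workers = (1 to 4).map { _ =>
  new Thread(new Runnable {
    def run(): Unit = (1 to 10000).foreach(_ => countAccumulator.add(1L))
  })
}
workers.foreach(_.start())
workers.foreach(_.join())
reporter.shutdown()
println(s"Final count: ${countAccumulator.value}") // Final count: 40000
```

In a topology, the {{add}} call would sit in a side-effecting step such as {{stream.peek((_, _) => countAccumulator.add(1L))}}. Unlike the requested feature, this gives no processing guarantees: if a record is reprocessed after a failure or rebalance it may be counted again, and the count disappears when the instance stops.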
was:
In many Big Data situations it is preferable to work with a small buffer of
records at once, rather than one record at a time.
The natural example is calling some external API that supports batching for
efficiency.
How can we do this in Kafka Streams? I cannot find anything in the API that
looks like what I want.
So far I have:
{{builder.stream[String, String]("my-input-topic").mapValues(externalApiCall).to("my-output-topic")}}
What I want is:
{{builder.stream[String, String]("my-input-topic").batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")}}
In Scala collections and Akka Streams the operator is called {{grouped}} (Akka
Streams also offers {{batch}}). In Spark Structured Streaming we can do
{{mapPartitions(_.grouped(2000).map(externalBatchedApiCall))}}.
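For comparison, the chunking behaviour of {{grouped}} that the request refers to can be shown on a plain Scala {{Iterator}}. A minimal sketch, assuming {{externalBatchedApiCall}} (hypothetical here) returns one result per input value; it is not a Kafka Streams operator. In Kafka Streams a comparable effect would need records to be buffered explicitly, for example in a custom processor, which also raises the question of when to flush a partially filled chunk.

```scala
// Hypothetical batched API: one call handles a whole chunk and returns one
// result per input value. The counter only exists to make the calls visible.
var apiCalls = 0
def externalBatchedApiCall(batch: Seq[String]): Seq[String] = {
  apiCalls += 1
  batch.map(_.toUpperCase)
}

// Split the incoming values into chunks of at most chunkSize (the last chunk
// may be smaller) and call the API once per chunk, preserving order.
def processInChunks(values: Iterator[String], chunkSize: Int): Iterator[String] =
  values.grouped(chunkSize).flatMap(batch => externalBatchedApiCall(batch))

val results = processInChunks(Iterator.tabulate(5000)(i => s"record-$i"), chunkSize = 2000).toList
println(s"${results.size} results from $apiCalls API calls") // 5000 results from 3 API calls
```

With 5,000 input values and a chunk size of 2,000, the API is called three times (2,000 + 2,000 + 1,000) and the output order matches the input order.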
https://stackoverflow.com/questions/52366623/how-to-process-data-in-chunks-batches-with-kafka-streams
> Thread safe accumulator across all instances
> --------------------------------------------
>
> Key: KAFKA-7470
> URL: https://issues.apache.org/jira/browse/KAFKA-7470
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: sam
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)