Hi!
As abstract code what I do in my streaming program is:
readStream() //from Kafka
.flatMap(readIngestionDatasetViaREST) //can return thousands of records
for a single event
.writeStream.outputMode("append").foreachBatch(upsertIntoDeltaTable).start()
I don't use triggers but I limit the number of events per trigger in the
Kafka reader.
What do you mean with process rate below batch duration? The process
rate is records per sec. (in my current deployment it's approx. 1),
batch duration is sec. (at around 60 sec.)
Best,
Rico
Am 05.03.2021 um 10:58 schrieb Mich Talebzadeh:
Hi Ricco,
Just to clarify, your batch interval may have a variable number of
rows sent to Kafka topic for each event?
In your writeStream code
writeStream. \
outputMode('append'). \
option("truncate", "false"). \
foreachBatch(SendToBigQuery). \
trigger(processingTime='2 seconds'). \
start()
Have you defined trigger(processingTime)? That is equivalent to your
sliding interval.
In general, processingTime == bath interval (the event).
In Spark GUI, under Structured streaming, you have Input Rate, Process
Rate and Batch Duration. Your process Rate has to be below Batch
Duration. ForeachBatch will process all the data come in before moving
to the next batch. It is up to the designer to ensure that the
processing time is below the event so Spark can process it.
HTH
LinkedIn
/https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>/
*Disclaimer:* Use it at your own risk.Any and all responsibility for
any loss, damage or destruction of data or any other property which
may arise from relying on this email's technical content is explicitly
disclaimed. The author will in no case be liable for any monetary
damages arising from such loss, damage or destruction.
On Fri, 5 Mar 2021 at 08:06, Dipl.-Inf. Rico Bergmann
<i...@ricobergmann.de <mailto:i...@ricobergmann.de>> wrote:
Hi all!
I'm using Spark structured streaming for a data ingestion pipeline.
Basically the pipeline reads events (notifications of new available
data) from a Kafka topic and then queries a REST endpoint to get the
real data (within a flatMap).
For one single event the pipeline creates a few thousand records
(rows)
that have to be stored. And to write the data I use foreachBatch().
My question is now: Is it guaranteed by Spark that all output
records of
one event are always contained in a single batch or can the
records also
be split into multiple batches?
Best,
Rico.
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
<mailto:user-unsubscr...@spark.apache.org>