Theoretically, the composed value of batchId +
monotonically_increasing_id() would achieve the goal. The major downside is
that you'll need to deal with "deduplication" of the output based on batchId,
as monotonically_increasing_id() is non-deterministic. You need to ensure
there's NO overlap in the output when a batch is replayed.
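
A minimal sketch of the idea (the function name and sink path are
illustrative, not from this thread):

from pyspark.sql import functions as F

def write_batch(batchDF, batchId):
    # batchId is re-delivered with the same value if a batch is retried,
    # but monotonically_increasing_id() is not deterministic across
    # retries, so deduplicate at the sink by batchId
    out = batchDF.withColumn(
        "uid",
        F.concat_ws("-",
                    F.lit(batchId).cast("string"),
                    F.monotonically_increasing_id().cast("string")))
    out.write.mode("append").parquet("s3://my-bucket/out/")  # illustrative sink

streamingDataFrame.writeStream.foreachBatch(write_batch).start()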
Sorry, a correction regarding creating an incrementing ID in PySpark:
>>> df = spark.range(1, 5)
>>> from pyspark.sql.window import Window as W
>>> from pyspark.sql import functions as F
>>> df = df.withColumn("idx", F.monotonically_increasing_id())
>>> Wspec = W.orderBy("idx")
>>> df.withColumn("idx", F.row_number().over(Wspec)).show()
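
Note that W.orderBy("idx") without a partitionBy() moves all rows into a
single partition to compute row_number() (Spark warns "No Partition Defined
for Window operation"), so this gives consecutive IDs but does not scale to
large data.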
If you want the IDs to survive across jobs, you can use Snowflake IDs or
similar schemes, depending on your use case.
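
For reference, a minimal sketch of a Snowflake-style generator (the bit
widths and epoch below are the commonly used illustrative choices, not a
requirement):

import time, threading

class SnowflakeId:
    # 41 bits of ms timestamp | 10 bits of worker id | 12 bits of sequence
    def __init__(self, worker_id, epoch_ms=1288834974657):  # Twitter's epoch, illustrative
        self.worker_id = worker_id & 0x3FF
        self.epoch_ms = epoch_ms
        self.seq = 0
        self.last_ms = -1
        self.lock = threading.Lock()

    def next_id(self):
        with self.lock:
            now = int(time.time() * 1000)
            if now == self.last_ms:
                self.seq = (self.seq + 1) & 0xFFF
                if self.seq == 0:  # 4096 ids issued this ms; wait for the next ms
                    while now <= self.last_ms:
                        now = int(time.time() * 1000)
            else:
                self.seq = 0
            self.last_ms = now
            return ((now - self.epoch_ms) << 22) | (self.worker_id << 12) | self.seq

IDs generated this way (e.g. SnowflakeId(worker_id=1).next_id()) are roughly
time-ordered and unique per worker, and they survive across jobs because the
timestamp, not a job-local counter, provides the ordering.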
On Tue, 13 Jul 2021, 9:33 pm Mich Talebzadeh wrote:
> Meaning a monotonically incrementing ID, as in an Oracle sequence, for each
> record read from Kafka, and adding that to your dataframe?
Meaning a monotonically incrementing ID, as in an Oracle sequence, for each
record read from Kafka, and adding that to your dataframe?
If you do Structured Streaming in micro-batch mode, you will get what is
known as a batchId, e.g.

result = (streamingDataFrame
          .select(F.col("value").cast("string"))  # columns here are illustrative
          .writeStream
          .foreachBatch(sendToSink)
          .start())
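
Here sendToSink is a user-defined function that receives the micro-batch
DataFrame together with its batchId. A minimal sketch (the function name and
S3 path are illustrative, not from the original mail):

from pyspark.sql import functions as F

def sendToSink(batchDF, batchId):
    # batchId increases by 1 per micro-batch and is re-delivered with the
    # same value if the batch is retried, so output can be keyed on it
    batchDF.withColumn("batch_id", F.lit(batchId)) \
           .write.mode("append").parquet("s3://my-bucket/events/")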
Hello,
I am using Spark Structured Streaming to sink data from Kafka to AWS S3. I
am wondering if it's possible to introduce a uniquely incrementing
identifier for each record, as we do in an RDBMS (an incrementing long ID)?
This would greatly help with range pruning when reading back based on this ID.
Hi Experts,
While reading the Spark code in version 3.0.0, I noticed that when the
external shuffle service is enabled, ShuffleBlockFetcherIterator ->
fetchHostLocalBlocks contains some logic: when there is no record in the
cache, it needs to use hostLocalDirManager.getHostLocalDirs to send a
request to the external shuffle service for the executors' local dirs.