If you want IDs that survive across jobs, you can use snowflake IDs or a similar scheme, depending on your use case.
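For illustration, a snowflake-style id packs a timestamp, a worker id and a per-worker sequence number into one 64-bit integer, so ids generated by different jobs and executors stay unique and roughly time-ordered without any coordination through the sink. A minimal sketch in plain Python (the bit layout, custom epoch and SnowflakeGenerator name are illustrative assumptions, not any particular library's API):

    import time
    import threading

    # Illustrative 64-bit layout: 41 bits of milliseconds since a custom epoch,
    # 10 bits of worker id, 12 bits of per-millisecond sequence.
    EPOCH_MS = 1609459200000  # 2021-01-01 UTC, an arbitrary custom epoch

    class SnowflakeGenerator:
        def __init__(self, worker_id):
            assert 0 <= worker_id < 1024          # must fit in 10 bits
            self.worker_id = worker_id
            self.sequence = 0
            self.last_ms = -1
            self.lock = threading.Lock()

        def next_id(self):
            with self.lock:
                now_ms = int(time.time() * 1000)
                if now_ms == self.last_ms:
                    self.sequence = (self.sequence + 1) & 0xFFF   # 12-bit wrap
                    if self.sequence == 0:                        # exhausted, wait for next ms
                        while now_ms <= self.last_ms:
                            now_ms = int(time.time() * 1000)
                else:
                    self.sequence = 0
                self.last_ms = now_ms
                return ((now_ms - EPOCH_MS) << 22) | (self.worker_id << 12) | self.sequence

On Spark this could be wrapped in a UDF, with something like spark_partition_id() feeding worker_id so that tasks on different executors never hand out the same id; clocks moving backwards are not handled in this sketch.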
On Tue, 13 Jul 2021, 9:33 pm Mich Talebzadeh, <mich.talebza...@gmail.com> wrote:

> Meaning a monotonically incrementing ID, as in an Oracle sequence, for each
> record read from Kafka, and adding that to your dataframe?
>
> If you do Structured Streaming in microbatch mode, you will get what is
> known as a batchId.
>
> result = streamingDataFrame.select( \
>              col("parsed_value.rowkey").alias("rowkey") \
>            , col("parsed_value.ticker").alias("ticker") \
>            , col("parsed_value.timeissued").alias("timeissued") \
>            , col("parsed_value.price").alias("price")). \
>          writeStream. \
>          outputMode('append'). \
>          option("truncate", "false"). \
>          foreachBatch(sendToSink). \
>          trigger(processingTime='30 seconds'). \
>          option('checkpointLocation', checkpoint_path). \
>          queryName(config['MDVariables']['topic']). \
>          start()
>
> That function sendToSink receives two arguments, df and batchId:
>
> def sendToSink(df, batchId):
>     if len(df.take(1)) > 0:
>         print(f"""md batchId is {batchId}""")
>         df.show(100, False)
>         df.persist()
>         # write to BigQuery batch table
>         s.writeTableToBQ(df, "append",
>                          config['MDVariables']['targetDataset'],
>                          config['MDVariables']['targetTable'])
>         df.unpersist()
>         print(f"""wrote to DB""")
>     else:
>         print("DataFrame md is empty")
>
> That batchId value can be used for each batch.
>
> Otherwise you can do this:
>
> from pyspark.sql.functions import monotonically_increasing_id
>
> startval = 1
> df = df.withColumn('id', monotonically_increasing_id() + startval)
>
> HTH
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
> On Tue, 13 Jul 2021 at 19:53, Felix Kizhakkel Jose <
> felixkizhakkelj...@gmail.com> wrote:
>
>> Hello,
>>
>> I am using Spark Structured Streaming to sink data from Kafka to AWS S3.
>> I am wondering if it is possible for me to introduce a uniquely
>> incrementing identifier for each record, as we do in an RDBMS (an
>> incrementing long id)? This would greatly help with range pruning while
>> reading based on this ID.
>>
>> Any thoughts? I have looked at monotonically_increasing_id, but it seems
>> it is not deterministic and it won't ensure new records get the next id
>> after the latest id already present in the storage (S3).
>>
>> Regards,
>> Felix K Jose
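Building on the suggestions above, one way to get ids that keep growing across micro-batches (and across restarts, as long as the same checkpoint location is reused so batchId keeps increasing) is to pack batchId into the high bits and a per-batch row number into the low bits. A minimal sketch, assuming a foreachBatch function named sendToSinkWithIds, the timeissued column from the example above as the ordering key, a 32/32 bit split, and a hypothetical S3 output path:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    def sendToSinkWithIds(df, batchId):
        # Dense per-batch row number; a global (un-partitioned) window pulls the
        # whole batch onto one partition, which is acceptable for modest batches.
        w = Window.orderBy("timeissued")
        with_ids = (df
                    .withColumn("row_in_batch", F.row_number().over(w))
                    # batchId in the high 32 bits, row number in the low 32 bits
                    .withColumn("id", F.lit(batchId).cast("long") * (1 << 32)
                                      + F.col("row_in_batch"))
                    .drop("row_in_batch"))
        with_ids.write.mode("append").parquet("s3://my-bucket/events/")  # hypothetical path

Registering it is the same as in the example above, i.e. writeStream.foreachBatch(sendToSinkWithIds) ... start(). The resulting ids are unique and increase over time but are not gap-free, which is usually fine for range pruning.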