Yes, there's definitely an issue. Can someone fix it? I'm not familiar with Apache Jira — do I need to file a bug report, or what?
On Mon, Jan 29, 2024 at 2:57 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> OK
>
> This is the equivalent Python code
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import expr, when
> from pyspark.sql.types import StructType, StructField, LongType
> from datetime import datetime
>
> spark = SparkSession.builder \
>     .master("local[*]") \
>     .appName("StreamingSparkPartitioned") \
>     .getOrCreate()
>
> expression = when(expr("value % 3 = 1"), "stupid_event") \
>     .otherwise(when(expr("value % 3 = 2"), "smart_event").otherwise("neutral_event"))
>
> # Define the schema to match the rate-micro-batch data source
> schema = StructType([StructField("timestamp", LongType()),
>                      StructField("value", LongType())])
> checkpoint_path = "file:///ssd/hduser/randomdata/chkpt"
>
> # Convert human-readable timestamp to Unix timestamp in milliseconds
> start_timestamp = int(datetime(2024, 1, 28).timestamp() * 1000)
>
> streamingDF = spark.readStream \
>     .format("rate-micro-batch") \
>     .option("rowsPerBatch", "100") \
>     .option("startTimestamp", start_timestamp) \
>     .option("numPartitions", 1) \
>     .load() \
>     .withColumn("event_type", expression)
>
> query = (
>     streamingDF.writeStream
>     .outputMode("append")
>     .format("console")
>     .trigger(processingTime="1 second")
>     .option("checkpointLocation", checkpoint_path)
>     .start()
> )
>
> query.awaitTermination()
>
> This is the error I am getting
>
>   File "/home/hduser/dba/bin/python/DSBQ/src/StreamingSparkPartitioned.py", line 38, in <module>
>     query.awaitTermination()
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 201, in awaitTermination
>   File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 175, in deco
> pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = db2fd1cc-cc72-439e-9dcb-8acfd2e9a61e,
> runId = f71cd26f-7e86-491c-bcf2-ab2e3dc63eca] terminated with exception: No usable value for offset
> Did not find value which can be converted into long
>
> Seems like there might be an issue with the *rate-micro-batch* source when using the *startTimestamp* option.
>
> You can try using socket source for testing purposes
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
> On Sun, 28 Jan 2024 at 22:00, Perfect Stranger <paulpaul1...@gmail.com> wrote:
>> I described the issue here:
>>
>> https://stackoverflow.com/questions/77893939/how-does-starttimestamp-option-work-for-the-rate-micro-batch-format
>>
>> Could someone please respond?
>>
>> The rate-micro-batch format doesn't seem to respect the startTimestamp option.
>>
>> Thanks.
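One thing worth noting about the traceback above: "No usable value for offset / Did not find value which can be converted into long" is thrown while deserializing the *offset log* in the checkpoint directory, not while parsing the `startTimestamp` option itself. So one hedged guess (an assumption, not a confirmed diagnosis) is that the checkpoint at `checkpoint_path` was written by an earlier run with a different source or option set, and its stored offsets no longer parse. A quick way to rule that out is to restart the query from a clean checkpoint location. Below is a minimal sketch of such a reset; `reset_checkpoint` is a hypothetical helper name, and it only handles the `file://` URIs used in this thread:

```python
import shutil
from pathlib import Path


def reset_checkpoint(checkpoint_uri: str) -> Path:
    """Delete a local file:// checkpoint directory so a streaming query
    starts from a clean offset log. Returns the local filesystem path.

    Hypothetical helper for debugging only: removing a checkpoint
    discards all streaming progress, so never do this in production.
    """
    if not checkpoint_uri.startswith("file://"):
        raise ValueError("only file:// checkpoint URIs are handled here")
    path = Path(checkpoint_uri[len("file://"):])
    # ignore_errors=True makes this a no-op if the directory is absent
    shutil.rmtree(path, ignore_errors=True)
    return path


# Usage with a scratch path (not the thread's real /ssd/... directory):
# reset_checkpoint("file:///tmp/chkpt_demo")
```

If the query then starts cleanly with a fresh checkpoint, the problem is a stale/incompatible offset log rather than `startTimestamp` itself; if it still fails, that points back at the rate-micro-batch source and a Jira report would be justified.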