As I mentioned earlier, there are alternatives you might explore, such as
the socket source, for testing purposes.

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, when

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("StreamingSparkPartitioned") \
    .getOrCreate()

# Label each value by its remainder modulo 3
expression = when(expr("value % 3 = 1"), "stupid_event") \
    .otherwise(when(expr("value % 3 = 2"), "smart_event")
               .otherwise("neutral_event"))

# The socket source always yields a single string column named "value",
# so no schema is passed; the column is cast to LONG below.
# Assumption: this checkpoint directory is not shared with another query.
checkpoint_path = "file:///ssd/hduser/randomdata/chkpt"

# Start a socket source for testing
socket_streamingDF = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load() \
    .withColumn("value", expr("CAST(value AS LONG)")) \
    .withColumn("event_type", expression)

query = (
    socket_streamingDF.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="1 second")
    .option("checkpointLocation", checkpoint_path)
    .start()
)

query.awaitTermination()
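
To make the labelling rule above easy to check without starting Spark, here is a
minimal plain-Python sketch of the same modulo-3 mapping. The function name
event_type is my own, purely for illustration, and the sketch assumes
non-negative integers (Spark SQL and Python treat % differently for negative
numbers).

```python
def event_type(value: int) -> str:
    """Mirror the when/otherwise chain for a non-negative integer value."""
    remainder = value % 3
    if remainder == 1:
        return "stupid_event"
    if remainder == 2:
        return "smart_event"
    # Remainder 0 falls through to the final otherwise() branch
    return "neutral_event"


if __name__ == "__main__":
    for v in range(6):
        print(v, event_type(v))
```

So sending 1, 2 and 3 over the socket should show one row for each of the
three labels in the console sink.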

In the Spark job above, the socket source connects to localhost:9999 and
expects a single integer value per line. You can use a tool like netcat to
listen on that port and send data for testing; start it before the Spark job
so that the connection succeeds.

echo "1" | nc -lk 9999
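
If netcat is not available, a small Python feeder can play the same role. This
is only a sketch using the standard library; the helper name serve_integers and
its parameters are hypothetical, not part of Spark. It listens on a port, waits
for one client (the Spark socket source), sends one integer per line and then
closes.

```python
import socket
import threading


def serve_integers(values, host="127.0.0.1", port=0):
    """Listen on host:port and send each value as one line to the first client.

    port=0 lets the OS pick a free port; the bound port is returned.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def _feed():
        # Block until a client connects, then stream the values
        conn, _ = server.accept()
        with conn:
            for v in values:
                conn.sendall(f"{v}\n".encode("utf-8"))
        server.close()

    thread = threading.Thread(target=_feed, daemon=True)
    thread.start()
    return bound_port, thread


if __name__ == "__main__":
    # Port 9999 matches the "port" option used by the Spark job
    port, feeder = serve_integers(range(1, 101), port=9999)
    print(f"Feeding integers on port {port}; start the Spark job now")
    feeder.join()
```

Run it in one terminal first, then start the Spark job in another. Once the
feeder has sent all values it closes the connection, so no further rows
arrive after that.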

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 29 Jan 2024 at 11:33, Perfect Stranger <paulpaul1...@gmail.com>
wrote:

> Yes, there's definitely an issue. Can someone fix it? I'm not familiar
> with Apache Jira; do I need to file a bug report or what?
>
> On Mon, Jan 29, 2024 at 2:57 AM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> OK
>>
>> This is the equivalent Python code
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import expr, when
>> from pyspark.sql.types import StructType, StructField, LongType
>> from datetime import datetime
>>
>> spark = SparkSession.builder \
>>     .master("local[*]") \
>>     .appName("StreamingSparkPartitioned") \
>>     .getOrCreate()
>>
>> expression = when(expr("value % 3 = 1"), "stupid_event") \
>>     .otherwise(when(expr("value % 3 = 2"), "smart_event")
>>                .otherwise("neutral_event"))
>>
>> # Define the schema to match the rate-micro-batch data source
>> schema = StructType([
>>     StructField("timestamp", LongType()),
>>     StructField("value", LongType()),
>> ])
>> checkpoint_path = "file:///ssd/hduser/randomdata/chkpt"
>>
>> # Convert human-readable timestamp to Unix timestamp in milliseconds
>> start_timestamp = int(datetime(2024, 1, 28).timestamp() * 1000)
>>
>> streamingDF = spark.readStream \
>>     .format("rate-micro-batch") \
>>     .option("rowsPerBatch", "100") \
>>     .option("startTimestamp", start_timestamp) \
>>     .option("numPartitions", 1) \
>>     .load() \
>>     .withColumn("event_type", expression)
>>
>> query = (
>>     streamingDF.writeStream
>>     .outputMode("append")
>>     .format("console")
>>     .trigger(processingTime="1 second")
>>     .option("checkpointLocation", checkpoint_path)
>>     .start()
>> )
>>
>> query.awaitTermination()
>>
>> This is the error I am getting:
>>
>>   File "/home/hduser/dba/bin/python/DSBQ/src/StreamingSparkPartitioned.py", line 38, in <module>
>>     query.awaitTermination()
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 201, in awaitTermination
>>   File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 175, in deco
>> pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = db2fd1cc-cc72-439e-9dcb-8acfd2e9a61e, runId = f71cd26f-7e86-491c-bcf2-ab2e3dc63eca] terminated with exception: No usable value for offset
>> Did not find value which can be converted into long
>>
>> It seems there might be an issue with the *rate-micro-batch* source
>> when using the *startTimestamp* option.
>>
>> You can try using the socket source for testing purposes.
>>
>> HTH
>>
>> On Sun, 28 Jan 2024 at 22:00, Perfect Stranger <paulpaul1...@gmail.com>
>> wrote:
>>
>>> I described the issue here:
>>>
>>>
>>> https://stackoverflow.com/questions/77893939/how-does-starttimestamp-option-work-for-the-rate-micro-batch-format
>>>
>>> Could someone please respond?
>>>
>>> The rate-micro-batch format doesn't seem to respect the startTimestamp
>>> option.
>>>
>>> Thanks.
>>>
>>
