BY the way as per streaming doc
<https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>,
one can monitor streaming status with
result = streamingDataFrame.select( \
writeStream. \
outputMode('append'). \
option("truncate", "false"). \
format('console'). \
start()
print(result.status)
print(result.recentProgress)
print(result.lastProgress)
Ok so they should tell us something.
When I run it where streaming data is displayed (on-premise) I see below
(format('console')
{'message': 'Initializing sources', 'isDataAvailable': False,
'isTriggerActive': False}
[]
None
-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+
-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------+------+-------------------+------+--------+-------+----------------------+
|rowkey |ticker|timeissued |price
|currency|op_type|op_time |
+------------------------------------+------+-------------------+------+--------+-------+----------------------+
|e4c02434-fa1f-4e8e-ad94-40c2782e9681|MRW |2021-03-01
15:11:44|293.75|GBP |1 |2021-03-01 15:12:16.49|
etc ..
On the other hand when I run it in Google Cloud cluster I see exactly the
same diagnostics BUT no data!
{'message': 'Initializing sources', 'isDataAvailable': False,
'isTriggerActive': False}
[]
None
-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+
So the monitoring does not say anything.
What does the following signify?
print(result.status)
Thanks
*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
On Sat, 27 Feb 2021 at 18:26, Mich Talebzadeh <[email protected]>
wrote:
> Hi,
>
> I have a Pyspark program that uses *Spark 3.0.1* to read Kafka topic and
> write it to Google BigQuery. This works fine on Premise and loops over
> micro-batch of data.
>
> def fetch_data(self):
> self.sc.setLogLevel("ERROR")
> #{"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT",
> "timeissued":"2021-02-23T08:42:23", "price":31.12}
> schema = StructType().add("rowkey", StringType()).add("ticker",
> StringType()).add("timeissued", TimestampType()).add("price", FloatType())
> try:
> # construct a streaming dataframe streamingDataFrame that
> subscribes to topic config['MDVariables']['topic']) -> md (market data)
> streamingDataFrame = self.spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
> .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
> .option("group.id", config['common']['appName']) \
> .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
> .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
> .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
> .option("subscribe", config['MDVariables']['topic']) \
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "latest") \
> .load() \
> .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
>
> #streamingDataFrame.printSchema()
>
> """
> "foreach" performs custom write logic on each row and
> "foreachBatch" performs custom write logic on each micro-batch through
> SendToBigQuery function
> foreachBatch(SendToBigQuery) expects 2 parameters, first:
> micro-batch as DataFrame or Dataset and second: unique id for each batch
> Using foreachBatch, we write each micro batch to storage
> defined in our custom logic. In this case, we store the output of our
> streaming application to Google BigQuery table.
> Note that we are appending data and column "rowkey" is
> defined as UUID so it can be used as the primary key
> """
> result = streamingDataFrame.select( \
> col("parsed_value.rowkey").alias("rowkey") \
> , col("parsed_value.ticker").alias("ticker") \
> , col("parsed_value.timeissued").alias("timeissued") \
> , col("parsed_value.price").alias("price")). \
> withColumn("currency",
> lit(config['MDVariables']['currency'])). \
> withColumn("op_type",
> lit(config['MDVariables']['op_type'])). \
> withColumn("op_time", current_timestamp()). \
> writeStream. \
> outputMode('append'). \
> option("truncate", "false"). \
> format('console'). \
> start()
> except Exception as e:
> print(f"""{e}, quitting""")
> sys.exit(1)
> result.awaitTermination()
>
> With this output
>
>
> -------------------------------------------
>
> Batch: 0
>
> -------------------------------------------
>
> +------+------+----------+-----+--------+-------+-------+
>
> |rowkey|ticker|timeissued|price|currency|op_type|op_time|
>
> +------+------+----------+-----+--------+-------+-------+
>
> +------+------+----------+-----+--------+-------+-------+
>
>
> -------------------------------------------
>
> Batch: 1
>
> -------------------------------------------
>
>
> +------------------------------------+------+-------------------+------+--------+-------+-----------------------+
>
> |rowkey |ticker|timeissued |price
> |currency|op_type|op_time |
>
>
> +------------------------------------+------+-------------------+------+--------+-------+-----------------------+
>
> |35bc0378-a782-4183-999f-561a1dc162aa|MRW |2021-02-27
> 17:15:49|300.75|GBP |1 |2021-02-27 17:16:24.472|
>
> |39c55b09-7f50-43fe-a0a1-f88e5bdd51e1|ORCL |2021-02-27 17:15:49|23.75
> |GBP |1 |2021-02-27 17:16:24.472|
>
> |22dfaf4f-2335-4658-aa74-3c0e4f05cc46|MKS |2021-02-27 17:15:49|441.9
> |GBP |1 |2021-02-27 17:16:24.472|
>
>
> However, GCP offers Dataproc compute servers that use Spark 3.1.1.
>
>
> The same code is stuck in BatchId 0 and does not move on.
>
>
>
> Streaming DataFrame : True
>
> 21/02/27 18:01:09 WARN
> org.apache.spark.sql.streaming.StreamingQueryManager:
> spark.sql.adaptive.enabled is not supported in streaming
> DataFrames/Datasets and will be disabled.
>
> -------------------------------------------
>
> Batch: 0
>
> -------------------------------------------
>
> +------+------+----------+-----+--------+-------+-------+
>
> |rowkey|ticker|timeissued|price|currency|op_type|op_time|
>
> +------+------+----------+-----+--------+-------+-------+
>
> +------+------+----------+-----+--------+-------+-------+
>
> I am getting one additional warning line highlighted above. Does that
> signify anything. Also is there anything else I can do to debug it. FYI, I
> can see that the data is coming through Kafka topic output
>
> $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper ctpcluster-m:2181,
> ctpcluster-w-0:2181, ctpcluster-w-1:2181 --topic md
>
> "rowkey":"56e9ef90-5113-4731-9f6e-1f91d5849799","ticker":"MSFT",
> "timeissued":"2021-02-27T18:20:42", "price":27.02}
>
>
> In GCP we have zookeepers and Kafka brokers on containers but that should
> not matter?
>
>
> Thanks
>
>
>
> LinkedIn *
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>