Re: Structured Streaming Questions

2017-06-28 Thread Tathagata Das
Answers inline.



On Wed, Jun 28, 2017 at 10:27 AM, Revin Chalil  wrote:

> I am using Structured Streaming with Spark 2.1 and have some basic
> questions.
>
>
>
> · Is there a way to automatically refresh the Hive Partitions
> when using Parquet Sink with Partition? My query looks like below
>
>
>
> val queryCount = windowedCount
>   .withColumn("hive_partition_persist_date",
>     $"persist_date_window_start".cast("date"))
>   .writeStream.format("parquet")
>   .partitionBy("hive_partition_persist_date")
>   .option("path", StatsDestination)
>   .option("checkpointLocation", CheckPointLocationStats)
>   .trigger(ProcessingTime(WindowDurationStats))
>   .outputMode("append")
>   .start()
>
>
>
> I have an external Parquet table built on top of the Destination dir. The
> above query creates the partition dirs, but the Hive partition metadata is
> not refreshed, so I have to execute ALTER TABLE … RECOVER PARTITIONS
> before querying the Hive table. With legacy Streaming, it was possible to
> use spark.sql(hiveQL), where hiveQL can be any Hive statement, config
> setting, etc. Would this kind of functionality be available in Structured
> Streaming?
>
>
>
> · The Query creates an additional dir “_spark_metadata” under the
> Destination dir, and this causes select statements against the Parquet
> table to fail, as it expects only the parquet files under the destination
> location. Is there a config to avoid the creation of this dir?
>
The _spark_metadata directory holds the metadata (a write-ahead log) of
which files in the output directory are the correct, complete files
generated by the streaming query. This is how we actually get exactly-once
guarantees when writing to a file system, so this directory is very
important. In fact, if you run queries on this directory with Spark, it is
aware of this metadata and will read only the correct files, ignoring
temporary/incomplete files that may have been generated by failed or
duplicate tasks. Querying that directory from Hive may lead to duplicate
data, as Hive will read those temp/duplicate files as well.

If you want the data to be available to Hive, you could periodically run a
Spark job that reads the files from that directory and writes them out to a
Hive table; a rough sketch follows.
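For example, a minimal sketch of such a job (the table name
"stats_hive_table" is illustrative; StatsDestination and the partition
column are taken from your query above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("RefreshStatsHiveTable")
  .enableHiveSupport()
  .getOrCreate()

// Spark consults the _spark_metadata log here and reads only the files
// that the streaming query committed, skipping temp/incomplete files.
val stats = spark.read.parquet(StatsDestination)

// Overwrite keeps the Hive table in sync with the streaming output; an
// incremental approach is also possible if you track which partitions are new.
stats.write
  .mode("overwrite")
  .partitionBy("hive_partition_persist_date")
  .saveAsTable("stats_hive_table")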


>
>
> · Our use-case does not need to account for late-arriving records
> and so I have set the WaterMark as 0 seconds. Is this needed to flush out
> the data or is that a default setting or is this inappropriate?
>
This should be fine.


>
>
> · In the “Append” mode, I have to push at least 3 batches to
> actually see the records written to the Destination, even with the
> watermark = 0 seconds setting. I understand that the state store has to
> wait for the watermark to output records, but here the watermark is 0
> seconds. Curious to know what exactly is happening behind the scenes…
>
Probably this is what is going on:
1st batch - No estimate of the max event-time yet, so the watermark is not set.
2nd batch - The watermark is set based on the max event-time seen in the 1st
batch; say it is W. The windows (i.e. [start, end]) for the data received in
the 1st batch are such that window.start < W < window.end, so those windows
are still open and not finalized.
3rd batch - The watermark is updated to W1 such that earliestWindow.end < W1,
and therefore the corresponding window is finalized and its aggregate emitted.

In general, in aggregation + watermark + append mode, you have to wait at
least (window duration + watermark gap) before the earliest window expires
and emits the finalized aggregates. This is visually shown here.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
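For reference, here is a minimal sketch of the kind of query under
discussion; the source DataFrame `events` and its event-time column
"persist_date" are assumptions, while WindowDurationStats and the derived
column come from your query:

import org.apache.spark.sql.functions.{window, count}
import spark.implicits._  // assuming a SparkSession named `spark` is in scope

// events is a streaming DataFrame with an event-time column "persist_date"
val windowedCount = events
  .withWatermark("persist_date", "0 seconds")             // no allowance for late data
  .groupBy(window($"persist_date", WindowDurationStats))  // tumbling window, same length as the trigger
  .agg(count("*").as("event_count"))
  .withColumn("persist_date_window_start", $"window.start")

In append mode, a window's row is emitted only after the watermark moves past
window.end, which is why results show up a couple of triggers after the data
arrives.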


>
> · The “Trigger” duration and “Window” duration in our case are
> the same, as we need to just get the count for every batch. Is a “Window”
> really needed in this scenario, as I can logically get the batch count by
> just using count? I tried to just get the count from the batch and it said
> that aggregation cannot be done on a streaming DataFrame in Append mode.
>
count() on a streaming DataFrame is not a per-batch count but a count over
all the data in the stream. As data arrives, this count is updated by the
streaming query, and depending on the output mode, the updated/finalized
values are written out by the sink.
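
As a minimal illustration (the source DataFrame and the console sink here
are placeholders), a plain count on a streaming DataFrame is a running total
over everything seen so far, typically emitted with the "complete" output mode:

// events is a streaming DataFrame
val totalCount = events.groupBy().count()  // running count over all data in the stream

totalCount.writeStream
  .outputMode("complete")                  // re-emit the updated total every trigger
  .format("console")                       // placeholder sink for illustration
  .start()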

In general, the whole design of Structured Streaming is that you specify
your query as if you were running it on a static table, and the execution
model (batches or otherwise) should not factor into the query semantics.
For example, when writing SQL queries in MySQL, we only care about the
query semantics and don't care about how it is going to be executed. So the
concept of a "batch" does not exist in the streaming DataFrame API, and
queries written in this API are executable either in batch or streaming mode.
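
To make that last point concrete, here is a minimal sketch (paths, column
names, and the eventSchema value are illustrative) showing that the same
query definition runs as either a batch or a streaming job; only the source
changes:

import org.apache.spark.sql.functions.count
import spark.implicits._  // assuming a SparkSession named `spark` is in scope

// Batch execution over a static directory of Parquet files
val batchCounts = spark.read.parquet("/data/events")
  .groupBy($"persist_date")
  .agg(count("*").as("event_count"))

// Streaming execution of the same logical query
val streamingCounts = spark.readStream
  .schema(eventSchema)  // streaming file sources require an explicit schema
  .parquet("/data/events")
  .groupBy($"persist_date")
  .agg(count("*").as("event_count"))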

Structured Streaming Questions

2017-06-28 Thread Revin Chalil
I am using Structured Streaming with Spark 2.1 and have some basic questions.


* Is there a way to automatically refresh the Hive Partitions when 
using Parquet Sink with Partition? My query looks like below


val queryCount = windowedCount
  .withColumn("hive_partition_persist_date", 
$"persist_date_window_start".cast("date"))
  .writeStream.format("parquet")
  .partitionBy("hive_partition_persist_date")
  .option("path", StatsDestination)
  .option("checkpointLocation", CheckPointLocationStats)
  .trigger(ProcessingTime(WindowDurationStats))
  .outputMode("append")
  .start()


I have an external Parquet table built on top of the Destination dir. The above
query creates the partition dirs, but the Hive partition metadata is not refreshed,
so I have to execute ALTER TABLE … RECOVER PARTITIONS before querying the
Hive table. With legacy Streaming, it was possible to use spark.sql(hiveQL), where
hiveQL can be any Hive statement, config setting, etc. Would this kind of
functionality be available in Structured Streaming?


* The Query creates an additional dir "_spark_metadata" under the Destination
dir, and this causes select statements against the Parquet table to fail, as it
expects only the parquet files under the destination location. Is there a config
to avoid the creation of this dir?




* Our use-case does not need to account for late-arriving records and 
so I have set the WaterMark as 0 seconds. Is this needed to flush out the data 
or is that a default setting or is this inappropriate?


* In the "Append" mode, I have to push at least 3 batches to actually 
see the records written to the Destination, even with the watermark = 0 seconds 
setting. I understand that the statestore has to wait for watermark to output 
records but here watermark is 0 seconds. Curious to know what exactly is 
happening behind the scenes...



* The "Trigger" duration and "Window" duration in our case are the same 
as we need to just get the count for every batch. Is a "Window" really needed 
in this scenario as I can logically get the batch count by just using count? I 
tried to just get the count from the batch and it said, aggregation cannot be 
done on streaming dataframe in the Append Mode.


* In our current code, we use Databricks' Spark-Redshift library to write output
to Redshift. Would this library be available in Structured Streaming? Is there a
way to do this using "ForEach"?


* With legacy Streaming, we checkpoint the Kafka offsets in ZooKeeper. Is
Structured Streaming's checkpointing resilient enough to handle all the
failure-restart scenarios?



* When would Spark 2.2 be available for use? I see that the programming guide
still says 2.1.1.


Thanks,
Revin