Re: use rocksdb for spark structured streaming (SSS)

2019-03-10 Thread Lian Jiang
Thanks guys!

I am using SSS to backfill the past 3 months of data. I thought I could
use SSS for both historical data and new data, but I just realized that
SSS is not appropriate for backfilling, since the watermark relies on
receivedAt, which could be 3 months old. I will use a batch job for the
backfill and SSS (with a watermark and spark-states) for the real-time
processing.
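
For the real-time side, a minimal sketch of the query with a watermark
applied to "receivedAt" (assuming receivedAt is a timestamp column;
df, timePartitionCol and timestamp_udfnc are the ones from the original
pipeline, and the 24-hour threshold is an arbitrary illustration):

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

// Same pipeline as before, but the watermark lets Spark evict dedup
// state for rows older than the threshold instead of keeping it forever.
val query = df
  .withWatermark("receivedAt", "24 hours")
  .dropDuplicates("Id", "receivedAt")
  .withColumn(timePartitionCol, timestamp_udfnc(col("receivedAt")))
  .writeStream
  .format("parquet")
  .partitionBy("availabilityDomain", timePartitionCol)
  .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
  .option("path", "/data")
  .option("checkpointLocation", "/data_checkpoint")
  .start()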


Re: use rocksdb for spark structured streaming (SSS)

2019-03-10 Thread Jungtaek Lim
The query makes the state grow infinitely. Could you consider applying
a watermark to "receivedAt" to let the unnecessary part of the state be
cleared out? Other than a watermark, you could implement TTL-based
eviction via flatMapGroupsWithState, though you'll need to implement
your own custom "dropDuplicates".


Re: use rocksdb for spark structured streaming (SSS)

2019-03-10 Thread Georg Heiler
Use https://github.com/chermenin/spark-states instead
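
For reference, a sketch of wiring that library in, assuming its
artifact is on the classpath; the provider class name below is the one
from the project's README, so verify it against the version you use:

// Swap the default HDFS-backed state store provider for the
// RocksDB-backed one from chermenin/spark-states.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "ru.chermenin.spark.sql.execution.streaming.state.RocksDbStateStoreProvider")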


Re: use rocksdb for spark structured streaming (SSS)

2019-03-10 Thread Arun Mahadevan
Read the link carefully:

This solution is available (*only*) in Databricks Runtime.

You can enable RocksDB-based state management by setting the following
configuration in the SparkSession before starting the streaming query.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")


use rocksdb for spark structured streaming (SSS)

2019-03-10 Thread Lian Jiang
Hi,

I have a very simple SSS pipeline which does:

val query = df
  .dropDuplicates(Array("Id", "receivedAt"))
  .withColumn(timePartitionCol, timestamp_udfnc(col("receivedAt")))
  .writeStream
  .format("parquet")
  .partitionBy("availabilityDomain", timePartitionCol)
  .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
  .option("path", "/data")
  .option("checkpointLocation", "/data_checkpoint")
  .start()

After ingesting 2T records, the state under the checkpoint folder on
HDFS (replication factor 2) has grown to 2 TB. My cluster has only 2 TB
of storage in total, which means it can barely handle further data
growth.

The online Spark documentation
(https://docs.databricks.com/spark/latest/structured-streaming/production.html)
says that using RocksDB helps an SSS job reduce JVM memory overhead,
but I cannot find any documentation on how to set up RocksDB for SSS.
The Spark class CheckpointReader seems to only handle HDFS.

Any suggestions? Thanks!