[Spark SQL] [Bug] Adding `checkpoint()` causes "column [...] cannot be resolved" error

2023-11-05 Thread Robin Zimmerman
Hi all,

Wondering if anyone has run into this as I can't find any similar issues in
JIRA, mailing list archives, Stack Overflow, etc. I had a query that was
running successfully, but the query planning time was extremely long (4+
hours). To fix this I added `checkpoint()` calls earlier in the code to
truncate the query plan. This worked to improve the performance, but now I
am getting the error "A column or function parameter with name
`B`.`JOIN_KEY` cannot be resolved." Nothing else in the query changed
besides the `checkpoint()` calls. The only thing I can surmise is that this
is related to a very complex nested query plan where the same table is used
multiple times upstream. The general flow is something like this:

```py
df = spark.sql("...")
df = df.checkpoint()
df.createOrReplaceTempView("df")

df2 = spark.sql("SELECT ... JOIN df ...")
df2.createOrReplaceTempView("df2")

# Error happens here: A column or function parameter with name
# `a`.`join_key` cannot be resolved. Did you mean one of the following?
# [`b`.`join_key`, `a`.`col1`, `b`.`col2`]
spark.sql("""
    SELECT *
    FROM (
        SELECT
            a.join_key,
            a.col1,
            b.col2
        FROM df2 b
        LEFT JOIN df a ON b.join_key = a.join_key
    )
""")
```

In the actual code, df and df2 are very complex, multi-level nested views
built on other views. If I checkpoint all of the DataFrames in the query
right before I run it, the error goes away. Unfortunately, I have not been
able to put together a minimal reproducible example.
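
For reference, this is roughly what that workaround looks like (a rough sketch
using the same simplified names as above):

```py
# Checkpoint every DataFrame involved and re-register the temp views right
# before the final query, so each view is backed by a truncated plan instead
# of the nested multi-level views.
df = df.checkpoint()
df.createOrReplaceTempView("df")
df2 = df2.checkpoint()
df2.createOrReplaceTempView("df2")

result = spark.sql("""
    SELECT *
    FROM (
        SELECT a.join_key, a.col1, b.col2
        FROM df2 b
        LEFT JOIN df a ON b.join_key = a.join_key
    )
""")
```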

Any ideas?

Thanks,
Robin


checkpoint file deletion

2023-06-29 Thread Lingzhe Sun
Hi all,

I am running a stateful Structured Streaming job and configured
spark.cleaner.referenceTracking.cleanCheckpoints=true
spark.cleaner.periodicGC.interval=1min
in the config, but the checkpoint files are not deleted and their number
keeps growing. Did I miss something?
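
For reference, this is roughly how I apply the two options (a minimal sketch;
the app name is a placeholder):

```py
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("stateful-streaming-job")  # placeholder app name
    # Asks the ContextCleaner to delete RDD checkpoint data once the
    # referencing RDD goes out of scope.
    .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
    # How often the driver triggers a GC so out-of-scope references are noticed.
    .config("spark.cleaner.periodicGC.interval", "1min")
    .getOrCreate()
)
```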



Lingzhe Sun  
Hirain Technology


Re: structured streaming - checkpoint metadata growing indefinitely

2022-05-04 Thread Wojciech Indyk
For posterity: the problem was the FileStreamSourceLog class. I needed to
override the method shouldRetain, which by default returns true; its doc says:
"Default implementation retains all log entries. Implementations should
override the method to change the behavior."

--
Kind regards/ Pozdrawiam,
Wojciech Indyk


Sat, 30 Apr 2022 at 12:35, Wojciech Indyk
wrote:

> Hi Gourav!
> I use stateless processing, no watermarking, no aggregations.
> I don't want any data loss, so changing checkpoint location is not an
> option to me.
>
> --
> Kind regards/ Pozdrawiam,
> Wojciech Indyk
>
>
> Fri, 29 Apr 2022 at 11:07, Gourav Sengupta
> wrote:
>
>> Hi,
>>
>> this may not solve the problem, but have you tried to stop the job
>> gracefully, and then restart without much delay by pointing to a new
>> checkpoint location? The approach will have certain uncertainties for
>> scenarios where the source system can loose data, or we do not expect
>> duplicates to be committed, etc.
>>
>> It will be good to know what kind of processing you are doing as well.
>>
>>
>> Regards,
>> Gourav
>>
>> On Fri, Apr 29, 2022 at 8:11 AM Wojciech Indyk 
>> wrote:
>>
>>> Update for the scenario of deleting compact files: it recovers from the
>>> recent (not compacted) checkpoint file, but when it comes to compaction of
>>> checkpoint then it fails with missing recent compaction file. I use Spark
>>> 3.1.2
>>>
>>> --
>>> Kind regards/ Pozdrawiam,
>>> Wojciech Indyk
>>>
>>>
>>> Fri, 29 Apr 2022 at 07:00, Wojciech Indyk
>>> wrote:
>>>
>>>> Hello!
>>>> I use spark struture streaming. I need to use s3 for storing checkpoint
>>>> metadata (I know, it's not optimal storage for checkpoint metadata).
>>>> Compaction interval is 10 (default) and I set
>>>> "spark.sql.streaming.minBatchesToRetain"=5. When the job was running for a
>>>> few weeks then checkpointing time increased significantly (cause a few
>>>> minutes dalay on processing). I looked at checkpoint metadata structure.
>>>> There is one heavy path there: checkpoint/source/0. Single .compact file
>>>> weights 25GB. I looked into its content and it contains all entries since
>>>> batch 0 (current batch is around 25000). I tried a few parameters to remove
>>>> already processed data from the compact file, namely:
>>>> "spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not
>>>> work. As I've seen in the code it's related to previous version of
>>>> streaming, isn't it?
>>>> "spark.sql.streaming.fileSource.log.deletion"=true and
>>>> "spark.sql.streaming.fileSink.log.deletion"=true doesn't work
>>>> The compact file store full history even if all data were processed
>>>> (except for the most recent checkpoint), so I expect most of entries would
>>>> be deleted. Is there any parameter to remove entries from compact file or
>>>> remove compact file gracefully from time to time? Now I am testing scenario
>>>> when I stop the job, delete most of checkpoint/source/0/* files, keeping
>>>> just a few recent checkpoints (not compacted) and I rerun the job. The job
>>>> recovers correctly from recent checkpoint. It looks like possible
>>>> workaround of my problem, but this scenario with manual delete of
>>>> checkpoint files looks ugly, so I would prefer something managed by Spark.
>>>>
>>>> --
>>>> Kind regards/ Pozdrawiam,
>>>> Wojciech Indyk
>>>>
>>>


Re: structured streaming - checkpoint metadata growing indefinitely

2022-04-30 Thread Wojciech Indyk
Hi Gourav!
I use stateless processing, no watermarking, no aggregations.
I don't want any data loss, so changing the checkpoint location is not an
option for me.

--
Kind regards/ Pozdrawiam,
Wojciech Indyk


Fri, 29 Apr 2022 at 11:07, Gourav Sengupta
wrote:

> Hi,
>
> this may not solve the problem, but have you tried to stop the job
> gracefully, and then restart without much delay by pointing to a new
> checkpoint location? The approach will have certain uncertainties for
> scenarios where the source system can loose data, or we do not expect
> duplicates to be committed, etc.
>
> It will be good to know what kind of processing you are doing as well.
>
>
> Regards,
> Gourav
>
> On Fri, Apr 29, 2022 at 8:11 AM Wojciech Indyk 
> wrote:
>
>> Update for the scenario of deleting compact files: it recovers from the
>> recent (not compacted) checkpoint file, but when it comes to compaction of
>> checkpoint then it fails with missing recent compaction file. I use Spark
>> 3.1.2
>>
>> --
>> Kind regards/ Pozdrawiam,
>> Wojciech Indyk
>>
>>
>> Fri, 29 Apr 2022 at 07:00, Wojciech Indyk
>> wrote:
>>
>>> Hello!
>>> I use spark struture streaming. I need to use s3 for storing checkpoint
>>> metadata (I know, it's not optimal storage for checkpoint metadata).
>>> Compaction interval is 10 (default) and I set
>>> "spark.sql.streaming.minBatchesToRetain"=5. When the job was running for a
>>> few weeks then checkpointing time increased significantly (cause a few
>>> minutes dalay on processing). I looked at checkpoint metadata structure.
>>> There is one heavy path there: checkpoint/source/0. Single .compact file
>>> weights 25GB. I looked into its content and it contains all entries since
>>> batch 0 (current batch is around 25000). I tried a few parameters to remove
>>> already processed data from the compact file, namely:
>>> "spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not work.
>>> As I've seen in the code it's related to previous version of streaming,
>>> isn't it?
>>> "spark.sql.streaming.fileSource.log.deletion"=true and
>>> "spark.sql.streaming.fileSink.log.deletion"=true doesn't work
>>> The compact file store full history even if all data were processed
>>> (except for the most recent checkpoint), so I expect most of entries would
>>> be deleted. Is there any parameter to remove entries from compact file or
>>> remove compact file gracefully from time to time? Now I am testing scenario
>>> when I stop the job, delete most of checkpoint/source/0/* files, keeping
>>> just a few recent checkpoints (not compacted) and I rerun the job. The job
>>> recovers correctly from recent checkpoint. It looks like possible
>>> workaround of my problem, but this scenario with manual delete of
>>> checkpoint files looks ugly, so I would prefer something managed by Spark.
>>>
>>> --
>>> Kind regards/ Pozdrawiam,
>>> Wojciech Indyk
>>>
>>


Re: structured streaming - checkpoint metadata growing indefinitely

2022-04-29 Thread Gourav Sengupta
Hi,

this may not solve the problem, but have you tried to stop the job
gracefully, and then restart without much delay by pointing to a new
checkpoint location? The approach will have certain uncertainties for
scenarios where the source system can lose data, or where we do not expect
duplicates to be committed, etc.

It will be good to know what kind of processing you are doing as well.


Regards,
Gourav

On Fri, Apr 29, 2022 at 8:11 AM Wojciech Indyk 
wrote:

> Update for the scenario of deleting compact files: it recovers from the
> recent (not compacted) checkpoint file, but when it comes to compaction of
> checkpoint then it fails with missing recent compaction file. I use Spark
> 3.1.2
>
> --
> Kind regards/ Pozdrawiam,
> Wojciech Indyk
>
>
> Fri, 29 Apr 2022 at 07:00, Wojciech Indyk
> wrote:
>
>> Hello!
>> I use spark struture streaming. I need to use s3 for storing checkpoint
>> metadata (I know, it's not optimal storage for checkpoint metadata).
>> Compaction interval is 10 (default) and I set
>> "spark.sql.streaming.minBatchesToRetain"=5. When the job was running for a
>> few weeks then checkpointing time increased significantly (cause a few
>> minutes dalay on processing). I looked at checkpoint metadata structure.
>> There is one heavy path there: checkpoint/source/0. Single .compact file
>> weights 25GB. I looked into its content and it contains all entries since
>> batch 0 (current batch is around 25000). I tried a few parameters to remove
>> already processed data from the compact file, namely:
>> "spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not work.
>> As I've seen in the code it's related to previous version of streaming,
>> isn't it?
>> "spark.sql.streaming.fileSource.log.deletion"=true and
>> "spark.sql.streaming.fileSink.log.deletion"=true doesn't work
>> The compact file store full history even if all data were processed
>> (except for the most recent checkpoint), so I expect most of entries would
>> be deleted. Is there any parameter to remove entries from compact file or
>> remove compact file gracefully from time to time? Now I am testing scenario
>> when I stop the job, delete most of checkpoint/source/0/* files, keeping
>> just a few recent checkpoints (not compacted) and I rerun the job. The job
>> recovers correctly from recent checkpoint. It looks like possible
>> workaround of my problem, but this scenario with manual delete of
>> checkpoint files looks ugly, so I would prefer something managed by Spark.
>>
>> --
>> Kind regards/ Pozdrawiam,
>> Wojciech Indyk
>>
>


Re: structured streaming - checkpoint metadata growing indefinitely

2022-04-29 Thread Wojciech Indyk
Update on the scenario of deleting compact files: the job recovers from the
recent (non-compacted) checkpoint file, but when the next checkpoint
compaction runs it fails with a missing recent compaction file. I use Spark
3.1.2.

--
Kind regards/ Pozdrawiam,
Wojciech Indyk


Fri, 29 Apr 2022 at 07:00, Wojciech Indyk
wrote:

> Hello!
> I use spark struture streaming. I need to use s3 for storing checkpoint
> metadata (I know, it's not optimal storage for checkpoint metadata).
> Compaction interval is 10 (default) and I set
> "spark.sql.streaming.minBatchesToRetain"=5. When the job was running for a
> few weeks then checkpointing time increased significantly (cause a few
> minutes dalay on processing). I looked at checkpoint metadata structure.
> There is one heavy path there: checkpoint/source/0. Single .compact file
> weights 25GB. I looked into its content and it contains all entries since
> batch 0 (current batch is around 25000). I tried a few parameters to remove
> already processed data from the compact file, namely:
> "spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not work.
> As I've seen in the code it's related to previous version of streaming,
> isn't it?
> "spark.sql.streaming.fileSource.log.deletion"=true and
> "spark.sql.streaming.fileSink.log.deletion"=true doesn't work
> The compact file store full history even if all data were processed
> (except for the most recent checkpoint), so I expect most of entries would
> be deleted. Is there any parameter to remove entries from compact file or
> remove compact file gracefully from time to time? Now I am testing scenario
> when I stop the job, delete most of checkpoint/source/0/* files, keeping
> just a few recent checkpoints (not compacted) and I rerun the job. The job
> recovers correctly from recent checkpoint. It looks like possible
> workaround of my problem, but this scenario with manual delete of
> checkpoint files looks ugly, so I would prefer something managed by Spark.
>
> --
> Kind regards/ Pozdrawiam,
> Wojciech Indyk
>


structured streaming - checkpoint metadata growing indefinitely

2022-04-28 Thread Wojciech Indyk
Hello!
I use Spark Structured Streaming. I need to use S3 for storing checkpoint
metadata (I know, it's not optimal storage for checkpoint metadata).
The compaction interval is 10 (the default) and I set
"spark.sql.streaming.minBatchesToRetain"=5. After the job had been running for
a few weeks, checkpointing time increased significantly (causing a few minutes'
delay in processing). I looked at the checkpoint metadata structure. There is
one heavy path there: checkpoint/source/0. A single .compact file weighs 25GB.
I looked into its content and it contains all entries since batch 0 (the
current batch is around 25000). I tried a few parameters to remove already
processed data from the compact file, namely:
"spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not work. As far
as I can see in the code, it is related to the previous version of streaming,
isn't it?
"spark.sql.streaming.fileSource.log.deletion"=true and
"spark.sql.streaming.fileSink.log.deletion"=true don't work either.
The compact file stores the full history even though all data has been
processed (except for the most recent checkpoint), so I would expect most of
the entries to be deleted. Is there any parameter to remove entries from the
compact file, or to remove the compact file gracefully from time to time? Now
I am testing a scenario where I stop the job, delete most of the
checkpoint/source/0/* files, keeping just a few recent (non-compacted)
checkpoints, and rerun the job. The job recovers correctly from the recent
checkpoint. It looks like a possible workaround for my problem, but this
scenario with manual deletion of checkpoint files is ugly, so I would prefer
something managed by Spark.
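
For reference, this is roughly how I set the parameters above on the session
(a sketch; as described, it did not shrink the already-written .compact file):

```py
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Minimum number of recent batches retained and kept recoverable.
    .config("spark.sql.streaming.minBatchesToRetain", "5")
    # File source / file sink metadata log options I tried; they control
    # deletion of expired log files but did not remove already-compacted entries.
    .config("spark.sql.streaming.fileSource.log.deletion", "true")
    .config("spark.sql.streaming.fileSource.log.compactInterval", "10")  # left at the default
    .config("spark.sql.streaming.fileSink.log.deletion", "true")
    .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
    .getOrCreate()
)
```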

--
Kind regards/ Pozdrawiam,
Wojciech Indyk


Re: [Spark Core][Intermediate][How-to]: Measure the time for a checkpoint

2021-10-07 Thread Lalwani, Jayesh
  1.  Is your join and aggregation based on the same keys?

You might want to look at the execution plan. It is possible that without 
checkpointing, Spark puts join and aggregation into the same stage to eliminate 
shuffling. With a checkpoint, you might have forced Spark to introduce a 
shuffle. I am blind guessing here. You need to look at the execution plan to 
understand what Spark is doing internally

  2.  Are you certain that you are aggregating on the data that you get from the 
checkpoint, or are you aggregating on the data frame that you checkpointed? If 
it's the latter, Spark might be executing your read + join twice. Again, you 
might want to look at the execution plan.
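
For example, a rough sketch of the distinction in point 2 (DataFrame names are
made up): checkpoint() returns a new DataFrame and leaves the original
untouched, so aggregating the original can re-run the read + join.

```py
joined = orders.join(items, "order_id")      # made-up upstream DataFrames / key

joined_cp = joined.checkpoint()              # writes the checkpoint, returns a NEW DataFrame

agg_ok = joined_cp.groupBy("order_id").count()   # reads back the checkpointed data
agg_bad = joined.groupBy("order_id").count()     # may re-execute the read + join
```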

From: "Schneider, Felix Jan" 
Date: Thursday, October 7, 2021 at 7:56 AM
To: "user@spark.apache.org" 
Subject: [EXTERNAL] [Spark Core][Intermediate][How-to]: Measure the time for a 
checkpoint




Hi Spark-Users,

my name is Felix and I’m currently working on a master’s thesis on adaptive 
checkpointing in Spark at TU Berlin.
As part of this work, I experimented with a Spark Kubernetes cluster to measure 
the runtime differences of Spark applications with and without Reliable 
checkpoints of DataFrames.
Below, I first explain how the experiment is set up and the assumptions I want 
to verify.
Second, I describe how the experiment was conducted and what the outcome was.
In the end, I share my open questions.

Experiment setup:

  *   DataFrame A: 20GB tabular data (e.g. a list of orders)
  *   DataFrame B: 10GB tabular data (e.g. a list of order items)
  *   Spark application A: first performs an inner-join of DataFrame A and 
DataFrame B to get the joined DataFrame C and then aggregates the joined 
DataFrame C
  *   Spark application B: Does the same as Spark application A plus it 
checkpoints the joined DataFrame C before it aggregates it
  *   Driver configuration: 1 driver instance with 4 CPU cores and 4GB of memory
  *   Executors configuration: 10 executor instances with 4 CPU cores and 8GB 
of memory each

Assumptions I want to verify:

  1.  Spark application A will take less time than Spark application B because 
Spark application B needs time to write a checkpoint to reliable storage (HDFS)
  2.  The mean runtime difference of Spark application A and Spark application 
B will be the mean of the time it will take to write the checkpoint.

Experiment sequence:

  1.  5 executions of Spark application A
  2.  5 executions of Spark application B
  3.  The request of the total duration of each application from Spark's 
history server API
  4.  Reading the time it took to checkpoint a DataFrame from the driver log of 
execution, e.g.:

 *   21/10/01 09:39:05 INFO ReliableCheckpointRDD: Checkpointing took 10024 
ms.
 *   21/10/01 09:39:05 INFO ReliableRDDCheckpointData: Done checkpointing 
RDD 68 to hdfs://checkpoints/rdd-68, new parent is RDD 77

  5.  Calculating the mean of the total duration of Spark application A’s 
executions and Spark application B’s executions to account for the differences 
in runtime due to e.g. cluster utilization
  6.  Calculating the mean of the time it took to checkpoint in Spark 
application B’s executions for the same reason as in 5.


Experiment results:

  *   The mean runtime of Spark application A: 11.72 minutes
  *   The mean runtime of Spark application B: 27.38 minutes
  *   -> The first assumption can be verified
  *   The mean time it took to write a checkpoint in Spark application B: 3.41 
minutes
  *   -> The second assumption can not be verified because: (27.38 minutes - 
11.72 minutes) > 3.41 minutes

Questions:

  1.  Is this a valid approach to collect the time for a checkpoint or are 
there other possible ways to measure this?
  2.  How can the difference in the mean runtime of Spark application A and 
Spark application B be explained if it’s greater than the mean of the time it 
took to write a checkpoint in Spark application B?

Feel free to share your thoughts and suggestions on the questions.
I’m happy to discuss this topic.

Thanks and kind regards,
Felix


Re: [Spark Core][Intermediate][How-to]: Measure the time for a checkpoint

2021-10-07 Thread Mich Talebzadeh
Hi Felix,

Is this a one-off job or streaming data?

What Kubernetes cluster are you using?

What is the actual code for checkpointing?

Finally, is this Scala or PySpark?

HTH

On Thu, 7 Oct 2021 at 12:56, Schneider, Felix Jan <
felix.j.schnei...@campus.tu-berlin.de> wrote:

> Hi Spark-Users,
>
> my name is Felix and I’m currently working on a master’s thesis on
> adaptive checkpointing in Spark at TU Berlin.
> As part of this work, I experimented with a Spark Kubernetes cluster to
> measure the runtime differences of Spark applications with and without
> Reliable checkpoints of DataFrames.
> Below, I first explain how the experiment is set up and the assumptions I
> want to verify.
> Second, I describe how the experiment was conducted and what the outcome
> was.
> In the end, I share my open questions.
>
> *Experiment setup:*
>
>- *DataFrame* *A*: 20GB tabular data (e.g. a list of orders)
>- *DataFrame* *B*: 10GB tabular data (e.g. a list of order items)
>- *Spark application A*: first performs an inner-join of *DataFrame*
>*A *and *DataFrame* *B *to get *the joined DataFrame C* and then
>aggregates *the joined DataFrame C*
>- *Spark application B*: Does the same as *Spark application A* plus
>it checkpoints *the joined DataFrame C *before it aggregates it
>- *Driver configuration: *1 driver instance with 4 CPU cores and 4GB
>of memory
>- *Executors configuration:* 10 executor instances with 4 CPU cores
>and 8GB of memory each
>
>
> *Assumptions I want to verify:*
>
>1. *Spark application A *will take less time than* Spark application
>B *because *Spark application B* needs time to write a checkpoint to
>reliable storage (HDFS)
>2. The mean runtime difference of *Spark application A *and *Spark
>application B *will be the mean of the time it will take to write the
>checkpoint.
>
>
> *Experiment sequence:*
>
>1. 5 executions of *Spark application A*
>2. 5 executions of *Spark application B*
>3. The request of the total duration of each application from Spark's
>history server API
>4. Reading the time it took to checkpoint a DataFrame from the driver
>log of execution, e.g.:
>   1. 21/10/01 09:39:05 INFO ReliableCheckpointRDD: Checkpointing took
>   10024 ms.
>   2. 21/10/01 09:39:05 INFO ReliableRDDCheckpointData: Done
>   checkpointing RDD 68 to hdfs://checkpoints/rdd-68, new parent is
>   RDD 77
>5. Calculating the mean of the total duration of *Spark application A*’s
>executions and *Spark application B*’s executions to account for the
>differences in runtime due to e.g. cluster utilization
>6. Calculating the mean of the time it took to checkpoint in *Spark
>application B*’s executions for the same reason as in 5.
>
>
> *Experiment results:*
>
>- The mean runtime of *Spark application A*: *11.72 minutes*
>- The mean runtime of *Spark application B*: *27.38 minutes*
>- -> The first assumption can be verified
>- The mean time it took to write a checkpoint in *Spark application B*
>: *3.41 minutes*
>- -> The second assumption can not be verified because: (27.38 minutes
>- 11.72 minutes) > 3.41 minutes
>
>
> *Questions:*
>
>1. Is this a valid approach to collect the time for a checkpoint or
>are there other possible ways to measure this?
>2. How can the difference in the mean runtime of *Spark application A
>and Spark application B *be explained if it’s greater than the mean of
>the time it took to write a checkpoint in *Spark application B*?
>
>
> Feel free to share your thoughts and suggestions on the questions.
> I’m happy to discuss this topic.
>
> Thanks and kind regards,
> Felix
>
> --





[Spark Core][Intermediate][How-to]: Measure the time for a checkpoint

2021-10-07 Thread Schneider, Felix Jan
Hi Spark-Users,

my name is Felix and I’m currently working on a master’s thesis on adaptive 
checkpointing in Spark at TU Berlin.
As part of this work, I experimented with a Spark Kubernetes cluster to measure 
the runtime differences of Spark applications with and without Reliable 
checkpoints of DataFrames.
Below, I first explain how the experiment is set up and the assumptions I want 
to verify.
Second, I describe how the experiment was conducted and what the outcome was.
In the end, I share my open questions.

Experiment setup:

  *   DataFrame A: 20GB tabular data (e.g. a list of orders)
  *   DataFrame B: 10GB tabular data (e.g. a list of order items)
  *   Spark application A: first performs an inner-join of DataFrame A and 
DataFrame B to get the joined DataFrame C and then aggregates the joined 
DataFrame C
  *   Spark application B: Does the same as Spark application A plus it 
checkpoints the joined DataFrame C before it aggregates it
  *   Driver configuration: 1 driver instance with 4 CPU cores and 4GB of memory
  *   Executors configuration: 10 executor instances with 4 CPU cores and 8GB 
of memory each

Assumptions I want to verify:

  1.  Spark application A will take less time than Spark application B because 
Spark application B needs time to write a checkpoint to reliable storage (HDFS)
  2.  The mean runtime difference of Spark application A and Spark application 
B will be the mean of the time it will take to write the checkpoint.

Experiment sequence:

  1.  5 executions of Spark application A
  2.  5 executions of Spark application B
  3.  Requesting the total duration of each application from Spark's 
history server API
  4.  Reading the time it took to checkpoint a DataFrame from the driver log of 
execution, e.g.:
 *   21/10/01 09:39:05 INFO ReliableCheckpointRDD: Checkpointing took 10024 
ms.
 *   21/10/01 09:39:05 INFO ReliableRDDCheckpointData: Done checkpointing 
RDD 68 to hdfs://checkpoints/rdd-68, new parent is RDD 77
  5.  Calculating the mean of the total duration of Spark application A’s 
executions and Spark application B’s executions to account for the differences 
in runtime due to e.g. cluster utilization
  6.  Calculating the mean of the time it took to checkpoint in Spark 
application B’s executions for the same reason as in 5.

Experiment results:

  *   The mean runtime of Spark application A: 11.72 minutes
  *   The mean runtime of Spark application B: 27.38 minutes
  *   -> The first assumption can be verified
  *   The mean time it took to write a checkpoint in Spark application B: 3.41 
minutes
  *   -> The second assumption cannot be verified because: (27.38 minutes - 
11.72 minutes) > 3.41 minutes

Questions:

  1.  Is this a valid approach to collect the time for a checkpoint, or are 
there other possible ways to measure this? (See the sketch after this list.)
  2.  How can the difference in the mean runtime of Spark application A and 
Spark application B be explained if it’s greater than the mean of the time it 
took to write a checkpoint in Spark application B?
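
As referenced in question 1, here is a rough sketch (paths, column names and
app names are placeholders) of the checkpointing pattern I am measuring; the
"Checkpointing took ... ms" line in the driver log is where I read the
duration from:

```py
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("checkpoint-experiment").getOrCreate()
spark.sparkContext.setCheckpointDir("hdfs:///checkpoints")     # reliable storage (placeholder path)

orders = spark.read.parquet("hdfs:///data/orders")             # DataFrame A (placeholder path)
items = spark.read.parquet("hdfs:///data/order_items")         # DataFrame B (placeholder path)

joined = orders.join(items, "order_id")                        # DataFrame C (placeholder key)
joined = joined.checkpoint()                                   # Spark application B only; eager by default
result = joined.groupBy("order_id").count()                    # the aggregation
result.write.mode("overwrite").parquet("hdfs:///out/result")   # forces execution (placeholder path)
```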

Feel free to share your thoughts and suggestions on the questions.
I’m happy to discuss this topic.

Thanks and kind regards,
Felix


Re: Structured Streaming Checkpoint Error

2020-12-03 Thread German Schiavon
Thanks Jungtaek!

It makes sense. We are currently changing to an HDFS-compatible FS; I was
wondering how this change would impact the checkpoint, but after what you
said it is clearer now.



On Thu, 3 Dec 2020 at 00:23, Jungtaek Lim 
wrote:

> In theory it would work, but works very inefficiently on checkpointing. If
> I understand correctly, it will write the content to the temp file on s3,
> and rename the file which actually gets the temp file from s3 and write the
> content of temp file to the final path on s3. Compared to checkpoint with
> HDFS, 1 unnecessary write, 1 unnecessary read. It probably warrants custom
> implementation of checkpoint manager on S3.
>
> Also atomic rename is still not working for S3, as well as S3 doesn't
> support write with overwrite=false. That said, there's no barrier if
> concurrent streaming queries access to the same checkpoint and mess up.
> With checkpoint in HDFS, the rename is atomic and only one succeeds even in
> parallel and the other query lost writing to the checkpoint file simply
> fails. That's a caveat you may want to keep in mind.
>
> On Wed, Dec 2, 2020 at 11:35 PM German Schiavon 
> wrote:
>
>> Hello!
>>
>> @Gabor Somogyi   I wonder that now that s3 is 
>> *strongly
>> consistent* , would work fine.
>>
>>
>> Regards!
>>
>> https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
>>
>> On Thu, 17 Sep 2020 at 11:55, German Schiavon 
>> wrote:
>>
>>> Hi Gabor,
>>>
>>> Makes sense, thanks a lot!
>>>
>>> On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Structured Streaming is simply not working when checkpoint location is
>>>> on S3 due to it's read-after-write consistency.
>>>> Please choose an HDFS compliant filesystem and it will work like a
>>>> charm.
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Wed, Sep 16, 2020 at 4:12 PM German Schiavon <
>>>> gschiavonsp...@gmail.com> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> I have an Structured Streaming Application that reads from kafka,
>>>>> performs some aggregations and writes in S3 in parquet format.
>>>>>
>>>>> Everything seems to work great except that from time to time I get a
>>>>> checkpoint error, at the beginning I thought it was a random error but it
>>>>> happened more than 3 times already in a few days
>>>>>
>>>>> Caused by: java.io.FileNotFoundException: No such file or directory:
>>>>> s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>>>
>>>>>
>>>>> Does this happen to anyone else?
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>> *This is the full error :*
>>>>>
>>>>> ERROR streaming.MicroBatchExecution: Query segmentValidation [id =
>>>>> 14edaddf-25bb-4259-b7a2-6107907f962f, runId =
>>>>> 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
>>>>>
>>>>> java.io.FileNotFoundException: No such file or directory:
>>>>> s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>>>
>>>>> at
>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
>>>>>
>>>>> at
>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
>>>>>
>>>>> at
>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
>>>>>
>>>>> at
>>>>> org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
>>>>>
>>>>> at
>>>>> org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>>>>>
>>>>> at
>>>>> org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>>>>>
>>>>> at
>>>>> org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>>>>>
>>>>> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
>>>>>
>>>>> at
>>>>> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
>>>>>
>>>>> at
>>>>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>>>>>
>>>>> at
>>>>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
>>>>>
>>>>> at
>>>>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
>>>>>
>>>>> at
>>>>> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>>>> at scala.Option.getOrElse(Option.scala:189)
>>>>>
>>>>


Re: Structured Streaming Checkpoint Error

2020-12-02 Thread Jungtaek Lim
In theory it would work, but checkpointing would be very inefficient. If I
understand correctly, it will write the content to a temp file on S3, and the
rename then fetches the temp file from S3 and writes its content to the final
path on S3. Compared to checkpointing with HDFS, that is one unnecessary write
and one unnecessary read. It probably warrants a custom implementation of the
checkpoint manager for S3.

Also, atomic rename is still not working for S3, and S3 doesn't support write
with overwrite=false. That means there's no barrier if concurrent streaming
queries access the same checkpoint and mess things up. With a checkpoint in
HDFS, the rename is atomic and only one write succeeds even in parallel; the
other query, having lost the race to write the checkpoint file, simply fails.
That's a caveat you may want to keep in mind.
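
For illustration, a checkpoint manager can be plugged in via a config; the
class name below is just a placeholder, and the real work would be
implementing it (in Scala/Java) against the CheckpointFileManager interface:

```py
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Placeholder class name; a real implementation would extend
    # org.apache.spark.sql.execution.streaming.CheckpointFileManager.
    .config("spark.sql.streaming.checkpointFileManagerClass",
            "com.example.streaming.S3CheckpointFileManager")
    .getOrCreate()
)
```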

On Wed, Dec 2, 2020 at 11:35 PM German Schiavon 
wrote:

> Hello!
>
> @Gabor Somogyi   I wonder that now that s3 is 
> *strongly
> consistent* , would work fine.
>
>
> Regards!
>
> https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
>
> On Thu, 17 Sep 2020 at 11:55, German Schiavon 
> wrote:
>
>> Hi Gabor,
>>
>> Makes sense, thanks a lot!
>>
>> On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi 
>> wrote:
>>
>>> Hi,
>>>
>>> Structured Streaming is simply not working when checkpoint location is
>>> on S3 due to it's read-after-write consistency.
>>> Please choose an HDFS compliant filesystem and it will work like a charm.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Wed, Sep 16, 2020 at 4:12 PM German Schiavon <
>>> gschiavonsp...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> I have an Structured Streaming Application that reads from kafka,
>>>> performs some aggregations and writes in S3 in parquet format.
>>>>
>>>> Everything seems to work great except that from time to time I get a
>>>> checkpoint error, at the beginning I thought it was a random error but it
>>>> happened more than 3 times already in a few days
>>>>
>>>> Caused by: java.io.FileNotFoundException: No such file or directory:
>>>> s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>>
>>>>
>>>> Does this happen to anyone else?
>>>>
>>>> Thanks in advance.
>>>>
>>>> *This is the full error :*
>>>>
>>>> ERROR streaming.MicroBatchExecution: Query segmentValidation [id =
>>>> 14edaddf-25bb-4259-b7a2-6107907f962f, runId =
>>>> 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
>>>>
>>>> java.io.FileNotFoundException: No such file or directory:
>>>> s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>>
>>>> at
>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
>>>>
>>>> at
>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
>>>>
>>>> at
>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
>>>>
>>>> at
>>>> org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
>>>>
>>>> at
>>>> org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>>>>
>>>> at
>>>> org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>>>>
>>>> at
>>>> org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>>>>
>>>> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
>>>>
>>>> at
>>>> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
>>>>
>>>> at
>>>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>>>>
>>>> at
>>>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
>>>>
>>>> at
>>>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
>>>>
>>>> at
>>>> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>>> at scala.Option.getOrElse(Option.scala:189)
>>>>
>>>


Re: Structured Streaming Checkpoint Error

2020-12-02 Thread German Schiavon
Hello!

@Gabor Somogyi   I wonder whether, now that S3 is
*strongly consistent*, it would work fine.


Regards!
https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/

On Thu, 17 Sep 2020 at 11:55, German Schiavon 
wrote:

> Hi Gabor,
>
> Makes sense, thanks a lot!
>
> On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi 
> wrote:
>
>> Hi,
>>
>> Structured Streaming is simply not working when checkpoint location is on
>> S3 due to it's read-after-write consistency.
>> Please choose an HDFS compliant filesystem and it will work like a charm.
>>
>> BR,
>> G
>>
>>
>> On Wed, Sep 16, 2020 at 4:12 PM German Schiavon 
>> wrote:
>>
>>> Hi!
>>>
>>> I have an Structured Streaming Application that reads from kafka,
>>> performs some aggregations and writes in S3 in parquet format.
>>>
>>> Everything seems to work great except that from time to time I get a
>>> checkpoint error, at the beginning I thought it was a random error but it
>>> happened more than 3 times already in a few days
>>>
>>> Caused by: java.io.FileNotFoundException: No such file or directory:
>>> s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>
>>>
>>> Does this happen to anyone else?
>>>
>>> Thanks in advance.
>>>
>>> *This is the full error :*
>>>
>>> ERROR streaming.MicroBatchExecution: Query segmentValidation [id =
>>> 14edaddf-25bb-4259-b7a2-6107907f962f, runId =
>>> 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
>>>
>>> java.io.FileNotFoundException: No such file or directory:
>>> s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>
>>> at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
>>>
>>> at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
>>>
>>> at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
>>>
>>> at
>>> org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
>>>
>>> at
>>> org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>>>
>>> at
>>> org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>>>
>>> at
>>> org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>>>
>>> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
>>>
>>> at
>>> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
>>>
>>> at
>>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>>>
>>> at
>>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
>>>
>>> at
>>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
>>>
>>> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>> at scala.Option.getOrElse(Option.scala:189)
>>>
>>


Re: how to disable replace HDFS checkpoint location in structured streaming in spark3.0.1

2020-10-13 Thread lec ssmi
Sorry, the mail title is a little problematic. It should be "How to disable or
replace ...".

lec ssmi wrote on Wed, 14 Oct 2020 at 9:27 AM:

> I have written a demo using spark3.0.0, and the location where the
> checkpoint file  is saved has been explicitly specified   like
>>
>> stream.option("checkpointLocation","file:///C:\\Users\\Administrator
>> \\Desktop\\test")
>
> But the app still throws an   exception about the HDFS file system.
> Is it not possible to specify the local file system as a checkpoint
> location now?
>


how to disable replace HDFS checkpoint location in structured streaming in spark3.0.1

2020-10-13 Thread lec ssmi
I have written a demo using Spark 3.0.0, and the location where the
checkpoint file is saved has been explicitly specified, like
>
> stream.option("checkpointLocation","file:///C:\\Users\\Administrator\\
> Desktop\\test")

But the app still throws an exception about the HDFS file system.
Is it not possible to specify the local file system as a checkpoint
location now?


Re: Structured Streaming Checkpoint Error

2020-09-17 Thread German Schiavon
Hi Gabor,

Makes sense, thanks a lot!

On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi 
wrote:

> Hi,
>
> Structured Streaming is simply not working when checkpoint location is on
> S3 due to it's read-after-write consistency.
> Please choose an HDFS compliant filesystem and it will work like a charm.
>
> BR,
> G
>
>
> On Wed, Sep 16, 2020 at 4:12 PM German Schiavon 
> wrote:
>
>> Hi!
>>
>> I have an Structured Streaming Application that reads from kafka,
>> performs some aggregations and writes in S3 in parquet format.
>>
>> Everything seems to work great except that from time to time I get a
>> checkpoint error, at the beginning I thought it was a random error but it
>> happened more than 3 times already in a few days
>>
>> Caused by: java.io.FileNotFoundException: No such file or directory:
>> s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>
>>
>> Does this happen to anyone else?
>>
>> Thanks in advance.
>>
>> *This is the full error :*
>>
>> ERROR streaming.MicroBatchExecution: Query segmentValidation [id =
>> 14edaddf-25bb-4259-b7a2-6107907f962f, runId =
>> 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
>>
>> java.io.FileNotFoundException: No such file or directory:
>> s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>
>> at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
>>
>> at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
>>
>> at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
>>
>> at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
>>
>> at
>> org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>>
>> at
>> org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>>
>> at
>> org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>>
>> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
>>
>> at
>> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
>>
>> at
>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>>
>> at
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
>>
>> at
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
>>
>> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>> at scala.Option.getOrElse(Option.scala:189)
>>
>


Re: Structured Streaming Checkpoint Error

2020-09-17 Thread Gabor Somogyi
Hi,

Structured Streaming is simply not working when the checkpoint location is on
S3 due to its read-after-write consistency.
Please choose an HDFS-compliant filesystem and it will work like a charm.
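
For example, something along these lines (paths are placeholders): keep
writing the data to S3 if you want, but point the checkpoint at an
HDFS-compatible location.

```py
query = (
    df.writeStream                                                   # df: streaming DataFrame from Kafka (placeholder)
      .format("parquet")
      .option("path", "s3a://my-bucket/output/")                     # data can still land on S3 (placeholder path)
      .option("checkpointLocation", "hdfs:///checkpoints/my-query")  # HDFS-compliant checkpoint location (placeholder)
      .start()
)
```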

BR,
G


On Wed, Sep 16, 2020 at 4:12 PM German Schiavon 
wrote:

> Hi!
>
> I have an Structured Streaming Application that reads from kafka, performs
> some aggregations and writes in S3 in parquet format.
>
> Everything seems to work great except that from time to time I get a
> checkpoint error, at the beginning I thought it was a random error but it
> happened more than 3 times already in a few days
>
> Caused by: java.io.FileNotFoundException: No such file or directory:
> s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>
>
> Does this happen to anyone else?
>
> Thanks in advance.
>
> *This is the full error :*
>
> ERROR streaming.MicroBatchExecution: Query segmentValidation [id =
> 14edaddf-25bb-4259-b7a2-6107907f962f, runId =
> 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
>
> java.io.FileNotFoundException: No such file or directory:
> s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
>
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
>
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
>
> at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
>
> at
> org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>
> at
> org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>
> at
> org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>
> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
>
> at
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
>
> at
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>
> at
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
>
> at
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
>
> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
> at scala.Option.getOrElse(Option.scala:189)
>


Structured Streaming Checkpoint Error

2020-09-16 Thread German Schiavon
Hi!

I have a Structured Streaming application that reads from Kafka, performs
some aggregations, and writes to S3 in Parquet format.

Everything seems to work great, except that from time to time I get a
checkpoint error. At the beginning I thought it was a random error, but it
has happened more than 3 times already in a few days.

Caused by: java.io.FileNotFoundException: No such file or directory:
s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp


Does this happen to anyone else?

Thanks in advance.

*This is the full error :*

ERROR streaming.MicroBatchExecution: Query segmentValidation [id =
14edaddf-25bb-4259-b7a2-6107907f962f, runId =
0a757476-94ec-4a53-960a-91f54ce47110] terminated with error

java.io.FileNotFoundException: No such file or directory:
s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp

at
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)

at
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)

at
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)

at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)

at
org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)

at
org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)

at
org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)

at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)

at
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)

at
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)

at
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)

at
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)

at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at scala.Option.getOrElse(Option.scala:189)


Re: Appropriate checkpoint interval in a spark streaming application

2020-08-15 Thread Sheel Pancholi
Guys, any input explaining the rationale behind the question below will really
help. Requesting some expert opinion.

Regards,
Sheel

On Sat, 15 Aug, 2020, 1:47 PM Sheel Pancholi,  wrote:

> Hello,
>
> I am trying to figure an appropriate checkpoint interval for my spark
> streaming application. Its Spark Kafka integration based on Direct Streams.
>
> If my *micro batch interval is 2 mins*, and let's say *each microbatch
> takes only 15 secs to process* then shouldn't my checkpoint interval also
> be exactly 2 mins?
>
> Assuming my spark streaming application starts at t=0, following will be
> the state of my checkpoint:
>
> *Case 1: checkpoint interval is less than microbatch interval*
> If I keep my *checkpoint interval at say 1 minute *then:
> *t=1m: *no incomplete batches in this checkpoint
> *t=2m*: first microbatch is included as an incomplete microbatch in the
> checkpoint and microbatch execution then begins
> *t=3m*: no incomplete batches in the checkpoint as the first microbatch
> is finished processing in just 15 secs
> *t=4m*: second microbatch is included as an incomplete microbatch in the
> checkpoint and microbatch execution then begins
> *t=4m30s*: system breaks down; on restarting the streaming application
> finds the checkpoint at t=4 with the second microbatch as the incomplete
> microbatch and processes it again. But what's the point of reprocessing it
> again since the second microbatch's processing was completed at the=4m15s
>
> *Case 2: checkpoint interval is more than microbatch interval*
> If I keep my *checkpoint interval at say 4 minutes* then:
> *t=2m* first microbatch execution begins
> *t=4m* first checkpoint with second microbatch included as the only
> incomplete batch; second microbatch processing begins
>
> *Sub case 1 :* *system breaks down at t=2m30s :* the first microbatch
> execution was completed at the=2m15s but there is no checkpoint information
> about this microbatch since the first checkpoint will happen at t=4m.
> Consequently, when the streaming app restarts it will re-execute by
> fetching the offsets from Kafka.
>
> *Sub case 2 :* *system breaks down at t=5m :* The second microbatch was
> already completed in 15 secs i.e. t=4m15s which means at t=5 there should
> ideally be no incomplete batches. When I restart my application, the
> streaming application finds the second microbatch as incomplete from the
> checkpoint made at t=4m, and re-executes that microbatch.
>
>
> Is my understanding right? If yes, then isn't my checkpoint interval
> incorrectly set resulting in duplicate processing in both the cases above?
> If yes, then how do I choose an appropriate checkpoint interval?
>
> Regards,
> Sheel
>


Appropriate checkpoint interval in a spark streaming application

2020-08-15 Thread Sheel Pancholi
Hello,

I am trying to figure out an appropriate checkpoint interval for my Spark
streaming application. It's a Spark-Kafka integration based on Direct Streams.

If my *micro batch interval is 2 mins*, and let's say *each microbatch
takes only 15 secs to process* then shouldn't my checkpoint interval also
be exactly 2 mins?
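
For concreteness, a rough sketch (paths and names are placeholders, the Kafka
wiring is elided) of the two intervals I am referring to:

```py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="checkpoint-interval-demo")   # placeholder app name
ssc = StreamingContext(sc, batchDuration=120)           # 2-minute micro batch interval
ssc.checkpoint("hdfs:///checkpoints/my-app")            # checkpoint directory (placeholder path)

# stream = KafkaUtils.createDirectStream(ssc, ...)      # Kafka direct stream, elided here
# stream.checkpoint(120)                                # per-DStream checkpoint interval, in seconds
```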

Assuming my spark streaming application starts at t=0, following will be
the state of my checkpoint:

*Case 1: checkpoint interval is less than microbatch interval*
If I keep my *checkpoint interval at say 1 minute *then:
*t=1m: *no incomplete batches in this checkpoint
*t=2m*: first microbatch is included as an incomplete microbatch in the
checkpoint and microbatch execution then begins
*t=3m*: no incomplete batches in the checkpoint as the first microbatch is
finished processing in just 15 secs
*t=4m*: second microbatch is included as an incomplete microbatch in the
checkpoint and microbatch execution then begins
*t=4m30s*: system breaks down; on restarting, the streaming application
finds the checkpoint at t=4m with the second microbatch as the incomplete
microbatch and processes it again. But what's the point of reprocessing it
again, since the second microbatch's processing was completed at t=4m15s?

*Case 2: checkpoint interval is more than microbatch interval*
If I keep my *checkpoint interval at say 4 minutes* then:
*t=2m* first microbatch execution begins
*t=4m* first checkpoint with second microbatch included as the only
incomplete batch; second microbatch processing begins

*Sub case 1:* *system breaks down at t=2m30s:* the first microbatch
execution was completed at t=2m15s, but there is no checkpoint information
about this microbatch since the first checkpoint will happen at t=4m.
Consequently, when the streaming app restarts it will re-execute it by
fetching the offsets from Kafka.

*Sub case 2:* *system breaks down at t=5m:* The second microbatch was
already completed in 15 secs, i.e. at t=4m15s, which means at t=5m there should
ideally be no incomplete batches. When I restart my application, the
streaming application finds the second microbatch as incomplete from the
checkpoint made at t=4m, and re-executes that microbatch.


Is my understanding right? If yes, then isn't my checkpoint interval
incorrectly set resulting in duplicate processing in both the cases above?
If yes, then how do I choose an appropriate checkpoint interval?

Regards,
Sheel


[Apache Spark][Streaming Job][Checkpoint]Spark job failed on Checkpoint recovery with Batch not found error

2020-05-28 Thread taylorwu
Hi,

We have a Spark 2.4 job that fails on checkpoint recovery every few hours with
the following errors (from the driver log):

driver spark-kubernetes-driver ERROR 20:38:51 ERROR MicroBatchExecution:
Query impressionUpdate [id = 54614900-4145-4d60-8156-9746ffc13d1f, runId =
3637c2f3-49b6-40c2-b6d0-7edb28361c5d] terminated with error
java.lang.IllegalStateException: batch 946 doesn't exist
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:406)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
at
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

And the executor logs show this error:

 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

How should I fix this?






Re: spark on k8s - can driver and executor have separate checkpoint location?

2020-05-16 Thread Ali Gouta
Hello,

I am wondering: if you do so, then all your executor pods would have to run on
the same Kubernetes worker node, since you mount a single volume with a
ReadWriteOnce policy. By design this does not seem good, I assume. You may
need a ReadWriteMany access mode associated with the volume, and then
pod anti-affinity to make sure the pods are not running on the same node.
You may achieve this by running an NFS filesystem and then creating a PV/PVC
that mounts that shared file system. The persistentVolumeClaim defined
in your YAML should reference the PVC you created.

Best regards,
Ali Gouta.

On Sat, May 16, 2020 at 6:06 AM wzhan  wrote:

> Hi guys,
>
> I'm running spark applications on kubernetes. According to spark
> documentation
>
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
> Spark needs distributed file system to store its checkpoint data so that in
> case of failure, it can recover from checkpoint directory.
>
> My question is, can driver and executor have separate checkpoint location?
> I'm asking this because driver and executor might be deployed on different
> nodes. A shared checkpoint location will require ReadWriteMany access mode.
> Since I only have a storage class that supports ReadWriteOnce access mode
> I'm trying to find some workaround.
>
>
> In Spark Streaming Guide, it mentioned "Failed driver can be restarted from
> checkpoint information" and when executor failed, "Tasks and receivers
> restarted by Spark automatically, no config needed".
>
> Given this I tried only config checkpoint location for driver pod. It
> immediately failed with below exception:
>
> 2020-05-15T14:20:17.142 [stream execution thread for baseline_windower [id
> =
> b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId =
> f246774e-5a20-4bfb-b997-4d06c344bb0f]hread] ERROR
> org.apache.spark.sql.execution.streaming.MicroBatchExecution - Query
> baseline_windower [id = b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId =
> f246774e-5a20-4bfb-b997-4d06c344bb0f] terminated with error
> org.apache.spark.SparkException: Writing job aborted.
> at
>
> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
> at
>
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
>
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
>
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
>
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> ...
> at
>
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost
> task
> 0.3 in stage 11.0 (TID 420, 10.233.124.162, executor 1):
> java.io.IOException: mkdir of
> file:/opt/window-data/baseline_windower/state/0/0 failed
>
>
> So I tried giving separate checkpoint location to driver and executor:
>
> In spark application helm chart I have a checkpointlocation configuration:
>
> spec:
>sparkConf:
>  "spark.sql.streaming.checkpointLocation":
> "file:///opt/checkpoint-data"
>
> I created two checkpoint pvc and mount the volume for driver and executor
> pod:
>
>   volumes:
> - name: checkpoint-driver-volume
>   persistentVolumeClaim:
> claimName: checkpoint-driver-pvc
> - name: checkpoint-executor-volume
>   persistentVolumeClaim:
> claimName: checkpoint-executor-pvc
>
> driver:
> volumeMounts:
>   - name: checkpoint-driver-volume
> mountPath: "/opt/checkpoint-data"
> ...
> executor:
> volumeMounts:
>   - name: checkpoint-executor-volume
> mountPath: "/opt/checkpoint-data"
>
> After deployment it seems to be working. I tried restarted driver pod and
> it
> did recover from checkpoint directory. But I'm not sure if this is actually
> supported by design.
>
> Thanks,
> wzhan
>
>
>
>
>


spark on k8s - can driver and executor have separate checkpoint location?

2020-05-15 Thread wzhan
Hi guys,

I'm running spark applications on kubernetes. According to spark
documentation
https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
Spark needs distributed file system to store its checkpoint data so that in
case of failure, it can recover from checkpoint directory. 

My question is, can driver and executor have separate checkpoint location?
I'm asking this because driver and executor might be deployed on different
nodes. A shared checkpoint location will require ReadWriteMany access mode.
Since I only have a storage class that supports ReadWriteOnce access mode
I'm trying to find some workaround.


In Spark Streaming Guide, it mentioned "Failed driver can be restarted from
checkpoint information" and when executor failed, "Tasks and receivers
restarted by Spark automatically, no config needed".

Given this, I tried configuring the checkpoint location only for the driver pod. It
immediately failed with the below exception:

2020-05-15T14:20:17.142 [stream execution thread for baseline_windower [id =
b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId =
f246774e-5a20-4bfb-b997-4d06c344bb0f]hread] ERROR
org.apache.spark.sql.execution.streaming.MicroBatchExecution - Query
baseline_windower [id = b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId =
f246774e-5a20-4bfb-b997-4d06c344bb0f] terminated with error
org.apache.spark.SparkException: Writing job aborted.
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
...
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost task
0.3 in stage 11.0 (TID 420, 10.233.124.162, executor 1):
java.io.IOException: mkdir of
file:/opt/window-data/baseline_windower/state/0/0 failed


So I tried giving separate checkpoint location to driver and executor:

In the Spark application Helm chart I have a checkpointLocation configuration:

spec:
  sparkConf:
    "spark.sql.streaming.checkpointLocation": "file:///opt/checkpoint-data"

I created two checkpoint PVCs and mounted the volumes for the driver and executor
pods:

  volumes:
    - name: checkpoint-driver-volume
      persistentVolumeClaim:
        claimName: checkpoint-driver-pvc
    - name: checkpoint-executor-volume
      persistentVolumeClaim:
        claimName: checkpoint-executor-pvc

driver:
  volumeMounts:
    - name: checkpoint-driver-volume
      mountPath: "/opt/checkpoint-data"
...
executor:
  volumeMounts:
    - name: checkpoint-executor-volume
      mountPath: "/opt/checkpoint-data"

After deployment it seems to be working. I tried restarting the driver pod and it
did recover from the checkpoint directory. But I'm not sure if this is actually
supported by design.

Thanks,
wzhan






Re: [spark streaming] checkpoint location feature for batch processing

2020-05-03 Thread Jungtaek Lim
Replied inline:

On Sun, May 3, 2020 at 6:25 PM Magnus Nilsson  wrote:

> Thank you, so that would mean spark gets the current latest offset(s) when
> the trigger fires and then process all available messages in the topic upto
> and including that offset as long as maxOffsetsPerTrigger is the default of
> None (or large enought to handle all available messages).
>

Yes, it starts from the offset of the latest batch. `maxOffsetsPerTrigger` will
be ignored starting from Spark 3.0.0, which means for Spark 2.x it still
has an effect even when Trigger.Once is used, I guess.


>
> I think the word micro-batch confused me (more like mega-batch in some
> cases). It makes sense though, this makes Trigger.Once a fixed interval
> trigger that's only fired once and not repeatedly.
>

"micro" is relative - though Spark by default processes all available
inputs per batch, in most cases you'll want to make the batch size
(interval) as small as possible, as it defines the latency of the output.
Trigger.Once is an unusual case in a streaming workload - it's more like
continuous execution of a "batch". By "continuous" I mean picking up the latest
context, which is the characteristic of a streaming query, hence a hybrid.


>
>
> On Sun, May 3, 2020 at 3:20 AM Jungtaek Lim 
> wrote:
>
>> If I understand correctly, Trigger.once executes only one micro-batch and
>> terminates, that's all. Your understanding of structured streaming applies
>> there as well.
>>
>> It's like a hybrid approach as bringing incremental processing from
>> micro-batch but having processing interval as batch. That said, while it
>> enables to get both sides of benefits, it's basically structured streaming,
>> inheriting all the limitations on the structured streaming, compared to the
>> batch query.
>>
>> Spark 3.0.0 will bring some change on Trigger.once (SPARK-30669 [1]) -
>> Trigger.once will "ignore" the read limit per micro-batch on data source
>> (like maxOffsetsPerTrigger) and process all available input as possible.
>> (Data sources should migrate to the new API to take effect, but works for
>> built-in data sources like file and Kafka.)
>>
>> 1. https://issues.apache.org/jira/browse/SPARK-30669
>>
>> On Sat, May 2, 2020 at 5:35 PM, Magnus Nilsson wrote:
>>
>>> I've always had a question about Trigger.Once that I never got around to
>>> ask or test for myself. If you have a 24/7 stream to a Kafka topic.
>>>
>>> Will Trigger.Once get the last offset(s) when it starts and then quit
>>> once it hits this offset(s) or will the job run until no new messages is
>>> added to the topic for a particular amount of time?
>>>
>>> br,
>>>
>>> Magnus
>>>
>>> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:
>>>
>>>> Hi Rishi,
>>>>
>>>> That is exactly why Trigger.Once was created for Structured Streaming.
>>>> The way we look at streaming is that it doesn't have to be always real
>>>> time, or 24-7 always on. We see streaming as a workflow that you have to
>>>> repeat indefinitely. See this blog post for more details!
>>>>
>>>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>>>>
>>>> Best,
>>>> Burak
>>>>
>>>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I recently started playing with spark streaming, and checkpoint
>>>>> location feature looks very promising. I wonder if anyone has an opinion
>>>>> about using spark streaming with checkpoint location option as a slow 
>>>>> batch
>>>>> processing solution. What would be the pros and cons of utilizing 
>>>>> streaming
>>>>> with checkpoint location feature to achieve fault tolerance in batch
>>>>> processing application?
>>>>>
>>>>> --
>>>>> Regards,
>>>>>
>>>>> Rishi Shah
>>>>>
>>>>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-03 Thread Magnus Nilsson
Thank you, so that would mean Spark gets the current latest offset(s) when
the trigger fires and then processes all available messages in the topic up to
and including that offset, as long as maxOffsetsPerTrigger is the default of
None (or large enough to handle all available messages).

I think the word micro-batch confused me (more like mega-batch in some
cases). It makes sense though: this makes Trigger.Once a fixed-interval
trigger that's only fired once and not repeatedly.


On Sun, May 3, 2020 at 3:20 AM Jungtaek Lim 
wrote:

> If I understand correctly, Trigger.once executes only one micro-batch and
> terminates, that's all. Your understanding of structured streaming applies
> there as well.
>
> It's like a hybrid approach as bringing incremental processing from
> micro-batch but having processing interval as batch. That said, while it
> enables to get both sides of benefits, it's basically structured streaming,
> inheriting all the limitations on the structured streaming, compared to the
> batch query.
>
> Spark 3.0.0 will bring some change on Trigger.once (SPARK-30669 [1]) -
> Trigger.once will "ignore" the read limit per micro-batch on data source
> (like maxOffsetsPerTrigger) and process all available input as possible.
> (Data sources should migrate to the new API to take effect, but works for
> built-in data sources like file and Kafka.)
>
> 1. https://issues.apache.org/jira/browse/SPARK-30669
>
> On Sat, May 2, 2020 at 5:35 PM, Magnus Nilsson wrote:
>
>> I've always had a question about Trigger.Once that I never got around to
>> ask or test for myself. If you have a 24/7 stream to a Kafka topic.
>>
>> Will Trigger.Once get the last offset(s) when it starts and then quit
>> once it hits this offset(s) or will the job run until no new messages is
>> added to the topic for a particular amount of time?
>>
>> br,
>>
>> Magnus
>>
>> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:
>>
>>> Hi Rishi,
>>>
>>> That is exactly why Trigger.Once was created for Structured Streaming.
>>> The way we look at streaming is that it doesn't have to be always real
>>> time, or 24-7 always on. We see streaming as a workflow that you have to
>>> repeat indefinitely. See this blog post for more details!
>>>
>>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>>>
>>> Best,
>>> Burak
>>>
>>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I recently started playing with spark streaming, and checkpoint
>>>> location feature looks very promising. I wonder if anyone has an opinion
>>>> about using spark streaming with checkpoint location option as a slow batch
>>>> processing solution. What would be the pros and cons of utilizing streaming
>>>> with checkpoint location feature to achieve fault tolerance in batch
>>>> processing application?
>>>>
>>>> --
>>>> Regards,
>>>>
>>>> Rishi Shah
>>>>
>>>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-02 Thread Jungtaek Lim
If I understand correctly, Trigger.Once executes only one micro-batch and
terminates, that's all. Your understanding of structured streaming applies
there as well.

It's like a hybrid approach: it brings incremental processing from
micro-batching but has the processing interval of a batch job. That said, while it
lets you get the benefits of both sides, it's basically structured streaming,
inheriting all the limitations of structured streaming compared to a
batch query.

Spark 3.0.0 will bring some changes to Trigger.Once (SPARK-30669 [1]) -
Trigger.Once will "ignore" the per-micro-batch read limit on the data source
(like maxOffsetsPerTrigger) and process as much available input as possible.
(Data sources should migrate to the new API for this to take effect, but it works for
built-in data sources like file and Kafka.)

1. https://issues.apache.org/jira/browse/SPARK-30669
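
To make the pattern concrete, here is a minimal PySpark sketch of a run-once micro-batch job with a checkpoint location (the broker, topic, and paths are placeholders, not anything from this thread):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trigger-once-demo").getOrCreate()

# Read incrementally from Kafka; the checkpoint remembers the last processed offsets.
events = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")    # placeholder
          .option("subscribe", "events")                       # placeholder topic
          .load()
          .selectExpr("CAST(value AS STRING) AS value"))

# Trigger.Once: process whatever is available since the last run, then stop.
query = (events.writeStream
         .format("parquet")
         .option("path", "/data/events_out")                   # placeholder
         .option("checkpointLocation", "/checkpoints/events")  # placeholder
         .trigger(once=True)
         .start())

query.awaitTermination()

Each scheduled run resumes from the offsets recorded under the checkpoint location and stops once the single batch completes.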

On Sat, May 2, 2020 at 5:35 PM, Magnus Nilsson wrote:

> I've always had a question about Trigger.Once that I never got around to
> ask or test for myself. If you have a 24/7 stream to a Kafka topic.
>
> Will Trigger.Once get the last offset(s) when it starts and then quit once
> it hits this offset(s) or will the job run until no new messages is added
> to the topic for a particular amount of time?
>
> br,
>
> Magnus
>
> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:
>
>> Hi Rishi,
>>
>> That is exactly why Trigger.Once was created for Structured Streaming.
>> The way we look at streaming is that it doesn't have to be always real
>> time, or 24-7 always on. We see streaming as a workflow that you have to
>> repeat indefinitely. See this blog post for more details!
>>
>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>>
>> Best,
>> Burak
>>
>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
>> wrote:
>>
>>> Hi All,
>>>
>>> I recently started playing with spark streaming, and checkpoint location
>>> feature looks very promising. I wonder if anyone has an opinion about using
>>> spark streaming with checkpoint location option as a slow batch processing
>>> solution. What would be the pros and cons of utilizing streaming with
>>> checkpoint location feature to achieve fault tolerance in batch processing
>>> application?
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-02 Thread Magnus Nilsson
I've always had a question about Trigger.Once that I never got around to
asking or testing for myself. Say you have a 24/7 stream to a Kafka topic.

Will Trigger.Once get the last offset(s) when it starts and then quit once
it hits those offset(s), or will the job run until no new messages are added
to the topic for a particular amount of time?

br,

Magnus

On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:

> Hi Rishi,
>
> That is exactly why Trigger.Once was created for Structured Streaming. The
> way we look at streaming is that it doesn't have to be always real time, or
> 24-7 always on. We see streaming as a workflow that you have to repeat
> indefinitely. See this blog post for more details!
>
> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>
> Best,
> Burak
>
> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I recently started playing with spark streaming, and checkpoint location
>> feature looks very promising. I wonder if anyone has an opinion about using
>> spark streaming with checkpoint location option as a slow batch processing
>> solution. What would be the pros and cons of utilizing streaming with
>> checkpoint location feature to achieve fault tolerance in batch processing
>> application?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-01 Thread Rishi Shah
Thanks Burak! Appreciate it. This makes sense.

How do you suggest we make sure the resulting data doesn't produce tiny files
if we are not on Databricks yet and cannot leverage Delta Lake features?
Also, for the checkpointing feature, do you have an active blog/article I can take
a look at to try out an example?

On Fri, May 1, 2020 at 7:22 PM Burak Yavuz  wrote:

> Hi Rishi,
>
> That is exactly why Trigger.Once was created for Structured Streaming. The
> way we look at streaming is that it doesn't have to be always real time, or
> 24-7 always on. We see streaming as a workflow that you have to repeat
> indefinitely. See this blog post for more details!
>
> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>
> Best,
> Burak
>
> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I recently started playing with spark streaming, and checkpoint location
>> feature looks very promising. I wonder if anyone has an opinion about using
>> spark streaming with checkpoint location option as a slow batch processing
>> solution. What would be the pros and cons of utilizing streaming with
>> checkpoint location feature to achieve fault tolerance in batch processing
>> application?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-01 Thread Burak Yavuz
Hi Rishi,

That is exactly why Trigger.Once was created for Structured Streaming. The
way we look at streaming is that it doesn't have to be always real time, or
24-7 always on. We see streaming as a workflow that you have to repeat
indefinitely. See this blog post for more details!
https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html

Best,
Burak

On Fri, May 1, 2020 at 2:55 PM Rishi Shah  wrote:

> Hi All,
>
> I recently started playing with spark streaming, and checkpoint location
> feature looks very promising. I wonder if anyone has an opinion about using
> spark streaming with checkpoint location option as a slow batch processing
> solution. What would be the pros and cons of utilizing streaming with
> checkpoint location feature to achieve fault tolerance in batch processing
> application?
>
> --
> Regards,
>
> Rishi Shah
>


[spark streaming] checkpoint location feature for batch processing

2020-05-01 Thread Rishi Shah
Hi All,

I recently started playing with spark streaming, and checkpoint location
feature looks very promising. I wonder if anyone has an opinion about using
spark streaming with checkpoint location option as a slow batch processing
solution. What would be the pros and cons of utilizing streaming with
checkpoint location feature to achieve fault tolerance in batch processing
application?

-- 
Regards,

Rishi Shah


Re: [Structured Streaming] Checkpoint file compact file grows big

2020-04-19 Thread Jungtaek Lim
Deleting the latest .compact file would lose the exactly-once guarantee
and lead Spark to fail to read from the output directory. If you're reading
the output directory from non-Spark tools then the metadata on the output directory
doesn't matter, but there's no exactly-once (exactly-once is achieved by
leveraging the metadata, which only Spark can read).

Btw, what you've encountered is one of the known issues with the file stream sink
- there are two different JIRA issues filed for the same problem so far
(reported by end users):

https://issues.apache.org/jira/browse/SPARK-24295
https://issues.apache.org/jira/browse/SPARK-29995

I've proposed retention of output files in the file stream sink but it hasn't
gotten much love. (That means it's not guaranteed to be addressed.)

https://issues.apache.org/jira/browse/SPARK-27188

Given the patch is stale, I'm planning to rework it based on the latest master
again soon.

Btw, I've also proposed other improvements to help address latency
issues in the file stream source & file stream sink, but they haven't gotten much love
from committers either (no guarantee to be addressed):

https://issues.apache.org/jira/browse/SPARK-30804
https://issues.apache.org/jira/browse/SPARK-30866
https://issues.apache.org/jira/browse/SPARK-30900
https://issues.apache.org/jira/browse/SPARK-30915
https://issues.apache.org/jira/browse/SPARK-30946

SPARK-30946 is closely related to the issue - it will make the
checkpoint file much smaller and also make the elapsed time to compact much shorter.
Efficiency would depend on the compression ratio, but it could achieve 5 times
faster compaction and an 80% smaller file (1/5 of the original), which would push back that
point in time greatly even without TTL. Say, if you reached the bad state
in 2 weeks, the patch would delay it by 8 more weeks (10 weeks to
reach the bad state).

That said, it doesn't completely get rid of necessity of TTL, but open the
chance to have longer TTL without encountering bad state.

If you're adventurous you can apply these patches on your version of Spark
and see whether it helps.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Thu, Apr 16, 2020 at 9:24 AM Ahn, Daniel 
wrote:

> Are Spark Structured Streaming checkpoint files expected to grow over time
> indefinitely? Is there a recommended way to safely age-off old checkpoint
> data?
>
>
>
> Currently we have a Spark Structured Streaming process reading from Kafka
> and writing to an HDFS sink, with checkpointing enabled and writing to a
> location on HDFS. This streaming application has been running for 4 months
> and over time we have noticed that with every 10th job within the
> application there is about a 5 minute delay between when a job finishes and
> the next job starts which we have attributed to the checkpoint compaction
> process. At this point the .compact file that is written is about 2GB in
> size and the contents of the file show it keeps track of files it processed
> at the very origin of the streaming application.
>
>
>
> This issue can be reproduced with any Spark Structured Streaming process
> that writes checkpoint files.
>
>
>
> Is the best approach for handling the growth of these files to simply
> delete the latest .compact file within the checkpoint directory, and are
> there any associated risks with doing so?
>
>
>
>
>


Re: [Structured Streaming] Checkpoint file compact file grows big

2020-04-15 Thread Kelvin Qin



SEE:http://spark.apache.org/docs/2.3.1/streaming-programming-guide.html#checkpointing
"Note that checkpointing of RDDs incurs the cost of saving to reliable storage. 
This may cause an increase in the processing time of those batches where RDDs 
get checkpointed."


As far as I know, the official documentation states that the checkpoint data of a
Spark Streaming application will continue to increase over time,
whereas data or RDD checkpointing is necessary even for basic functioning if
stateful transformations are used.
So, for applications that require long-term aggregation, I choose to use
third-party caches in production, such as Redis. Maybe you can try Alluxio.




Wishes!







On 2020-04-16 08:19:24, "Ahn, Daniel" wrote:

Are Spark Structured Streaming checkpoint files expected to grow over time 
indefinitely? Is there a recommended way to safely age-off old checkpoint data?

 

Currently we have a Spark Structured Streaming process reading from Kafka and 
writing to an HDFS sink, with checkpointing enabled and writing to a location 
on HDFS. This streaming application has been running for 4 months and over time 
we have noticed that with every 10th job within the application there is about 
a 5 minute delay between when a job finishes and the next job starts which we 
have attributed to the checkpoint compaction process. At this point the 
.compact file that is written is about 2GB in size and the contents of the file 
show it keeps track of files it processed at the very origin of the streaming 
application.

 

This issue can be reproduced with any Spark Structured Streaming process that 
writes checkpoint files.

 

Is the best approach for handling the growth of these files to simply delete 
the latest .compact file within the checkpoint directory, and are there any 
associated risks with doing so?

 



[Structured Streaming] Checkpoint file compact file grows big

2020-04-15 Thread Ahn, Daniel
Are Spark Structured Streaming checkpoint files expected to grow over time 
indefinitely? Is there a recommended way to safely age-off old checkpoint data?

Currently we have a Spark Structured Streaming process reading from Kafka and 
writing to an HDFS sink, with checkpointing enabled and writing to a location 
on HDFS. This streaming application has been running for 4 months and over time 
we have noticed that with every 10th job within the application there is about 
a 5 minute delay between when a job finishes and the next job starts which we 
have attributed to the checkpoint compaction process. At this point the 
.compact file that is written is about 2GB in size and the contents of the file 
show it keeps track of files it processed at the very origin of the streaming 
application.
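
A rough way to see what such a compact file is tracking is to read it as text; this is only a sketch, and it assumes the plain-text metadata log layout (a version header line followed by one JSON entry per line) - the path below is a placeholder:

import json

# Placeholder path to one compacted metadata log file (copied locally).
compact_path = "/tmp/checkpoint-copy/sources/0/99.compact"

entries = []
with open(compact_path) as f:
    version = f.readline().strip()   # e.g. "v1"
    for line in f:
        line = line.strip()
        if line:
            entries.append(json.loads(line))

print("log version:", version)
print("tracked entries:", len(entries))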

This issue can be reproduced with any Spark Structured Streaming process that 
writes checkpoint files.

Is the best approach for handling the growth of these files to simply delete 
the latest .compact file within the checkpoint directory, and are there any 
associated risks with doing so?




[Spark MicroBatchExecution] Error fetching kafka/checkpoint/state/0/0/1.delta does not exist

2020-03-12 Thread Miguel Silvestre
Hi community,

I'm having this error in some kafka streams:

Caused by: java.io.FileNotFoundException: File
file:/efs/.../kafka/checkpoint/state/0/0/1.delta does not exist

Because of this I have some streams down. How can I fix this?

Thank you.

--
Miguel Silvestre


Spark checkpoint problem for python api

2019-07-29 Thread zenglong chen
Hi,
My code is below:

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def test(record_list):
    print(list(record_list))
    return record_list

def functionToCreateContext():
    conf = SparkConf().setAppName("model_event").setMaster("spark://172.22.9.181:7077") \
        .set("spark.executor.memory", '6g') \
        .set("spark.executor.cores", '8') \
        .set("spark.deploy.defaultCores", '8') \
        .set("spark.cores.max", '16') \
        .set("spark.streaming.kafka.maxRatePerPartition", 1) \
        .set("spark.streaming.blockInterval", 1) \
        .set("spark.default.parallelism", 8) \
        .set("spark.driver.host", '172.22.9.181')

    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/spark/checkpoints/model_event_spark")
    return ssc

if __name__ == '__main__':
    ssc = StreamingContext.getOrCreate("/spark/checkpoints/model_event_spark",
                                       functionToCreateContext)
    record_dstream = KafkaUtils.createDirectStream(ssc, topics=["installmentdb_t_bill"],
                                                   kafkaParams={"bootstrap.servers": "xxx:9092",
                                                                "auto.offset.reset": "smallest"})

    record_dstream.checkpoint(5).mapPartitions(test).pprint()
    ssc.start()
    ssc.awaitTermination()


When the script starts for the first time, it works well.

But the second time, when it is started from the checkpoint directory, it has a problem like:

2019-07-30 02:48:50,290 ERROR streaming.StreamingContext: Error
starting the context, marking it as stopped
org.apache.spark.SparkException:
org.apache.spark.streaming.api.python.PythonTransformedDStream@319b7bed
has not been initialized
at 
org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at scala.Option.orElse(Option.scala:289)
at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at 
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
at 
org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
at 
org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
at 
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
at 
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
at 
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
at ... run in separate thread using org.apache.spark.util.ThreadUtils 
... ()
at 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at 
org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invo
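
For comparison, the checkpoint recovery example in the streaming programming guide builds the whole DStream pipeline inside the function passed to getOrCreate, so it can be reconstructed on restart; a rough sketch of that pattern (the broker address is a placeholder):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

CHECKPOINT_DIR = "/spark/checkpoints/model_event_spark"

def create_context():
    conf = SparkConf().setAppName("model_event")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint(CHECKPOINT_DIR)

    # All DStream creation and transformations live inside this function,
    # so they can be rebuilt from the checkpoint on restart.
    stream = KafkaUtils.createDirectStream(
        ssc,
        topics=["installmentdb_t_bill"],
        kafkaParams={"bootstrap.servers": "broker:9092",   # placeholder
                     "auto.offset.reset": "smallest"})
    stream.checkpoint(5).mapPartitions(lambda it: it).pprint()
    return ssc

if __name__ == '__main__':
    ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
    ssc.start()
    ssc.awaitTermination()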

Checkpointing and accessing the checkpoint data

2019-06-27 Thread Jean-Georges Perrin
Hi Sparkians,

Few questions around checkpointing.

1. Checkpointing “dump” file / persisting to disk
Is the file encrypted or is it a standard parquet file? 

2. If the file is not encrypted, can I use it with another app (I know it’s 
kind of of a weird stretch case)

3. Have you/do you know of any performance comparison between the two? On small 
datasets, caching seems more performant, but I can imagine that there is a 
sweet spot…

Thanks!

jgp



Jean -Georges Perrin
j...@jgp.net






Spark Streaming: Checkpoint, Recovery and Idempotency

2019-05-29 Thread sheelstera
Hello,

I am trying to understand the content of a checkpoint and corresponding
recovery.

My understanding of Spark Checkpointing:
If you have really long DAGs and your spark cluster fails, checkpointing
helps by persisting intermediate state e.g. to HDFS. So, a DAG of 50
transformations can be reduced to 4-5 transformations with the help of
checkpointing. It breaks the DAG though.
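
As a concrete illustration of that lineage truncation, a minimal sketch (the checkpoint directory is a placeholder):

from pyspark import SparkContext

sc = SparkContext(appName="checkpoint-lineage-demo")
sc.setCheckpointDir("/tmp/spark-checkpoints")   # placeholder; normally an HDFS path

rdd = sc.parallelize(range(1000))
for _ in range(50):                 # build a long lineage of 50 transformations
    rdd = rdd.map(lambda x: x + 1)

rdd.checkpoint()                    # mark the RDD for checkpointing
rdd.count()                         # an action materializes it and writes the checkpoint

# After the action, the lineage is truncated to the checkpointed data.
print(rdd.isCheckpointed())         # True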

Checkpointing in Streaming
My Spark Streaming job has a microbatch of 5 seconds. As I understand, a new
job is submitted every 5 secs on the Eventloop that invokes the JobGenerator
to generate the RDD DAG for the new microbatch from the DStreamGraph, while
the receiver in the meantime keeps collecting the data for the next new
microbatch for the next cycle. If I enable checkpointing, as I understand,
it will periodically keep checkpointing the "current state".

Question:
What is this "state"? Is this the combination of the base RDD and the state
of the operators/transformations of the DAG for the present microbatch only?
So I have the following:

ubatch 0 at T=0  ---> SUCCESS
ubatch 1 at T=5  ---> SUCCESS
ubatch 2 at T=10 ---> SUCCESS
---> Checkpointing kicks in now at T=12
ubatch 3 at T=15 ---> SUCCESS
ubatch 4 at T=20
---> Spark Cluster DOWN at T=23 => ubatch 4 FAILS!!!
...
---> Spark Cluster is restarted at T=100

What specifically goes and sits on the disk as a result of checkpointing at
T=12? Will it just store the present state of operators of the DAG for
ubatch 2?

a. If yes, then during recovery at T=100, the last checkpoint available is
at T=12. What happens to ubatch 3 at T=15, which was already processed
successfully? Does the application reprocess ubatch 3 and handle idempotency
here? If yes, do we go to the streaming source, e.g. Kafka, and rewind the
offset to be able to replay the contents starting from ubatch 3?

b. If no, then what exactly goes into the checkpoint directory at T=12?

https://stackoverflow.com/questions/56362347/spark-checkpointing-content-recovery-and-idempotency
  

Regards






PySpark Streaming “PicklingError: Could not serialize object” when use transform operator and checkpoint enabled

2019-05-23 Thread Xilang Yan
In PySpark streaming, if checkpointing is enabled and a stream.transform
operator is used to join with another RDD, a “PicklingError: Could not serialize
object” will be thrown. I have asked the same question at stackoverflow:
https://stackoverflow.com/questions/56267591/pyspark-streaming-picklingerror-could-not-serialize-object-when-checkpoint-an

After some investigation, I found the problem is that checkpointing will
serialize the lambda and then serialize the RDD referenced in the lambda. So I changed the code
to something like below; the purpose is to use a static, transient-style variable
to avoid serializing the RDD.

class DocInfoHolder:
    doc_info = None

line.transform(lambda rdd: rdd.join(DocInfoHolder.doc_info)).pprint(10)

But the problem still exists. Then I found PySpark uses a special pickle called
cloudpickle.py; it looks like it will serialize any referenced class, function,
or lambda code, and there is no documentation about how to skip serialization. Could
anyone help with how to work around this issue?






spark checkpoint between 2 jobs and HDFS ramfs with storage policy

2019-05-21 Thread Julien Laurenceau
Hi,

I am looking for a setup that would allow me to split a single Spark
processing pipeline into 2 jobs (operational constraints) without wasting too much
time persisting the data between the two jobs during the Spark
checkpoint/writes.

I have a config with a lot of RAM and I'm willing to configure a few
hundred GB in ramfs, but I cannot find any feedback on this kind of
configuration... and the Hadoop doc that tells me "network replication
negates the benefits of writing to memory" doesn't inspire much
confidence regarding the performance improvement.
My HDFS is configured with replication 3, so if the LAZY_PERSIST writes
still imply waiting for the three replicas to be written in the ramfs of
three different datanodes, I can understand that the performance
improvement will be really small.
In addition, if my second job is not provisioned on the same datanodes, the
ramfs might not be of any help.

Any advice regarding the use of HDFS ramfs and LAZY_PERSIST with Spark
checkpointing? Is it a dead end? Would a larger Linux page cache be more helpful?

Regards,
JL


Redeploying spark streaming application aborts because of checkpoint issue

2018-10-14 Thread Kuttaiah Robin
Hello all,

I am using spark-2.3.0 and hadoop-2.7.4.
I have spark streaming application which listens to kafka topic, does some
transformation and writes to Oracle database using JDBC client.

Read events from Kafka as shown below;

m_oKafkaEvents = getSparkSession().readStream().format("kafka")
  .option("kafka.bootstrap.servers", strKafkaAddress)
  .option("assign", strSubscription)
  .option("maxOffsetsPerTrigger", "10")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", false)
  .load()
  .filter(strFilter)
  .select(functions.from_json(functions.col("value").cast("string"), oSchema).alias("events"))
  .select("events.*");

Checkpoint is used as shown below;

DataStreamWriter oMilestoneStream = oAggregation
  .writeStream()
  .queryName(strQueryName)
  .outputMode("update")
  .trigger(Trigger.ProcessingTime(getInsightDeployment().getInstanceSummary().getTriggerInterval()))
  .option("checkpointLocation", strCheckpointLocation)
  .foreach(oForeachWriter);

strCheckpointLocation is something
like /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj.
This is hdfs location.


With this, when I redeploy the Spark application I get the exception below. The only
workaround I have currently is to delete the checkpoint location and
recreate the topic.

I also see a couple of JIRA tasks which say RESOLVED, but the problem is still
seen.
https://issues.apache.org/jira/browse/SPARK-20894
https://issues.apache.org/jira/browse/SPARK-22262

Can someone help me on what is the best solution for this?

thanks,
Robin Kuttaiah



Exception
---
18/10/14 03:19:16 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalStateException: Error reading delta file
hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta
of HDFSStateStoreProvider[id = (op=1,part=0),dir =
hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0]:
hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta
does not exist
at
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org
$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:371)
at
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:333)
at
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
at
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
at
scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
at
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:332)
at
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:196)
at
org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:369)
at
org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:74)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
Caused by: java.io.FileNotFoundException: File does not exist:
/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta
at
org.apache.hadoop.hdfs.server.nam

Re: Structured Streaming doesn't write checkpoint log when I use coalesce

2018-08-09 Thread Jungtaek Lim
Which version do you use? Above app works with Spark 2.3.1, 200 partitions
are stored for State.

val queryStatusFile = conf.queryStatusFile()
val rateRowPerSecond = conf.rateRowPerSecond()
val rateRampUpTimeSecond = conf.rateRampUpTimeSecond()

val ss = SparkSession
  .builder()
  .master("local[3]")
  .appName("state coalesce test")
  .getOrCreate()

ss.streams.addListener(new
QueryListenerWriteProgressToFile(queryStatusFile))

import ss.implicits._

val df = ss.readStream
  .format("rate")
  .option("rowsPerSecond", rateRowPerSecond)
  .option("rampUpTime", s"${rateRampUpTimeSecond}s")
  .load()

df.printSchema()

val outDf = df.withWatermark("timestamp", "10 seconds")
  .selectExpr(
"timestamp", "mod(value, 100) as mod", "value",
BenchmarkQueryHelper.createCaseExprStr(
  "mod(CAST(RANDN(0) * 1000 as INTEGER), 50)", 50, 10) + " as word")
  .groupBy(
window($"timestamp", "1 minute", "10 seconds"),
$"mod", $"word")
  .agg(max("value").as("max_value"), min("value").as("min_value"),
avg("value").as("avg_value"))
  .coalesce(8)

val query = outDf.writeStream
  .format("memory")
  .option("queryName", "stateCoalesceTest")
  .option("checkpointLocation", "/tmp/state-coalesce-test")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode(OutputMode.Update())
  .start()

query.awaitTermination()

-Jungtaek Lim (HeartSaVioR)


On Thu, Aug 9, 2018 at 8:38 PM, WangXiaolong wrote:

> Hi,
>
>Lately, I encountered a problem, when I was writing as structured
> streaming job to write things into opentsdb.
>   The write-stream part looks something like
>
>   outputDs
>   .coalesce(14)
>   .writeStream
>   .outputMode("append")
>   .trigger(Trigger.ProcessingTime(s"$triggerSeconds seconds"))
>   .option("checkpointLocation",s"$checkpointDir/$appName/tsdb")
>   .foreach {
> TsdbWriter(
>   tsdbUrl,
>   MongoProp(mongoUrl, mongoPort, mongoUser, mongoPassword,
> mongoDatabase, mongoCollection,mongoAuthenticationDatabase)
> )(createMetricBuilder(tsdbMetricPrefix))
>   }
>   .start()
>
> And when I check the checkpoint dir, I discover that the
> "/checkpoint/state" dir  is empty. I looked into the executor's log and
> found that the HDFSBackedStateStoreProvider didn't write anything on the
> checkpoint dir.
>
>Strange thing is, when I replace the "coalesce" function into
> "repartition" function, the problem solved. Is there a difference between
> these two functions when using structured streaming?
>
>   Looking forward to you help, thanks.
>
>
>
>
>


Structured Streaming doesn't write checkpoint log when I use coalesce

2018-08-09 Thread WangXiaolong
Hi,


   Lately, I encountered a problem when I was writing a Structured Streaming
job to write things into OpenTSDB.
  The write-stream part looks something like:


  outputDs
  .coalesce(14)
  .writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(s"$triggerSeconds seconds"))
  .option("checkpointLocation",s"$checkpointDir/$appName/tsdb")
  .foreach {
TsdbWriter(
  tsdbUrl,
  MongoProp(mongoUrl, mongoPort, mongoUser, mongoPassword, 
mongoDatabase, mongoCollection,mongoAuthenticationDatabase)
)(createMetricBuilder(tsdbMetricPrefix))
  }
  .start()


And when I check the checkpoint dir, I discover that the
"/checkpoint/state" dir is empty. I looked into the executor's log and found
that the HDFSBackedStateStoreProvider didn't write anything to the checkpoint
dir.


   The strange thing is, when I replace the "coalesce" function with the "repartition"
function, the problem is solved. Is there a difference between these two functions
when using Structured Streaming?


  Looking forward to your help, thanks.



Re: [Structured Streaming] Reading Checkpoint data

2018-07-09 Thread subramgr
thanks






Re: [Structured Streaming] Reading Checkpoint data

2018-07-09 Thread Tathagata Das
Only the stream metadata (e.g., stream id, offsets) is stored as JSON. The
stream state data is stored in an internal binary format.
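
For example, the offset log under the checkpoint directory can be dumped as plain text; a rough sketch, assuming the checkpoint has been copied locally (the path is a placeholder - on HDFS, `hadoop fs -cat` on the same files works too):

import os

checkpoint_dir = "/tmp/checkpoint-copy"          # placeholder
offsets_dir = os.path.join(checkpoint_dir, "offsets")

# Each batch has a small text file: a version header followed by JSON lines.
for name in sorted(os.listdir(offsets_dir), key=lambda n: int(n) if n.isdigit() else -1):
    path = os.path.join(offsets_dir, name)
    if os.path.isfile(path):
        print("=== batch", name, "===")
        with open(path) as f:
            print(f.read())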

On Mon, Jul 9, 2018 at 4:07 PM, subramgr 
wrote:

> Hi,
>
> I read somewhere that with Structured Streaming all the checkpoint data is
> more readable (Json) like. Is there any documentation on how to read the
> checkpoint data.
>
> If I do `hadoop fs -ls` on the `state` directory I get some encoded data.
>
> Thanks
> Girish
>
>
>
>
>


[Structured Streaming] Reading Checkpoint data

2018-07-09 Thread subramgr
Hi, 

I read somewhere that with Structured Streaming all the checkpoint data is
more readable (JSON-like). Is there any documentation on how to read the
checkpoint data?

If I do `hadoop fs -ls` on the `state` directory I get some encoded data.

Thanks
Girish






making query state checkpoint compatible in structured streaming

2018-06-17 Thread puneetloya
Consider there is a spark query(A) which is dependent on Kafka topics t1 and
t2.

After running this query in the streaming mode, a checkpoint(C1) directory
for the query gets created with offsets and sources directories. Now I add a
third topic(t3) on which the query is dependent.

Now if I restart spark with the same checkpoint C1, Spark crashes as
expected, as it could not find the entry for the third topic(t3).

So just as part of a hack, I tried to add the topic t3 to the checkpoint
manually to the sources and offset directories of the query in the
checkpoint. But spark still crashed.

What's the correct way to solve this problem? How does one handle such upgrade
paths in Structured Streaming?






Re: Apache Spark Structured Streaming - Kafka Streaming - Option to ignore checkpoint

2018-06-06 Thread amihay gonen
If you are using the Kafka direct connect API, it might be committing offsets
back to Kafka itself.

בתאריך יום ה׳, 7 ביוני 2018, 4:10, מאת licl ‏:

> I met the same issue and I have try to delete the checkpoint dir before the
> job ,
>
> But spark seems can read the correct offset  even though after the
> checkpoint dir is deleted ,
>
> I don't know how spark do this without checkpoint's metadata.
>
>
>
>
>


Re: Apache Spark Structured Streaming - Kafka Streaming - Option to ignore checkpoint

2018-06-06 Thread licl
I met the same issue and I have tried to delete the checkpoint dir before the
job,

but Spark still seems to read the correct offset even after the
checkpoint dir is deleted.

I don't know how Spark does this without the checkpoint's metadata.






Using checkpoint much, much faster than cache. Why?

2018-06-05 Thread Phillip Henry
Hi, folks.

I am using Spark 2.2.0 and a combination of Spark ML's LinearSVC and
OneVsRest to classify some documents that are originally read from HDFS
using sc.wholeTextFiles.

When I use it on small documents, I get the results in a few minutes. When
I use it on the same number of large documents, it takes hours.

NOTE! I munge every document into a fixed-length vector which is the same
size irrespective of the size of the document.

Using jstat, I see all my executor threads in serialization code even
though all the data easily fits into the memory of the cluster ("Fraction
cached: 100%" everywhere).

I have called cache() on all my DataFrames with no effect. However, calling
checkpoint() on the DF fed to Spark's ML code solved the problem.
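
Roughly what I mean, as a stripped-down sketch (the paths, column names and checkpoint directory are placeholders, not my actual job):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cache-vs-checkpoint").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")   # placeholder

raw = spark.read.parquet("/data/features")        # placeholder input
features = raw.selectExpr("label", "features")    # placeholder munging step

# cache() keeps the data in memory but keeps the full lineage/plan.
cached = features.cache()
cached.count()                                    # materialize the cache

# checkpoint() materializes the data AND truncates the plan,
# so downstream stages (e.g. the ML fit) start from a short plan.
checkpointed = features.checkpoint()

print(cached.count(), checkpointed.count())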

So, although the problem is fixed, I'd like to know why cache() did not
work when checkpoint() did.

Can anybody explain?

Thanks,

Phill


Apache Spark Structured Streaming - Kafka Streaming - Option to ignore checkpoint

2018-03-22 Thread M Singh
Hi:
I am working on a realtime application using Spark Structured Streaming (v
2.2.1). The application reads data from Kafka, and if there is a failure, I
would like to ignore the checkpoint. Is there any configuration to just read
from the last Kafka offset after a failure and ignore any offset checkpoints?
Also, I believe that the checkpoint also saves state and will continue the
aggregations after recovery. Is there any way to ignore checkpointed state?
Also, is there a way to selectively save only the state or only the offset checkpoint?

Thanks


The last successful batch before stop re-execute after restart the DStreams with checkpoint

2018-03-11 Thread Terry Hoo
Experts,

I see that the last batch before a stop (graceful shutdown) always re-executes
after restarting the DStream from a checkpoint; is this expected behavior?

I see a bug in JIRA: https://issues.apache.org/jira/browse/SPARK-20050,
which reports duplicates on Kafka; I also see this with HDFS files.

Regards
- Terry


Issue with EFS checkpoint

2018-02-07 Thread Khan, Obaidur Rehman
Hello,

We have a Spark cluster with 3 worker nodes available as EC2 on AWS. Spark 
application is running in cluster mode and the checkpoints are stored in EFS. 
Spark version used is 2.2.0.

We noticed the below error coming up – our understanding was that this
intermittent checkpoint issue would be resolved with EFS once we moved away from
S3.

Caused by: java.io.FileNotFoundException: File 
file:/efs/checkpoint/UPDATE_XXX/offsets/.3ff13bc6-3eeb-4b87-be87-5d1106efcd62.tmp
 does not exist
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)

Please help me understand the issue and let me know if there is any fix 
available for this.

Regards,
Rehman




Spark Streaming checkpoint

2018-01-29 Thread KhajaAsmath Mohammed
Hi,

I have written a Spark Streaming job to use checkpointing. I stopped
the streaming job for 5 days and then restarted it today.

I have encountered a weird issue where it shows zero records for all
cycles to date. Is this causing data loss?

[image: Inline image 1]


Thanks,
Asmath


Null pointer exception in checkpoint directory

2018-01-16 Thread KhajaAsmath Mohammed
Hi,

I keep getting a null pointer exception in the Spark streaming job with
checkpointing. Any suggestions to resolve this?

Exception in thread "pool-22-thread-9" java.lang.NullPointerException

at
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Exception in thread "pool-22-thread-10" java.lang.NullPointerException

at
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Exception in thread "pool-22-thread-11" java.lang.NullPointerException

at
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Exception in thread "pool-22-thread-12" java.lang.NullPointerException

at
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Thanks,
Asmath


Re: Structured Streaming + Kafka - Corrupted Checkpoint Offsets / Commits

2018-01-04 Thread Shixiong(Ryan) Zhu
The root cause is probably that HDFSMetadataLog ignores exceptions thrown
by "output.close". I think this should be fixed by this line in Spark 2.2.1
and 3.0.0:
https://github.com/apache/spark/commit/6edfff055caea81dc3a98a6b4081313a0c0b0729#diff-aaeb546880508bb771df502318c40a99L126

Could you try 2.2.1?

On Thu, Jan 4, 2018 at 9:08 AM, William Briggs  wrote:

> I am running a Structured Streaming job (Spark 2.2.0) using EMR 5.9. The
> job sources data from a Kafka topic, performs a variety of filters and
> transformations, and sinks data back into a different Kafka topic.
>
> Once per day, we stop the query in order to merge the namenode edit logs
> with the fsimage, because Structured Streaming creates and destroys a
> significant number of HDFS files, and EMR doesn't support a secondary or HA
> namenode for fsimage compaction (AWS support directed us to do this, as
> Namenode edit logs were filling the disk).
>
> Occasionally, the Structured Streaming query will not restart because the
> most recent file in the "commits" or "offsets" checkpoint subdirectory is
> empty. This seems like an undesirable behavior, as it requires manual
> intervention to remove the empty files in order to force the job to fall
> back onto the last good values. Has anyone run into this behavior? The only
> similar issue I can find is SPARK-21760
> <https://issues.apache.org/jira/browse/SPARK-21760>, which appears to
> have no fix or workaround.
>
> Any assistance would be greatly appreciated!
>
> Regards,
> Will
>


Structured Streaming + Kafka - Corrupted Checkpoint Offsets / Commits

2018-01-04 Thread William Briggs
I am running a Structured Streaming job (Spark 2.2.0) using EMR 5.9. The
job sources data from a Kafka topic, performs a variety of filters and
transformations, and sinks data back into a different Kafka topic.

Once per day, we stop the query in order to merge the namenode edit logs
with the fsimage, because Structured Streaming creates and destroys a
significant number of HDFS files, and EMR doesn't support a secondary or HA
namenode for fsimage compaction (AWS support directed us to do this, as
Namenode edit logs were filling the disk).

Occasionally, the Structured Streaming query will not restart because the
most recent file in the "commits" or "offsets" checkpoint subdirectory is
empty. This seems like an undesirable behavior, as it requires manual
intervention to remove the empty files in order to force the job to fall
back onto the last good values. Has anyone run into this behavior? The only
similar issue I can find is SPARK-21760
<https://issues.apache.org/jira/browse/SPARK-21760>, which appears to have
no fix or workaround.

Any assistance would be greatly appreciated!

Regards,
Will


Spark 2.1.2 Spark Streaming checkpoint interval not respected

2017-11-18 Thread Shing Hing Man
Hi,
In the following example using mapWithState, I set the checkpoint interval to 1
minute. From the log, Spark still writes to the checkpoint directory every
second. I would appreciate it if someone could point out what I have done wrong.
object MapWithStateDemo {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: MapWithStateDemo <hostname> <port>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("MapWithStateDemo")
      .setIfMissing("spark.master", "local[*]")

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Initial state RDD for mapWithState operation
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Create a ReceiverInputDStream on target ip:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using mapWithState
    // This will give a DStream made of state (which is the cumulative count of the words)
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }

    val stateDstream: MapWithStateDStream[String, Int, Int, (String, Int)] =
      wordDstream.mapWithState(StateSpec.function(mappingFunc).timeout(Seconds(10)).initialState(initialRDD))

    stateDstream.checkpoint(Minutes(1L))
    stateDstream.print()

    val targetDir = new File(getClass.getResource("/").toURI).getParentFile.getParentFile
    val checkpointDir = targetDir + "/checkpoint"
    ssc.checkpoint(checkpointDir)
    ssc.start()
    ssc.awaitTermination()
  }
}
Thanks in advance for any assistance !
Shing


Re: Kafka 010 Spark 2.2.0 Streaming / Custom checkpoint strategy

2017-10-13 Thread Jörn Franke
HDFS can be replaced by other filesystem plugins (e.g. IgniteFS, S3, etc.), so the
easiest approach is to write a file system plugin. This is not a plug-in for Spark but
part of the Hadoop functionality used by Spark.
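
As a rough sketch of what that wiring can look like (the RedisFileSystem class and the
redisfs:// scheme below are hypothetical placeholders, not an existing library): implement
org.apache.hadoop.fs.FileSystem on top of the store you want, register it under a URI
scheme in the Hadoop configuration, and point the checkpoint directory at that scheme.

```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("RedisCheckpointDemo")
val ssc = new StreamingContext(conf, Seconds(30))

// Register the (hypothetical) plugin class, which would extend org.apache.hadoop.fs.FileSystem
// and implement open/create/delete/listStatus/... against Redis.
ssc.sparkContext.hadoopConfiguration.set("fs.redisfs.impl", "com.example.RedisFileSystem")

// Checkpoint paths using that scheme are then resolved through the plugin,
// since Spark writes checkpoints through the Hadoop FileSystem API.
ssc.checkpoint("redisfs://checkpoints/my-streaming-app")
```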

> On 13. Oct 2017, at 17:41, Anand Chandrashekar  wrote:
> 
> Greetings!
> 
> I would like to accomplish a custom kafka checkpoint strategy (instead of 
> hdfs, i would like to use redis). is there a strategy I can use to change 
> this behavior; any advise will help. Thanks!
> 
> Regards,
> Anand.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Kafka 010 Spark 2.2.0 Streaming / Custom checkpoint strategy

2017-10-13 Thread Anand Chandrashekar
Greetings!

I would like to accomplish a custom Kafka checkpoint strategy (instead of
HDFS, I would like to use Redis). Is there a strategy I can use to change
this behavior? Any advice will help. Thanks!

Regards,
Anand.


Re: Cases when to clear the checkpoint directories.

2017-10-09 Thread Tathagata Das
Any changes in the Java code (to be specific, the generated bytecode) in
the functions you pass to Spark (i.e., the map function, the reduce function, as
well as their closure dependencies) count as an "application code change", and
will break the recovery from checkpoints.
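
To make that concrete, here is a minimal sketch of the usual DStream recovery pattern
(hostname, port and the checkpoint path are placeholders); the functions registered on the
graph inside createContext() are serialized into the checkpoint, which is why recompiling
them with different bytecode can break recovery.

```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/my-app" // placeholder

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("RecoveryDemo")
  val ssc = new StreamingContext(conf, Seconds(30))
  ssc.checkpoint(checkpointDir)

  val lines = ssc.socketTextStream("localhost", 9999)
  // This closure (and anything it captures) is written into the checkpoint.
  // Changing its bytecode -- even adding a println inside it -- can mean the
  // serialized closure in the checkpoint no longer matches the new class on restart.
  lines.map(_.toUpperCase).print()
  ssc
}

// Recover from the checkpoint if one exists, otherwise build a fresh context.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```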

On Sat, Oct 7, 2017 at 11:53 AM, John, Vishal (Agoda)  wrote:

>
>
> Hello TD,
>
> You had replied to one of the questions about checkpointing –
>
> This is an unfortunate design on my part when I was building DStreams :)
>
> Fortunately, we learnt from our mistakes and built Structured Streaming
> the correct way. Checkpointing in Structured Streaming stores only the
> progress information (offsets, etc.), and the user can change their
> application code (within certain constraints, of course) and still restart
> from checkpoints (unlike DStreams). If you are just building out your
> streaming applications, then I highly recommend you to try out Structured
> Streaming instead of DStreams (which is effectively in maintenance mode).
>
> Can you please elaborate on what you mean by application code change in
> DStream applications?
>
> If I add a couple of println statements in my application code will that
> become an application code change? or do you mean, changing method
> signatures or adding new methods etc.
> Could you please point to relevant source code in Spark, which does this
> type of code validation/de-serialisation in case of DStreams?
>
> We are using mapWithState in our application and it builds its state from
> checkpointed RDDs.  I would like understand the cases where we can avoid
> clearing the checkpoint directories.
>
>
> thanks in advance,
> Vishal
>
>


Cases when to clear the checkpoint directories.

2017-10-07 Thread John, Vishal (Agoda)


Hello TD,

You had replied to one of the questions about checkpointing –

This is an unfortunate design on my part when I was building DStreams :)

Fortunately, we learnt from our mistakes and built Structured Streaming the 
correct way. Checkpointing in Structured Streaming stores only the progress 
information (offsets, etc.), and the user can change their application code 
(within certain constraints, of course) and still restart from checkpoints 
(unlike DStreams). If you are just building out your streaming applications, 
then I highly recommend you to try out Structured Streaming instead of DStreams 
(which is effectively in maintenance mode).

Can you please elaborate on what you mean by application code change in DStream 
applications?

If I add a couple of println statements in my application code, will that count
as an application code change? Or do you mean changing method signatures,
adding new methods, etc.?
Could you please point to the relevant source code in Spark that does this type
of code validation/deserialization in the case of DStreams?

We are using mapWithState in our application and it builds its state from
checkpointed RDDs. I would like to understand the cases where we can avoid
clearing the checkpoint directories.


thanks in advance,
Vishal





ReduceByKeyAndWindow checkpoint recovery issues in Spark Streaming

2017-08-23 Thread SRK
Hi,

ReduceByKeyAndWindow checkpoint recovery has issues when trying to recover
for the second time. Basically, it loses the reduced value of the previous
window, even though the key is present in the old values that need to be
inverse-reduced, resulting in the following error. Does anyone have any idea as to why
it does not recover properly the second time?


Neither previous window has value for key, nor new values found. Are you
sure your key class hashes consistently?
at
org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:143)
at
org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:130)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ReduceByKeyAndWindow-checkpoint-recovery-issues-in-Spark-Streaming-tp29100.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Fwd: Issues when trying to recover a textFileStream from checkpoint in Spark streaming

2017-08-11 Thread swetha kasireddy
Hi,

I am facing issues while trying to recover a textFileStream from a checkpoint.
Basically, it is trying to load the files from the beginning of the job start,
whereas I am deleting the files after processing them. I have the following
configs set, so I was thinking that it should not look for files older than 2
minutes when trying to recover from the checkpoint. Any suggestions on this
would be of great help.

  sparkConf.set("spark.streaming.minRememberDuration","120s")
  sparkConf.set("spark.streaming.fileStream.minRememberDuration","120s")

Thanks,
Swetha




-- Forwarded message --
From: SRK 
Date: Thu, Aug 10, 2017 at 5:04 PM
Subject: Issues when trying to recover a textFileStream from checkpoint in
Spark streaming
To: user@spark.apache.org


Hi,

I am facing issues while trying to recover a textFileStream from a checkpoint.
Basically, it is trying to load the files from the beginning of the job start,
whereas I am deleting the files after processing them. I have the following
configs set, so I was thinking that it should not look for files older than 2
minutes when trying to recover from the checkpoint. Any suggestions on this
would be of great help.

  sparkConf.set("spark.streaming.minRememberDuration","120s")
  sparkConf.set("spark.streaming.fileStream.minRememberDuration","120s")

Thanks,
Swetha



--
View this message in context: http://apache-spark-user-list.
1001560.n3.nabble.com/Issues-when-trying-to-recover-a-textFileStream-from-
checkpoint-in-Spark-streaming-tp29052.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Issues when trying to recover a textFileStream from checkpoint in Spark streaming

2017-08-10 Thread SRK
Hi,

I am facing issues while trying to recover a textFileStream from a checkpoint.
Basically, it is trying to load the files from the beginning of the job start,
whereas I am deleting the files after processing them. I have the following
configs set, so I was thinking that it should not look for files older than 2
minutes when trying to recover from the checkpoint. Any suggestions on this
would be of great help.

  sparkConf.set("spark.streaming.minRememberDuration","120s")
  sparkConf.set("spark.streaming.fileStream.minRememberDuration","120s")

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issues-when-trying-to-recover-a-textFileStream-from-checkpoint-in-Spark-streaming-tp29052.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2017-08-08 Thread dcam
Considering the @transient annotations and the work done in the instance
initializer, not much state is really being broadcast to the executors. It
might be simpler to just create these instances on the executors, rather
than trying to broadcast them?
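
A common way to do that is a lazily initialized holder object, so each executor JVM builds
its own instance on first use and nothing non-serializable has to travel through the closure
or the checkpoint. A small sketch (ExpensiveClient is a stand-in for whatever is being
broadcast today):

```
// Stand-in for a heavyweight, non-serializable dependency (DB/HTTP client, parser, ...).
class ExpensiveClient {
  def process(s: String): String = s.toUpperCase
}

// One instance per executor JVM, created lazily the first time a task touches it,
// so nothing needs to be shipped from the driver or restored from a checkpoint.
object ClientHolder {
  lazy val client: ExpensiveClient = new ExpensiveClient
}

// Usage inside the job, assuming `lines` is a DStream[String] (or an RDD[String]):
// lines.mapPartitions(iter => iter.map(ClientHolder.client.process)).print()
```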



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698p29044.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [SS] Console sink not supporting recovering from checkpoint location? Why?

2017-08-08 Thread Jacek Laskowski
Hi Michael,

That reflects my sentiments so well. Thanks for having confirmed my thoughts!

https://issues.apache.org/jira/browse/SPARK-21667

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Aug 8, 2017 at 12:37 AM, Michael Armbrust
 wrote:
> I think there is really no good reason for this limitation.
>
> On Mon, Aug 7, 2017 at 2:58 AM, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> While exploring checkpointing with kafka source and console sink I've
>> got the exception:
>>
>> // today's build from the master
>> scala> spark.version
>> res8: String = 2.3.0-SNAPSHOT
>>
>> scala> val q = records.
>>  |   writeStream.
>>  |   format("console").
>>  |   option("truncate", false).
>>  |   option("checkpointLocation", "/tmp/checkpoint"). // <--
>> checkpoint directory
>>  |   trigger(Trigger.ProcessingTime(10.seconds)).
>>  |   outputMode(OutputMode.Update).
>>  |   start
>> org.apache.spark.sql.AnalysisException: This query does not support
>> recovering from checkpoint location. Delete /tmp/checkpoint/offsets to
>> start over.;
>>   at
>> org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:222)
>>   at
>> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>>   at
>> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284)
>>   ... 61 elided
>>
>> The "trigger" is the change
>> https://issues.apache.org/jira/browse/SPARK-16116 and this line in
>> particular
>> https://github.com/apache/spark/pull/13817/files#diff-d35e8fce09686073f81de598ed657de7R277.
>>
>> Why is this needed? I can't think of a use case where console sink
>> could not recover from checkpoint location (since all the information
>> is available). I'm lost on it and would appreciate some help (to
>> recover :))
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [SS] Console sink not supporting recovering from checkpoint location? Why?

2017-08-07 Thread Michael Armbrust
I think there is really no good reason for this limitation.

On Mon, Aug 7, 2017 at 2:58 AM, Jacek Laskowski  wrote:

> Hi,
>
> While exploring checkpointing with kafka source and console sink I've
> got the exception:
>
> // today's build from the master
> scala> spark.version
> res8: String = 2.3.0-SNAPSHOT
>
> scala> val q = records.
>  |   writeStream.
>  |   format("console").
>  |   option("truncate", false).
>  |   option("checkpointLocation", "/tmp/checkpoint"). // <--
> checkpoint directory
>  |   trigger(Trigger.ProcessingTime(10.seconds)).
>  |   outputMode(OutputMode.Update).
>  |   start
> org.apache.spark.sql.AnalysisException: This query does not support
> recovering from checkpoint location. Delete /tmp/checkpoint/offsets to
> start over.;
>   at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(
> StreamingQueryManager.scala:222)
>   at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(
> StreamingQueryManager.scala:278)
>   at org.apache.spark.sql.streaming.DataStreamWriter.
> start(DataStreamWriter.scala:284)
>   ... 61 elided
>
> The "trigger" is the change
> https://issues.apache.org/jira/browse/SPARK-16116 and this line in
> particular https://github.com/apache/spark/pull/13817/files#diff-
> d35e8fce09686073f81de598ed657de7R277.
>
> Why is this needed? I can't think of a use case where console sink
> could not recover from checkpoint location (since all the information
> is available). I'm lost on it and would appreciate some help (to
> recover :))
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[SS] Console sink not supporting recovering from checkpoint location? Why?

2017-08-07 Thread Jacek Laskowski
Hi,

While exploring checkpointing with kafka source and console sink I've
got the exception:

// today's build from the master
scala> spark.version
res8: String = 2.3.0-SNAPSHOT

scala> val q = records.
 |   writeStream.
 |   format("console").
 |   option("truncate", false).
 |   option("checkpointLocation", "/tmp/checkpoint"). // <--
checkpoint directory
 |   trigger(Trigger.ProcessingTime(10.seconds)).
 |   outputMode(OutputMode.Update).
 |   start
org.apache.spark.sql.AnalysisException: This query does not support
recovering from checkpoint location. Delete /tmp/checkpoint/offsets to
start over.;
  at 
org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:222)
  at 
org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at 
org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284)
  ... 61 elided

The "trigger" is the change
https://issues.apache.org/jira/browse/SPARK-16116 and this line in
particular 
https://github.com/apache/spark/pull/13817/files#diff-d35e8fce09686073f81de598ed657de7R277.

Why is this needed? I can't think of a use case where console sink
could not recover from checkpoint location (since all the information
is available). I'm lost on it and would appreciate some help (to
recover :))

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: underlying checkpoint

2017-07-16 Thread Mendelson, Assaf
Actually, show is an action.
The issue is that unless you have some aggregations, show will only go over part of
the dataframe, not all of it, and therefore the checkpoint won't be fully materialized
(similar to what happens with cache).
You need an action that requires going over the entire dataframe (which count
does).
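
As a side note, on Spark 2.1+ the same effect can be had directly on the Dataset with
checkpoint(eager = true), which runs the materializing job as part of the call; a small
sketch, using the same toy data:

```
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))

// Eagerly materializes the checkpoint and returns a new Dataset whose plan
// reads from the checkpointed data, truncating the lineage.
val checkpointed = df.checkpoint(eager = true)
checkpointed.show()
```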

Thanks,
  Assaf.

From: Bernard Jesop [mailto:bernard.je...@gmail.com]
Sent: Thursday, July 13, 2017 6:58 PM
To: Vadim Semenov
Cc: user
Subject: Re: underlying checkpoint

Thank you, one of my mistakes was to think that show() was an action.

2017-07-13 17:52 GMT+02:00 Vadim Semenov 
mailto:vadim.seme...@datadoghq.com>>:
You need to trigger an action on that rdd to checkpoint it.

```
scala>spark.sparkContext.setCheckpointDir(".")

scala>val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), 
("R", 15), ("Java", 20)))
df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> df.rdd.checkpoint()

scala> df.rdd.isCheckpointed
res2: Boolean = false

scala> df.show()
+--+---+
|_1| _2|
+--+---+
| Scala| 35|
|Python| 30|
| R| 15|
|  Java| 20|
+--+---+


scala> df.rdd.isCheckpointed
res4: Boolean = false

scala> df.rdd.count()
res5: Long = 4

scala> df.rdd.isCheckpointed
res6: Boolean = true
```

On Thu, Jul 13, 2017 at 11:35 AM, Bernard Jesop 
mailto:bernard.je...@gmail.com>> wrote:
Hi everyone, I just tried this simple program :

 import org.apache.spark.sql.SparkSession

 object CheckpointTest extends App {

   val spark = SparkSession
     .builder()
     .appName("Toto")
     .getOrCreate()

   spark.sparkContext.setCheckpointDir(".")

   val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))

   df.show()
   df.rdd.checkpoint()
   println(if (df.rdd.isCheckpointed) "checkpointed" else "not checkpointed")
 }

But the result is still "not checkpointed".
Do you have any idea why? (knowing that the checkpoint file is created)
Best regards,
Bernard JESOP




Re: underlying checkpoint

2017-07-13 Thread Bernard Jesop
Thank you, one of my mistakes was to think that show() was an action.

2017-07-13 17:52 GMT+02:00 Vadim Semenov :

> You need to trigger an action on that rdd to checkpoint it.
>
> ```
> scala>spark.sparkContext.setCheckpointDir(".")
>
> scala>val df = spark.createDataFrame(List(("Scala", 35), ("Python",
> 30), ("R", 15), ("Java", 20)))
> df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
>
> scala> df.rdd.checkpoint()
>
> scala> df.rdd.isCheckpointed
> res2: Boolean = false
>
> scala> df.show()
> +--+---+
> |_1| _2|
> +--+---+
> | Scala| 35|
> |Python| 30|
> | R| 15|
> |  Java| 20|
> +--+---+
>
>
> scala> df.rdd.isCheckpointed
> res4: Boolean = false
>
> scala> df.rdd.count()
> res5: Long = 4
>
> scala> df.rdd.isCheckpointed
> res6: Boolean = true
> ```
>
> On Thu, Jul 13, 2017 at 11:35 AM, Bernard Jesop 
> wrote:
>
>> Hi everyone, I just tried this simple program :
>>
>>  import org.apache.spark.sql.SparkSession
>>
>>  object CheckpointTest extends App {
>>
>>    val spark = SparkSession
>>      .builder()
>>      .appName("Toto")
>>      .getOrCreate()
>>
>>    spark.sparkContext.setCheckpointDir(".")
>>
>>    val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
>>
>>    df.show()
>>    df.rdd.checkpoint()
>>    println(if (df.rdd.isCheckpointed) "checkpointed" else "not checkpointed")
>>  }
>>
>> But the result is still "not checkpointed".
>> Do you have any idea why? (knowing that the checkpoint file is created)
>>
>> Best regards,
>> Bernard JESOP
>>
>
>


Re: underlying checkpoint

2017-07-13 Thread Vadim Semenov
You need to trigger an action on that rdd to checkpoint it.

```
scala>spark.sparkContext.setCheckpointDir(".")

scala>val df = spark.createDataFrame(List(("Scala", 35), ("Python",
30), ("R", 15), ("Java", 20)))
df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> df.rdd.checkpoint()

scala> df.rdd.isCheckpointed
res2: Boolean = false

scala> df.show()
+--+---+
|_1| _2|
+--+---+
| Scala| 35|
|Python| 30|
| R| 15|
|  Java| 20|
+--+---+


scala> df.rdd.isCheckpointed
res4: Boolean = false

scala> df.rdd.count()
res5: Long = 4

scala> df.rdd.isCheckpointed
res6: Boolean = true
```

On Thu, Jul 13, 2017 at 11:35 AM, Bernard Jesop 
wrote:

> Hi everyone, I just tried this simple program :
>
>  import org.apache.spark.sql.SparkSession
>
>  object CheckpointTest extends App {
>
>    val spark = SparkSession
>      .builder()
>      .appName("Toto")
>      .getOrCreate()
>
>    spark.sparkContext.setCheckpointDir(".")
>
>    val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
>
>    df.show()
>    df.rdd.checkpoint()
>    println(if (df.rdd.isCheckpointed) "checkpointed" else "not checkpointed")
>  }
>
> But the result is still "not checkpointed".
> Do you have any idea why? (knowing that the checkpoint file is created)
>
> Best regards,
> Bernard JESOP
>


underlying checkpoint

2017-07-13 Thread Bernard Jesop
Hi everyone, I just tried this simple program :

 import org.apache.spark.sql.SparkSession

 object CheckpointTest extends App {

   val spark = SparkSession
     .builder()
     .appName("Toto")
     .getOrCreate()

   spark.sparkContext.setCheckpointDir(".")

   val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))

   df.show()
   df.rdd.checkpoint()
   println(if (df.rdd.isCheckpointed) "checkpointed" else "not checkpointed")
 }

But the result is still "not checkpointed".
Do you have any idea why? (knowing that the checkpoint file is created)

Best regards,
Bernard JESOP


Re: How to reduce the amount of data that is getting written to the checkpoint from Spark Streaming

2017-07-03 Thread Yuval.Itzchakov
Using a long period between checkpoints may cause a long lineage of graph
computations to be created, since Spark uses checkpointing to cut it, which
can also cause a delay in the streaming job.
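
Related to that trade-off, the data-checkpoint interval of the stateful stream can be set
explicitly; a sketch (wordDstream and updateFunc are assumptions standing in for the job's
own state logic, and the common rule of thumb is roughly 5-10x the batch interval):

```
import org.apache.spark.streaming.Seconds

// Assuming `wordDstream` is a DStream[(String, Int)] from the original job.
val updateFunc = (values: Seq[Int], state: Option[Int]) =>
  Some(values.sum + state.getOrElse(0))

val stateDstream = wordDstream.updateStateByKey(updateFunc)

// Checkpoint the state data less often than every batch, but often enough
// to keep lineage (and recovery time) bounded.
stateDstream.checkpoint(Seconds(100))
```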



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-reduce-the-amount-of-data-that-is-getting-written-to-the-checkpoint-from-Spark-Streaming-tp28798p28820.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to reduce the amount of data that is getting written to the checkpoint from Spark Streaming

2017-07-02 Thread Yuval.Itzchakov
You can't. Spark doesn't let you fiddle with the data being checkpointed, as
it's an internal implementation detail.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-reduce-the-amount-of-data-that-is-getting-written-to-the-checkpoint-from-Spark-Streaming-tp28798p28815.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How to reduce the amount of data that is getting written to the checkpoint from Spark Streaming

2017-06-27 Thread SRK
Hi,

I have checkpoints enabled in Spark Streaming, and I use updateStateByKey and
reduceByKeyAndWindow with inverse functions. How do I reduce the amount of
data that I am writing to the checkpoint, or clear out the data that I don't
care about?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-reduce-the-amount-of-data-that-is-getting-written-to-the-checkpoint-from-Spark-Streaming-tp28798.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
In either case, an end-to-end exactly-once guarantee can be ensured only
if the output sink is updated transactionally. The engine has to re-execute
data on failure. An exactly-once guarantee means that the external storage is
updated as if each data record was computed exactly once. That's why you
need to update it transactionally to handle possible recomputations.

This is true for both Spark Streaming and Structured Streaming. Hope this
helps.
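
To illustrate the kind of output pattern this implies, here is a sketch (the `stream`
DStream and the sink are assumptions, not part of the original question): derive an
idempotency key from the batch time and partition id, so a re-executed batch commits
the same key and a transactional store can de-duplicate it.

```
import org.apache.spark.TaskContext
import org.apache.spark.streaming.Time

// Assuming `stream` is a DStream[String] produced by the direct Kafka source.
stream.foreachRDD { (rdd, batchTime: Time) =>
  rdd.foreachPartition { records =>
    // A re-executed batch/partition regenerates the same key, so a transactional
    // sink can commit atomically and skip (or overwrite) an already-committed key.
    val commitKey = s"${batchTime.milliseconds}-${TaskContext.get.partitionId()}"
    // A real sink would open a transaction keyed by commitKey, write all records,
    // and commit; stubbed out with a print here.
    records.foreach(r => println(s"$commitKey -> $r"))
  }
}
```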

On Jun 6, 2017 5:56 AM, "ALunar Beach"  wrote:

> Thanks TD.
> In pre-structured streaming, exactly once guarantee on input is not
> guaranteed. is it?
>
> On Tue, Jun 6, 2017 at 4:30 AM, Tathagata Das  > wrote:
>
>> This is the expected behavior. There are some confusing corner cases.
>> If you are starting to play with Spark Streaming, i highly recommend
>> learning Structured Streaming
>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>
>> instead.
>>
>> On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan 
>> wrote:
>>
>>> I am using Spark Streaming Checkpoint and Kafka Direct Stream.
>>> It uses a 30 sec batch duration and normally the job is successful in
>>> 15-20 sec.
>>>
>>> If the spark application fails after the successful completion
>>> (149668428ms in the log below) and restarts, it's duplicating the last
>>> batch again.
>>>
>>> Is this the expected behavior? I was expecting this to start a new batch
>>> window.
>>>
>>>
>>> Here are some logs:
>>>
>>> Last successful run:
>>> 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time
>>> 149668428 ms (execution: 0.029 s)
>>> 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
>>> 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
>>> 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time
>>> 149668428 ms
>>> 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time
>>> 149668428 ms
>>> 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time
>>> 149668428 ms
>>> 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time
>>> 149668428 ms to writer queue
>>> 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time
>>> 149668428 ms to file 'file:/Users/anbucheeralan/Ide
>>> aProjects/Spark2Example/ckpt/checkpoint-149668428'
>>> 17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time
>>> 149668428 ms saved to file
>>> 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428',
>>> took 4032 bytes and 9 ms*
>>> 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time
>>> 149668428 ms
>>> 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time
>>> 149668428 ms
>>>
>>> After the restart,
>>>
>>> 17/06/05 13:42:31 INFO DirectKafkaInputDStream$Direct
>>> KafkaInputDStreamCheckpointData: Restoring KafkaRDD for time
>>> 149668428 ms [(my_test,0,2000,2000)]
>>> 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
>>> *17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10
>>> batches): 149668428 ms, 149668431 ms, 149668434 ms,
>>> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
>>> 149668449 ms, 149668452 ms, 149668455 ms*
>>> *17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0
>>> batches): *
>>> *17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10
>>> batches): *149668428 ms, 149668431 ms, 149668434 ms,
>>> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
>>> 149668449 ms, 149668452 ms, 149668455 ms
>>> 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
>>> 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job
>>> 149668428 ms.0 from job set of time 149668428 ms
>>>
>>>
>>>
>>> --
>>> View this message in context: Fwd: Spark Streaming Checkpoint and
>>> Exactly Once Guarantee on Kafka Direct Stream
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html>
>>> Sent from the Apache Spark User List mailing list archive
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>>
>>
>>
>


Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread ALunar Beach
Thanks TD.
In pre-Structured Streaming Spark, exactly-once on the input is not
guaranteed, is it?

On Tue, Jun 6, 2017 at 4:30 AM, Tathagata Das 
wrote:

> This is the expected behavior. There are some confusing corner cases.
> If you are starting to play with Spark Streaming, i highly recommend
> learning Structured Streaming
> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>
> instead.
>
> On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan 
> wrote:
>
>> I am using Spark Streaming Checkpoint and Kafka Direct Stream.
>> It uses a 30 sec batch duration and normally the job is successful in
>> 15-20 sec.
>>
>> If the spark application fails after the successful completion
>> (149668428ms in the log below) and restarts, it's duplicating the last
>> batch again.
>>
>> Is this the expected behavior? I was expecting this to start a new batch
>> window.
>>
>>
>> Here are some logs:
>>
>> Last successful run:
>> 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time
>> 149668428 ms (execution: 0.029 s)
>> 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
>> 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
>> 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time
>> 149668428 ms
>> 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time
>> 149668428 ms
>> 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time
>> 149668428 ms
>> 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time
>> 149668428 ms to writer queue
>> 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time
>> 149668428 ms to file 'file:/Users/anbucheeralan/Ide
>> aProjects/Spark2Example/ckpt/checkpoint-149668428'
>> 17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time
>> 149668428 ms saved to file
>> 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428',
>> took 4032 bytes and 9 ms*
>> 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time
>> 149668428 ms
>> 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time
>> 149668428 ms
>>
>> After the restart,
>>
>> 17/06/05 13:42:31 INFO DirectKafkaInputDStream$Direct
>> KafkaInputDStreamCheckpointData: Restoring KafkaRDD for time
>> 149668428 ms [(my_test,0,2000,2000)]
>> 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
>> *17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10
>> batches): 149668428 ms, 149668431 ms, 149668434 ms,
>> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
>> 149668449 ms, 149668452 ms, 149668455 ms*
>> *17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0
>> batches): *
>> *17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches):
>> *149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms,
>> 149668440 ms, 1496684430000 ms, 149668446 ms, 149668449 ms,
>> 149668452 ms, 149668455 ms
>> 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
>> 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job
>> 149668428 ms.0 from job set of time 149668428 ms
>>
>>
>>
>> --
>> View this message in context: Fwd: Spark Streaming Checkpoint and
>> Exactly Once Guarantee on Kafka Direct Stream
>> <http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html>
>> Sent from the Apache Spark User List mailing list archive
>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>
>
>


Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
This is the expected behavior. There are some confusing corner cases.
If you are starting to play with Spark Streaming, i highly recommend
learning Structured Streaming
<http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>
instead.

On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan 
wrote:

> I am using Spark Streaming Checkpoint and Kafka Direct Stream.
> It uses a 30 sec batch duration and normally the job is successful in
> 15-20 sec.
>
> If the spark application fails after the successful completion
> (149668428ms in the log below) and restarts, it's duplicating the last
> batch again.
>
> Is this the expected behavior? I was expecting this to start a new batch
> window.
>
>
> Here are some logs:
>
> Last successful run:
> 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time
> 149668428 ms (execution: 0.029 s)
> 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
> 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
> 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time
> 149668428 ms
> 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time
> 149668428 ms
> 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time
> 149668428 ms
> 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time
> 149668428 ms to writer queue
> 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time
> 149668428 ms to file 'file:/Users/anbucheeralan/Ide
> aProjects/Spark2Example/ckpt/checkpoint-149668428'
> 17/06/05 13:38:00 INFO CheckpointWriter: *Checkpoint for time
> 149668428 ms saved to file
> 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428',
> took 4032 bytes and 9 ms*
> 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time
> 149668428 ms
> 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time
> 149668428 ms
>
> After the restart,
>
> 17/06/05 13:42:31 INFO DirectKafkaInputDStream$Direct
> KafkaInputDStreamCheckpointData: Restoring KafkaRDD for time
> 149668428 ms [(my_test,0,2000,2000)]
> 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
> *17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10
> batches): 149668428 ms, 149668431 ms, 149668434 ms,
> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
> 149668449 ms, 149668452 ms, 149668455 ms*
> *17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0
> batches): *
> *17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches): 
> *149668428
> ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms,
> 149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms,
> 149668455 ms
> 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
> 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job
> 149668428 ms.0 from job set of time 149668428 ms
>
>
>
> ------
> View this message in context: Fwd: Spark Streaming Checkpoint and Exactly
> Once Guarantee on Kafka Direct Stream
> <http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>


Fwd: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-05 Thread anbucheeralan
I am using Spark Streaming Checkpoint and Kafka Direct Stream.
It uses a 30 sec batch duration and normally the job is successful in 15-20
sec.

If the spark application fails after the successful completion
(149668428ms in the log below) and restarts, it's duplicating the last
batch again.

Is this the expected behavior? I was expecting this to start a new batch
window.


Here are some logs:

Last successful run:
17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time
149668428 ms (execution: 0.029 s)
17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time
149668428 ms to writer queue
17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time
149668428 ms to file
'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428'
17/06/05 13:38:00 INFO CheckpointWriter: Checkpoint for time 149668428
ms saved to file
'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428',
took 4032 bytes and 9 ms
17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time
149668428 ms

After the restart,

17/06/05 13:42:31 INFO DirectKafkaInputDStream$
DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time
149668428 ms [(my_test,0,2000,2000)]
17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10
batches): 149668428 ms, 149668431 ms, 149668434 ms,
149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
149668449 ms, 149668452 ms, 149668455 ms
17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 batches):
17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches):
149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms,
149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms,
149668455 ms
17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job
149668428 ms.0 from job set of time 149668428 ms




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-05 Thread ALunar Beach
I am using Spark Streaming Checkpoint and Kafka Direct Stream.
It uses a 30 sec batch duration and normally the job is successful in 15-20
sec.

If the spark application fails after the successful completion
(149668428ms in the log below) and restarts, it's duplicating the last
batch again.

Is this the expected behavior? I was expecting this to start a new batch
window.


Here are some logs:

Last successful run:
17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time
149668428 ms (execution: 0.029 s)
17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time
149668428 ms to writer queue
17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time
149668428 ms to file
'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428'
17/06/05 13:38:00 INFO CheckpointWriter: Checkpoint for time 149668428
ms saved to file
'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428',
took 4032 bytes and 9 ms
17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time
149668428 ms

After the restart,

17/06/05 13:42:31 INFO
DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
KafkaRDD for time 149668428 ms [(my_test,0,2000,2000)]
17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10
batches): 149668428 ms, 149668431 ms, 149668434 ms,
149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
149668449 ms, 149668452 ms, 149668455 ms
17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 batches):
17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches):
149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms,
149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms,
149668455 ms
17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job
149668428 ms.0 from job set of time 149668428 ms


Re: Temp checkpoint directory for EMR (S3 or HDFS)

2017-05-30 Thread Asher Krim
You should actually be able to get to the underlying filesystem from your
SparkContext:

String originalFs = sparkContext.hadoopConfiguration().get("fs.defaultFS");


and then you could just use that:

String checkpointPath = String.format("%s/%s/", originalFs,
checkpointDirectory);
sparkContext.setCheckpointDir(checkpointPath);


Asher Krim
Senior Software Engineer

On Tue, May 30, 2017 at 12:37 PM, Everett Anderson  wrote:

> Still haven't found a --conf option.
>
> Regarding a temporary HDFS checkpoint directory, it looks like when using
> --master yarn, spark-submit supplies a SPARK_YARN_STAGING_DIR environment
> variable. Thus, one could do the following when creating a SparkSession:
>
> val checkpointPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"),
> "checkpoints").toString
> sparkSession.sparkContext.setCheckpointDir(checkpointPath)
>
> The staging directory is in an HDFS path like
>
> /user/[user]/.sparkStaging/[YARN application ID]
>
> and is deleted at the end of the application
> <https://github.com/apache/spark/blob/branch-2.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L184>
> .
>
> So this is one option, though certainly abusing the staging directory.
>
> A more general one might be to find where Dataset.persist(DISK_ONLY)
> writes.
>
>
> On Fri, May 26, 2017 at 9:08 AM, Everett Anderson 
> wrote:
>
>> Hi,
>>
>> I need to set a checkpoint directory as I'm starting to use GraphFrames.
>> (Also, occasionally my regular DataFrame lineages get too long so it'd be
>> nice to use checkpointing to squash the lineage.)
>>
>> I don't actually need this checkpointed data to live beyond the life of
>> the job, however. I'm running jobs on AWS EMR (so on YARN + HDFS) and
>> reading and writing non-transient data to S3.
>>
>> Two questions:
>>
>> 1. Is there a Spark --conf option to set the checkpoint directory?
>> Somehow I couldn't find it, but surely it exists.
>>
>> 2. What's a good checkpoint directory for this use case? I imagine it'd
>> be on HDFS and presumably in a YARN application-specific temporary path
>> that gets cleaned up afterwards. Does anyone have a recommendation?
>>
>> Thanks!
>>
>> - Everett
>>
>>
>


Re: Temp checkpoint directory for EMR (S3 or HDFS)

2017-05-30 Thread Everett Anderson
Still haven't found a --conf option.

Regarding a temporary HDFS checkpoint directory, it looks like when using
--master yarn, spark-submit supplies a SPARK_YARN_STAGING_DIR environment
variable. Thus, one could do the following when creating a SparkSession:

val checkpointPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"),
"checkpoints").toString
sparkSession.sparkContext.setCheckpointDir(checkpointPath)

The staging directory is in an HDFS path like

/user/[user]/.sparkStaging/[YARN application ID]

and is deleted at the end of the application
<https://github.com/apache/spark/blob/branch-2.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L184>
.

So this is one option, though certainly abusing the staging directory.

A more general one might be to find where Dataset.persist(DISK_ONLY) writes.


On Fri, May 26, 2017 at 9:08 AM, Everett Anderson  wrote:

> Hi,
>
> I need to set a checkpoint directory as I'm starting to use GraphFrames.
> (Also, occasionally my regular DataFrame lineages get too long so it'd be
> nice to use checkpointing to squash the lineage.)
>
> I don't actually need this checkpointed data to live beyond the life of
> the job, however. I'm running jobs on AWS EMR (so on YARN + HDFS) and
> reading and writing non-transient data to S3.
>
> Two questions:
>
> 1. Is there a Spark --conf option to set the checkpoint directory? Somehow
> I couldn't find it, but surely it exists.
>
> 2. What's a good checkpoint directory for this use case? I imagine it'd be
> on HDFS and presumably in a YARN application-specific temporary path that
> gets cleaned up afterwards. Does anyone have a recommendation?
>
> Thanks!
>
> - Everett
>
>


Temp checkpoint directory for EMR (S3 or HDFS)

2017-05-26 Thread Everett Anderson
Hi,

I need to set a checkpoint directory as I'm starting to use GraphFrames.
(Also, occasionally my regular DataFrame lineages get too long so it'd be
nice to use checkpointing to squash the lineage.)

I don't actually need this checkpointed data to live beyond the life of the
job, however. I'm running jobs on AWS EMR (so on YARN + HDFS) and reading
and writing non-transient data to S3.

Two questions:

1. Is there a Spark --conf option to set the checkpoint directory? Somehow
I couldn't find it, but surely it exists.

2. What's a good checkpoint directory for this use case? I imagine it'd be
on HDFS and presumably in a YARN application-specific temporary path that
gets cleaned up afterwards. Does anyone have a recommendation?

Thanks!

- Everett


Re: Spark checkpoint - nonstreaming

2017-05-26 Thread Jörn Franke
Just load it as you would from any other directory.

> On 26. May 2017, at 17:26, Priya PM  wrote:
> 
> 
> -- Forwarded message --
> From: Priya PM 
> Date: Fri, May 26, 2017 at 8:54 PM
> Subject: Re: Spark checkpoint - nonstreaming
> To: Jörn Franke 
> 
> 
> Oh, how do i do it. I dont see it mentioned anywhere in the documentation. 
> 
> I have followed this link 
> https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
>  to understand checkpoint work flow. 
> 
> But it doesnt seem to work the way it was mentioned below during the second 
> run to read from checkpointed RDD. 
> 
>  
> 
> Q: How to read checkpointed RDD ?
> 
> runJob() will call finalRDD.partitions() to determine how many tasks there 
> will be. rdd.partitions() checks if the RDD has been checkpointed via 
> RDDCheckpointData which manages checkpointed RDD. If yes, return the 
> partitions of the RDD (Array[Partition]). When rdd.iterator() is called to 
> compute RDD's partition, computeOrReadCheckpoint(split: Partition) is also 
> called to check if the RDD is checkpointed. If yes, the parent RDD's 
> iterator(), a.k.a CheckpointRDD.iterator() will be called. CheckpointRDD 
> reads files on file system to produce RDD partition. That's why a parent 
> CheckpointRDD is added to checkpointed rdd trickly
> 
> 
>> On Fri, May 26, 2017 at 8:48 PM, Jörn Franke  wrote:
>> Did you explicitly tell the application to read from the checkpoint 
>> directory ?
>> This you have to do in non-streaming scenarios.
>> 
>>> On 26. May 2017, at 16:52, Priya PM  wrote:
>>> 
>>> yes, i did set the checkpoint directory. I could see the checkpointed RDD 
>>> too. 
>>> 
>>> [root@ rdd-28]# pwd
>>> /root/checkpointDir/9dd1acf0-bef8-4a4f-bf0e-f7624334abc5/rdd-28
>>> 
>>> I am using the MovieLens application to check spark checkpointing feature. 
>>> 
>>> code: MovieLensALS.scala
>>> 
>>> def main(args: Array[String]) { 
>>> ..
>>> ..
>>> sc.setCheckpointDir("/root/checkpointDir")
>>> }
>>> 
>>> 
>>> 
>>>> On Fri, May 26, 2017 at 8:09 PM, Jörn Franke  wrote:
>>>> Do you have some source code?
>>>> Did you set the checkpoint directory ?
>>>> 
>>>> > On 26. May 2017, at 16:06, Priya  wrote:
>>>> >
>>>> > Hi,
>>>> >
>>>> > With nonstreaming spark application, did checkpoint the RDD and I could 
>>>> > see
>>>> > the RDD getting checkpointed. I have killed the application after
>>>> > checkpointing the RDD and restarted the same application again 
>>>> > immediately,
>>>> > but it doesn't seem to pick from checkpoint and it again checkpoints the
>>>> > RDD. Could anyone please explain why am I seeing this behavior, why it is
>>>> > not picking from the checkpoint and proceeding further from there on the
>>>> > second run of the same application. Would really help me understand spark
>>>> > checkpoint work flow if I can get some clarity on the behavior. Please 
>>>> > let
>>>> > me know if I am missing something.
>>>> >
>>>> > [root@checkpointDir]# ls
>>>> > 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5  
>>>> > a4f14f43-e7c3-4f64-a980-8483b42bb11d
>>>> >
>>>> > [root@9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# ls -la
>>>> > total 0
>>>> > drwxr-xr-x. 3 root root  20 May 26 16:26 .
>>>> > drwxr-xr-x. 4 root root  94 May 26 16:24 ..
>>>> > drwxr-xr-x. 2 root root 133 May 26 16:26 rdd-28
>>>> >
>>>> > [root@priya-vm 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# cd rdd-28/
>>>> > [root@priya-vm rdd-28]# ls
>>>> > part-0  part-1  _partitioner
>>>> >
>>>> > Thanks
>>>> >
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > --
>>>> > View this message in context: 
>>>> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-checkpoint-nonstreaming-tp28712.html
>>>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>> >
>>>> > -
>>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>> >
>>> 
> 
> 


Fwd: Spark checkpoint - nonstreaming

2017-05-26 Thread Priya PM
-- Forwarded message --
From: Priya PM 
Date: Fri, May 26, 2017 at 8:54 PM
Subject: Re: Spark checkpoint - nonstreaming
To: Jörn Franke 


Oh, how do I do it? I don't see it mentioned anywhere in the documentation.

I have followed this link https://github.com/JerryLead/SparkInternals/blob/
master/markdown/english/6-CacheAndCheckpoint.md to understand checkpoint
work flow.

But it doesn't seem to work the way described below when the second
run tries to read the checkpointed RDD.



Q: How to read checkpointed RDD ?

runJob() will call finalRDD.partitions() to determine how many tasks there
will be. rdd.partitions() checks if the RDD has been checkpointed via
RDDCheckpointData which manages checkpointed RDD. If yes, return the
partitions of the RDD (Array[Partition]). When rdd.iterator() is called to
compute RDD's partition, computeOrReadCheckpoint(split: Partition) is also
called to check if the RDD is checkpointed. If yes, the parent RDD's
iterator(), a.k.a CheckpointRDD.iterator() will be called. CheckpointRDD
reads files on file system to produce RDD partition. That's why a parent
CheckpointRDD is added to checkpointed rdd trickly

On Fri, May 26, 2017 at 8:48 PM, Jörn Franke  wrote:

> Did you explicitly tell the application to read from the checkpoint
> directory ?
> This you have to do in non-streaming scenarios.
>
> On 26. May 2017, at 16:52, Priya PM  wrote:
>
> yes, i did set the checkpoint directory. I could see the checkpointed RDD
> too.
>
> [root@ rdd-28]# pwd
> /root/checkpointDir/9dd1acf0-bef8-4a4f-bf0e-f7624334abc5/rdd-28
>
> I am using the MovieLens application to check spark checkpointing feature.
>
> code: MovieLensALS.scala
>
> def main(args: Array[String]) {
> ..
> ..
> sc.setCheckpointDir("/root/checkpointDir")
> }
>
>
>
> On Fri, May 26, 2017 at 8:09 PM, Jörn Franke  wrote:
>
>> Do you have some source code?
>> Did you set the checkpoint directory ?
>>
>> > On 26. May 2017, at 16:06, Priya  wrote:
>> >
>> > Hi,
>> >
>> > With nonstreaming spark application, did checkpoint the RDD and I could
>> see
>> > the RDD getting checkpointed. I have killed the application after
>> > checkpointing the RDD and restarted the same application again
>> immediately,
>> > but it doesn't seem to pick from checkpoint and it again checkpoints the
>> > RDD. Could anyone please explain why am I seeing this behavior, why it
>> is
>> > not picking from the checkpoint and proceeding further from there on the
>> > second run of the same application. Would really help me understand
>> spark
>> > checkpoint work flow if I can get some clarity on the behavior. Please
>> let
>> > me know if I am missing something.
>> >
>> > [root@checkpointDir]# ls
>> > 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5  a4f14f43-e7c3-4f64-a980-8483b4
>> 2bb11d
>> >
>> > [root@9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# ls -la
>> > total 0
>> > drwxr-xr-x. 3 root root  20 May 26 16:26 .
>> > drwxr-xr-x. 4 root root  94 May 26 16:24 ..
>> > drwxr-xr-x. 2 root root 133 May 26 16:26 rdd-28
>> >
>> > [root@priya-vm 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# cd rdd-28/
>> > [root@priya-vm rdd-28]# ls
>> > part-0  part-1  _partitioner
>> >
>> > Thanks
>> >
>> >
>> >
>> >
>> >
>
>


Re: Spark checkpoint - nonstreaming

2017-05-26 Thread Jörn Franke
Do you have some source code?
Did you set the checkpoint directory ?

> On 26. May 2017, at 16:06, Priya  wrote:
> 
> Hi,
> 
> In a non-streaming Spark application, I checkpointed an RDD and could see
> the RDD getting checkpointed. I killed the application after checkpointing
> the RDD and restarted the same application immediately, but it doesn't seem
> to pick up from the checkpoint; it checkpoints the RDD again. Could anyone
> please explain why I am seeing this behavior, i.e. why the second run of the
> same application does not pick up from the checkpoint and proceed from
> there? It would really help me understand the Spark checkpoint workflow if I
> could get some clarity on this behavior. Please let me know if I am missing
> something.
> 
> [root@checkpointDir]# ls
> 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5  a4f14f43-e7c3-4f64-a980-8483b42bb11d
> 
> [root@9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# ls -la
> total 0
> drwxr-xr-x. 3 root root  20 May 26 16:26 .
> drwxr-xr-x. 4 root root  94 May 26 16:24 ..
> drwxr-xr-x. 2 root root 133 May 26 16:26 rdd-28
> 
> [root@priya-vm 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# cd rdd-28/
> [root@priya-vm rdd-28]# ls
> part-0  part-1  _partitioner
> 
> Thanks
> 
> 
> 
> 
> 




Re: Spark checkpoint - nonstreaming

2017-05-26 Thread Holden Karau
In non-streaming Spark, checkpoints aren't for inter-application recovery;
rather, you can think of them as doing a persist, but to HDFS rather than to
each node's local memory / storage.
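
To make that concrete: if results need to survive an application restart, they have to be written out and read back explicitly; a new run will not pick up the old checkpoint files. A rough sketch of that idea (the helper name and path handling are assumptions, not a Spark recovery API):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Reload previously saved results if they exist, otherwise compute and save them.
// This is an application-level convention, not something checkpoint() does for you.
def loadOrCompute[T: scala.reflect.ClassTag](sc: SparkContext, savePath: String)(compute: => RDD[T]): RDD[T] = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  if (fs.exists(new Path(savePath))) {
    sc.objectFile[T](savePath)        // second run: reuse what an earlier run saved
  } else {
    val rdd = compute
    rdd.saveAsObjectFile(savePath)    // first run: persist explicitly for later runs
    rdd
  }
}
```

Within a single run, checkpoint() still earns its keep by truncating the lineage so recomputation and planning stay cheap.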


On Fri, May 26, 2017 at 3:06 PM Priya  wrote:

> Hi,
>
> In a non-streaming Spark application, I checkpointed an RDD and could see
> the RDD getting checkpointed. I killed the application after checkpointing
> the RDD and restarted the same application immediately, but it doesn't seem
> to pick up from the checkpoint; it checkpoints the RDD again. Could anyone
> please explain why I am seeing this behavior, i.e. why the second run of the
> same application does not pick up from the checkpoint and proceed from
> there? It would really help me understand the Spark checkpoint workflow if I
> could get some clarity on this behavior. Please let me know if I am missing
> something.
>
> [root@checkpointDir]# ls
> 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5  a4f14f43-e7c3-4f64-a980-8483b42bb11d
>
> [root@9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# ls -la
> total 0
> drwxr-xr-x. 3 root root  20 May 26 16:26 .
> drwxr-xr-x. 4 root root  94 May 26 16:24 ..
> drwxr-xr-x. 2 root root 133 May 26 16:26 rdd-28
>
> [root@priya-vm 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# cd rdd-28/
> [root@priya-vm rdd-28]# ls
> part-0  part-1  _partitioner
>
> Thanks
>
>
>
>
>
--
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Spark checkpoint - nonstreaming

2017-05-26 Thread Priya
Hi,

In a non-streaming Spark application, I checkpointed an RDD and could see
the RDD getting checkpointed. I killed the application after checkpointing
the RDD and restarted the same application immediately, but it doesn't seem
to pick up from the checkpoint; it checkpoints the RDD again. Could anyone
please explain why I am seeing this behavior, i.e. why the second run of the
same application does not pick up from the checkpoint and proceed from
there? It would really help me understand the Spark checkpoint workflow if I
could get some clarity on this behavior. Please let me know if I am missing
something.

[root@checkpointDir]# ls
9dd1acf0-bef8-4a4f-bf0e-f7624334abc5  a4f14f43-e7c3-4f64-a980-8483b42bb11d

[root@9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# ls -la
total 0
drwxr-xr-x. 3 root root  20 May 26 16:26 .
drwxr-xr-x. 4 root root  94 May 26 16:24 ..
drwxr-xr-x. 2 root root 133 May 26 16:26 rdd-28

[root@priya-vm 9dd1acf0-bef8-4a4f-bf0e-f7624334abc5]# cd rdd-28/
[root@priya-vm rdd-28]# ls
part-0  part-1  _partitioner

Thanks








Spark Streaming: NullPointerException when restoring Spark Streaming job from hdfs/s3 checkpoint

2017-05-16 Thread Richard Moorhead

I'm having some difficulty reliably restoring a streaming job from a checkpoint.
When restoring a streaming job constructed from the following snippet, I
receive NullPointerExceptions when `map` is called on the restored RDD.


lazy val ssc = StreamingContext.getOrCreate(checkpointDir, 
createStreamingContext _)

private def createStreamingContext: StreamingContext = {
  val ssc = new StreamingContext(spark.sparkContext, batchInterval)
  ssc.checkpoint(checkpointDir)
  consumeStreamingContext(ssc)
  ssc
}

def consumeStreamingContext(ssc: StreamingContext) = {
  //... create dstreams
  val dstream = KinesisUtil.createStream(
  ...

  dstream.checkpoint(batchInterval)

  dstream
.foreachRDD(process)
}

def process(events: RDD[Event]) = {
  if (!events.isEmpty()) {
logger.info("Transforming events for processing")
//rdd seems to support some operations?
logger.info(s"RDD LENGTH: ${events.count}")
//nullpointer exception on call to .map
val df = events.map(e => {
  ...
}

  }
}
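
Not a diagnosis of the NPE above, but one pattern from the Spark Streaming programming guide that often helps when closures are restored from a checkpoint is to re-obtain session-scoped objects lazily inside the output operation rather than capturing them from an enclosing class. A sketch (the Event fields and the body are placeholders):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

case class Event(id: String, value: Double)      // placeholder schema

def process(events: RDD[Event]): Unit = {
  if (!events.isEmpty()) {
    // Rebuilt from the RDD's own SparkContext on every batch, so it is valid even
    // when the DStream graph was deserialized from a checkpoint.
    val spark = SparkSession.builder.config(events.sparkContext.getConf).getOrCreate()
    import spark.implicits._

    val df = events.toDF()
    df.count()                                   // placeholder for the real transformation
  }
}
```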




. . . . . . . . . . . . . . . . . . . . . . . . . . .

Richard Moorhead
Software Engineer
richard.moorh...@c2fo.com

C2FO: The World's Market for Working Capital®




checkpoint on spark standalone

2017-04-20 Thread Vivek Mishra
Hi,
I am processing multiple CSV files of about 2 GB each with my Spark application,
which also does a union and aggregation across all the input files. I am
currently stuck with the error below:
java.lang.StackOverflowError
    at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.dataType(interfaces.scala:81)
    at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:149)
    at org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$7.apply(basicOperators.scala:228)
    at org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$7.apply(basicOperators.scala:228)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.plans.logical.Aggregate.output(basicOperators.scala:228)
    at org.apache.spark.sql.catalyst.planning.PhysicalOperation$$anonfun$unapply$1.apply(patterns.scala:38)
    at org.apache.spark.sql.catalyst.planning.PhysicalOperation$$anonfun$unapply$1.apply(patterns.scala:38)
    at scala.Option.getOrElse(Option.scala:120)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.plans.logical.Aggregate.output(basicOperators.scala:228)
    at org.apache.spark.sql.catalyst.planning.PhysicalOperation$$anonfun$unapply$1.apply(patterns.scala:38)
    at org.apache.spark.sql.catalyst.planning.PhysicalOperation$$anonfun$unapply$1.apply(patterns.scala:38)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.catalyst.planning.PhysicalOperation$.unapply(patterns.scala:38)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:44)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
    at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:336)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
    at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:334)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
    at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:327)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)



I understand that it could be due to RDD lineage and tried to resolve it via
checkpoint, but no luck.
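
For what it's worth, one common way to keep the plan shallow in this situation is to union the inputs in batches and break the lineage between batches. A rough sketch, assuming Spark 2.1+ (where Dataset.checkpoint() is available), a shared schema across all files, and illustrative names and paths; on older versions the same effect is usually achieved by round-tripping through df.rdd and createDataFrame:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Union the per-file DataFrames a batch at a time, checkpointing each batch so the
// final plan is only (#files / batchSize) unions deep instead of #files deep.
def unionWithTruncatedLineage(spark: SparkSession, paths: Seq[String], batchSize: Int = 10): DataFrame = {
  spark.sparkContext.setCheckpointDir("/tmp/union-checkpoints")   // illustrative location
  val batches = paths.grouped(batchSize).map { group =>
    group.map(p => spark.read.option("header", "true").csv(p))
      .reduce(_ union _)
      .checkpoint()                                               // materializes the batch and truncates its plan
  }
  batches.reduce(_ union _)
}
```

The batch size trades extra checkpoint writes against how deep the final plan is allowed to grow.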

Any help?

Sincerely,
-Vivek


Re: SPARK-20325 - Spark Structured Streaming documentation Update: checkpoint configuration

2017-04-14 Thread Katherin Eri
Thank you for your reply; I will open a pull request for this doc issue. The
logic is clear.

Fri, 14 Apr 2017, 23:34 Michael Armbrust :

> 1)  could we update documentation for Structured Streaming and describe
>> that checkpointing could be specified by
>> spark.sql.streaming.checkpointLocation on SparkSession level and thus
>> automatically checkpoint dirs will be created per foreach query?
>>
>>
> Sure, please open a pull request.
>
>
>> 2) Do we really need to specify the checkpoint dir per query? what the
>> reason for this? finally we will be forced to write some checkpointDir name
>> generator, for example associate it with some particular named query and so
>> on?
>>
>
> Every query needs to have a unique checkpoint, as this is how we track what
> has been processed.  If we don't have this, we can't restart the query
> where it left off.  In your example, I would suggest including the metric
> name in the checkpoint location path.
>
-- 

*Yours faithfully, *

*Kate Eri.*


Re: SPARK-20325 - Spark Structured Streaming documentation Update: checkpoint configuration

2017-04-14 Thread Michael Armbrust
>
> 1)  could we update documentation for Structured Streaming and describe
> that checkpointing could be specified by 
> spark.sql.streaming.checkpointLocation
> on SparkSession level and thus automatically checkpoint dirs will be
> created per foreach query?
>
>
Sure, please open a pull request.


> 2) Do we really need to specify the checkpoint dir per query? what the
> reason for this? finally we will be forced to write some checkpointDir name
> generator, for example associate it with some particular named query and so
> on?
>

Every query needs to have a unique checkpoint, as this is how we track what
has been processed.  If we don't have this, we can't restart the query
where it left off.  In your example, I would suggest including the metric
name in the checkpoint location path.
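
For example, a sketch of that suggestion (the sink, base path, and column name below are placeholders): give each query its own checkpointLocation derived from the metric name, and optionally a matching queryName:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.count

// Start one aggregation per metric, each with a distinct checkpoint directory so
// Spark can track and restart every query independently.
def startMetricQuery(spark: SparkSession, streamToProcess: DataFrame, metric: String) = {
  streamToProcess
    .groupBy(streamToProcess(metric))
    .agg(count(streamToProcess(metric)))
    .writeStream
    .outputMode("complete")
    .queryName(s"agg-$metric")                                      // shows up in the UI and logs
    .option("checkpointLocation", s"/checkpoints/metrics/$metric")  // unique per query
    .format("console")                                              // placeholder sink
    .start()
}
```

On restart, each query then resumes from its own /checkpoints/metrics/<metric> directory.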


Re: checkpoint

2017-04-14 Thread Jean Georges Perrin
Sorry - can't help with PySpark, but here is some Java code which you may be
able to transform to Python:
http://jgp.net/2017/02/02/what-are-spark-checkpoints-on-dataframes/

jg


> On Apr 14, 2017, at 07:18, issues solution  wrote:
> 
> Hi,
> can someone give me a complete example of working with checkpoint under
> PySpark 1.6?
> 
> Thanks,
> regards


checkpoint

2017-04-14 Thread issues solution
Hi,
can someone give me a complete example of working with checkpoint under
PySpark 1.6?

Thanks,
regards


SPARK-20325 - Spark Structured Streaming documentation Update: checkpoint configuration

2017-04-14 Thread Katherin Eri
Hello, guys.

I have created the ticket
https://issues.apache.org/jira/browse/SPARK-20325.

My case was: I launch two streams from one source stream, *streamToProcess*,
like this:


streamToProcess
  .groupBy(metric)
  .agg(count(metric))
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", checkpointDir)
  .foreach(kafkaWriter)
  .start()


After that I’ve got an exception:

Cannot start query with id bf6a1003-6252-4c62-8249-c6a189701255 as
another query with same id is already active. Perhaps you are attempting to
restart a query from checkpoint that is already active.


This is caused by *StreamingQueryManager.scala* getting the checkpoint dir
from the stream's configuration; because my streams have equal
checkpointDirs, the second stream tries to recover instead of creating a
new one. For more details see the ticket: SPARK-20325


1) Could we update the documentation for Structured Streaming and describe
that the checkpoint location can be specified by
spark.sql.streaming.checkpointLocation at the SparkSession level, and that
checkpoint dirs will then be created automatically per foreach query?

2) Do we really need to specify the checkpoint dir per query? What is the
reason for this? Otherwise we will be forced to write some checkpointDir name
generator, for example associating it with a particular named query, and so
on.

-- 

*Yours faithfully, *

*Kate Eri.*


checkpoint how to use correctly checkpoint with udf

2017-04-13 Thread issues solution
Hi,
can someone explain how I can use checkpoint in PySpark (not in Scala)?
I have a lot of UDFs to apply on a large DataFrame and I don't understand
how I can use checkpoint to break the lineage and prevent
java.lang.StackOverflowError.
Regards


Re: checkpoint

2017-04-13 Thread ayan guha
It looks like your UDF expects numeric data but you are sending a string type.
I suggest casting to numeric.

On Thu, 13 Apr 2017 at 7:03 pm, issues solution 
wrote:

> Hi
> I am newer in spark and i want ask you what wrang with checkpoint  On
> pyspark 1.6.0
>
> i dont unertsand what happen after i try to use it under datframe :
>dfTotaleNormalize24 =  dfTotaleNormalize23.select([i if i not in
> listrapcot  else  udf_Grappra(F.col(i)).alias(i) for i in
> dfTotaleNormalize23.columns  ])
>
> dfTotaleNormalize24.cache()   <- cache on memory
> dfTotaleNormalize24.count <-matrialize dataframe(  rdd too ??)
> dfTotaleNormalize24.rdd.checkpoint() <- (cut DAG and save rdd not yet)
> dfTotaleNormalize24.rdd.count() <--- matrialize in file
>
> but why i get the following error :
>
>  java.lang.UnsupportedOperationException: Cannot evaluate expression:
>  PythonUDF#Grappra(input[410, StringType])
>
>
> thank's to explain all details and steps to save and check point
>
> Mydatframe it huge on with more than 5 Million rows and 1000 columns
>
> and udf befor are applied on more than 150 columns  it replace  ' ' by 0.0
> that all.
>
> regards
>
-- 
Best Regards,
Ayan Guha


checkpoint

2017-04-13 Thread issues solution
Hi,
I am new to Spark and I want to ask what is wrong with checkpoint on
PySpark 1.6.0.

I don't understand what happens after I try to use it on a DataFrame:

    dfTotaleNormalize24 = dfTotaleNormalize23.select([i if i not in listrapcot
        else udf_Grappra(F.col(i)).alias(i) for i in dfTotaleNormalize23.columns])

    dfTotaleNormalize24.cache()           # <- cache in memory
    dfTotaleNormalize24.count()           # <- materialize the DataFrame (the RDD too??)
    dfTotaleNormalize24.rdd.checkpoint()  # <- cut the DAG and mark the RDD (not saved yet)
    dfTotaleNormalize24.rdd.count()       # <- materialize to checkpoint files

But why do I get the following error?

 java.lang.UnsupportedOperationException: Cannot evaluate expression:
 PythonUDF#Grappra(input[410, StringType])

Thanks for explaining all the details and steps to save and checkpoint.

My DataFrame is huge, with more than 5 million rows and 1000 columns, and the
UDF above is applied on more than 150 columns; all it does is replace ' ' with
0.0.

Regards


[Spark Streaming] Checkpoint backup (.bk) file purpose

2017-03-16 Thread Bartosz Konieczny
Hello,

Actually, I'm studying the metadata checkpoint implementation in Spark Streaming
and I was wondering about the purpose of the so-called "backup files":

CheckpointWriter snippet:

> // We will do checkpoint when generating a batch and completing a batch.
> When the processing
> // time of a batch is greater than the batch interval, checkpointing for
> completing an old
> // batch may run after checkpointing of a new batch. If this happens,
> checkpoint of an old
> // batch actually has the latest information, so we want to recovery from
> it. Therefore, we
> // also use the latest checkpoint time as the file name, so that we can
> recover from the
> // latest checkpoint file.
> //
> // Note: there is only one thread writing the checkpoint files, so we
> don't need to worry
> // about thread-safety.
> val checkpointFile = Checkpoint.checkpointFile(checkpointDir,
> latestCheckpointTime)
> val backupFile = Checkpoint.checkpointBackupFile(checkpointDir,
> latestCheckpointTime)
>
> // ... some lines further
> // If the checkpoint file exists, back it up
> // If the backup exists as well, just delete it, otherwise rename will fail
> if (fs.exists(checkpointFile)) {
>   fs.delete(backupFile, true) // just in case it exists
>   if (!fs.rename(checkpointFile, backupFile)) {
> logWarning(s"Could not rename $checkpointFile to $backupFile")
>   }
> }
>

What is the role of this *backupFile*? I understand that it is generated when a
checkpoint file for the given timestamp already exists. But how can that
situation arise? Is it a protection against different Spark applications
checkpointing to the same directory? Or is it meant for the case described
above (an old batch finishing after a new batch has started)?

Best regards,
Bartosz.


Re: Spark standalone cluster on EC2 error .. Checkpoint..

2017-02-17 Thread shyla deshpande
Thanks TD and Marco for the feedback.

The directory referenced by SPARK_LOCAL_DIRS did not exist. After creating
that directory, it worked.

This was the first time I was trying to run Spark on a standalone cluster, so
I missed it.

Thanks

On Fri, Feb 17, 2017 at 12:35 PM, Tathagata Das  wrote:

> Seems like an issue with the HDFS you are using for checkpointing. It's not
> able to write data properly.
>
> On Thu, Feb 16, 2017 at 2:40 PM, shyla deshpande  > wrote:
>
>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
>> File 
>> /checkpoint/11ea8862-122c-4614-bc7e-f761bb57ba23/rdd-347/.part-1-attempt-3
>> could only be replicated to 0 nodes instead of minReplication (=1).  There
>> are 0 datanode(s) running and no node(s) are excluded in this operation.
>>
>> This is the error I get when I run my spark streaming app on 2 node EC2
>> cluster, with 1 master and 1 worker.
>>
>> Works fine in local mode. Please help.
>>
>> Thanks
>>
>
>

