Re: structured streaming - checkpoint metadata growing indefinitely

2022-05-04 Thread Wojciech Indyk
For posterity: the problem was the FileStreamSourceLog class. I needed to
override the method shouldRetain, which by default returns true and whose doc
says: "Default implementation retains all log entries. Implementations should
override the method to change the behavior."
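
A minimal sketch of what such an override could look like (FileStreamSourceLog
is Spark-internal API; the exact shouldRetain signature and the FileEntry
import path differ between Spark versions, and the minBatchIdToRetain threshold
is a hypothetical parameter added here purely for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSourceLog
import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry

// Hypothetical subclass that stops the source .compact file from retaining
// the whole history since batch 0 by dropping entries of old batches.
// NOTE: the one-argument shouldRetain shown here is an assumption; some Spark
// versions also pass a currentTime argument.
class PrunedFileStreamSourceLog(
    metadataLogVersion: Int,
    sparkSession: SparkSession,
    path: String,
    minBatchIdToRetain: Long) // hypothetical retention threshold
  extends FileStreamSourceLog(metadataLogVersion, sparkSession, path) {

  override def shouldRetain(entry: FileEntry): Boolean =
    // Keep only entries from sufficiently recent batches; the default
    // implementation returns true for every entry.
    entry.batchId >= minBatchIdToRetain
}

Wiring such a subclass in requires changing the place where Spark constructs
FileStreamSourceLog internally, so in practice this amounts to a small fork or
a shadowed class on the classpath rather than a configuration change.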

--
Kind regards/ Pozdrawiam,
Wojciech Indyk


On Sat, Apr 30, 2022 at 12:35 Wojciech Indyk
wrote:

> Hi Gourav!
> I use stateless processing, no watermarking, no aggregations.
> I don't want any data loss, so changing the checkpoint location is not an
> option for me.
>
> --
> Kind regards/ Pozdrawiam,
> Wojciech Indyk
>
>
> On Fri, Apr 29, 2022 at 11:07 Gourav Sengupta
> wrote:
>
>> Hi,
>>
>> this may not solve the problem, but have you tried to stop the job
>> gracefully and then restart it without much delay, pointing to a new
>> checkpoint location? The approach carries some uncertainty in scenarios
>> where the source system can lose data, or where we do not expect
>> duplicates to be committed, etc.
>>
>> It will be good to know what kind of processing you are doing as well.
>>
>>
>> Regards,
>> Gourav
>>
>> On Fri, Apr 29, 2022 at 8:11 AM Wojciech Indyk 
>> wrote:
>>
>>> Update on the scenario of deleting compact files: the job recovers from the
>>> recent (not compacted) checkpoint file, but when the next checkpoint
>>> compaction is due, it fails because the recent compact file is missing. I
>>> use Spark 3.1.2.
>>>
>>> --
>>> Kind regards/ Pozdrawiam,
>>> Wojciech Indyk
>>>
>>>
>>> On Fri, Apr 29, 2022 at 7:00 Wojciech Indyk
>>> wrote:
>>>
>>>> Hello!
>>>> I use Spark Structured Streaming. I need to use S3 for storing checkpoint
>>>> metadata (I know it's not an optimal store for checkpoint metadata). The
>>>> compaction interval is 10 (default) and I set
>>>> "spark.sql.streaming.minBatchesToRetain"=5. After the job had been running
>>>> for a few weeks, checkpointing time increased significantly (causing a few
>>>> minutes' delay in processing). I looked at the checkpoint metadata
>>>> structure. There is one heavy path there: checkpoint/source/0. A single
>>>> .compact file weighs 25 GB. I looked into its content and it contains all
>>>> entries since batch 0 (the current batch is around 25000). I tried a few
>>>> parameters to remove already processed data from the compact file, namely:
>>>> "spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not work.
>>>> As I've seen in the code, it relates to the previous version of streaming,
>>>> doesn't it?
>>>> "spark.sql.streaming.fileSource.log.deletion"=true and
>>>> "spark.sql.streaming.fileSink.log.deletion"=true don't work either.
>>>> The compact file stores the full history even though all the data (except
>>>> for the most recent checkpoint) has already been processed, so I would
>>>> expect most of the entries to be deleted. Is there any parameter to remove
>>>> entries from the compact file, or to drop the compact file gracefully from
>>>> time to time? Now I am testing a scenario where I stop the job, delete most
>>>> of the checkpoint/source/0/* files, keeping just a few recent (not
>>>> compacted) checkpoints, and rerun the job. The job recovers correctly from
>>>> the recent checkpoint. It looks like a possible workaround for my problem,
>>>> but this scenario with manual deletion of checkpoint files looks ugly, so I
>>>> would prefer something managed by Spark.
>>>>
>>>> --
>>>> Kind regards/ Pozdrawiam,
>>>> Wojciech Indyk
>>>>
>>>


Re: structured streaming - checkpoint metadata growing indefinitely

2022-04-30 Thread Wojciech Indyk
Hi Gourav!
I use stateless processing, no watermarking, no aggregations.
I don't want any data loss, so changing the checkpoint location is not an
option for me.

--
Kind regards/ Pozdrawiam,
Wojciech Indyk


On Fri, Apr 29, 2022 at 11:07 Gourav Sengupta
wrote:

> Hi,
>
> this may not solve the problem, but have you tried to stop the job
> gracefully and then restart it without much delay, pointing to a new
> checkpoint location? The approach carries some uncertainty in scenarios
> where the source system can lose data, or where we do not expect
> duplicates to be committed, etc.
>
> It will be good to know what kind of processing you are doing as well.
>
>
> Regards,
> Gourav
>
> On Fri, Apr 29, 2022 at 8:11 AM Wojciech Indyk 
> wrote:
>
>> Update on the scenario of deleting compact files: the job recovers from the
>> recent (not compacted) checkpoint file, but when the next checkpoint
>> compaction is due, it fails because the recent compact file is missing. I
>> use Spark 3.1.2.
>>
>> --
>> Kind regards/ Pozdrawiam,
>> Wojciech Indyk
>>
>>
>> On Fri, Apr 29, 2022 at 7:00 Wojciech Indyk
>> wrote:
>>
>>> Hello!
>>> I use Spark Structured Streaming. I need to use S3 for storing checkpoint
>>> metadata (I know it's not an optimal store for checkpoint metadata). The
>>> compaction interval is 10 (default) and I set
>>> "spark.sql.streaming.minBatchesToRetain"=5. After the job had been running
>>> for a few weeks, checkpointing time increased significantly (causing a few
>>> minutes' delay in processing). I looked at the checkpoint metadata
>>> structure. There is one heavy path there: checkpoint/source/0. A single
>>> .compact file weighs 25 GB. I looked into its content and it contains all
>>> entries since batch 0 (the current batch is around 25000). I tried a few
>>> parameters to remove already processed data from the compact file, namely:
>>> "spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not work.
>>> As I've seen in the code, it relates to the previous version of streaming,
>>> doesn't it?
>>> "spark.sql.streaming.fileSource.log.deletion"=true and
>>> "spark.sql.streaming.fileSink.log.deletion"=true don't work either.
>>> The compact file stores the full history even though all the data (except
>>> for the most recent checkpoint) has already been processed, so I would
>>> expect most of the entries to be deleted. Is there any parameter to remove
>>> entries from the compact file, or to drop the compact file gracefully from
>>> time to time? Now I am testing a scenario where I stop the job, delete most
>>> of the checkpoint/source/0/* files, keeping just a few recent (not
>>> compacted) checkpoints, and rerun the job. The job recovers correctly from
>>> the recent checkpoint. It looks like a possible workaround for my problem,
>>> but this scenario with manual deletion of checkpoint files looks ugly, so I
>>> would prefer something managed by Spark.
>>>
>>> --
>>> Kind regards/ Pozdrawiam,
>>> Wojciech Indyk
>>>
>>


Re: structured streaming - checkpoint metadata growing indefinitely

2022-04-29 Thread Wojciech Indyk
Update on the scenario of deleting compact files: the job recovers from the
recent (not compacted) checkpoint file, but when the next checkpoint
compaction is due, it fails because the recent compact file is missing. I use
Spark 3.1.2.

--
Kind regards/ Pozdrawiam,
Wojciech Indyk


On Fri, Apr 29, 2022 at 7:00 Wojciech Indyk
wrote:

> Hello!
> I use Spark Structured Streaming. I need to use S3 for storing checkpoint
> metadata (I know it's not an optimal store for checkpoint metadata). The
> compaction interval is 10 (default) and I set
> "spark.sql.streaming.minBatchesToRetain"=5. After the job had been running
> for a few weeks, checkpointing time increased significantly (causing a few
> minutes' delay in processing). I looked at the checkpoint metadata
> structure. There is one heavy path there: checkpoint/source/0. A single
> .compact file weighs 25 GB. I looked into its content and it contains all
> entries since batch 0 (the current batch is around 25000). I tried a few
> parameters to remove already processed data from the compact file, namely:
> "spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not work.
> As I've seen in the code, it relates to the previous version of streaming,
> doesn't it?
> "spark.sql.streaming.fileSource.log.deletion"=true and
> "spark.sql.streaming.fileSink.log.deletion"=true don't work either.
> The compact file stores the full history even though all the data (except
> for the most recent checkpoint) has already been processed, so I would
> expect most of the entries to be deleted. Is there any parameter to remove
> entries from the compact file, or to drop the compact file gracefully from
> time to time? Now I am testing a scenario where I stop the job, delete most
> of the checkpoint/source/0/* files, keeping just a few recent (not
> compacted) checkpoints, and rerun the job. The job recovers correctly from
> the recent checkpoint. It looks like a possible workaround for my problem,
> but this scenario with manual deletion of checkpoint files looks ugly, so I
> would prefer something managed by Spark.
>
> --
> Kind regards/ Pozdrawiam,
> Wojciech Indyk
>


structured streaming - checkpoint metadata growing indefinitely

2022-04-28 Thread Wojciech Indyk
Hello!
I use Spark Structured Streaming. I need to use S3 for storing checkpoint
metadata (I know it's not an optimal store for checkpoint metadata). The
compaction interval is 10 (default) and I set
"spark.sql.streaming.minBatchesToRetain"=5. After the job had been running
for a few weeks, checkpointing time increased significantly (causing a few
minutes' delay in processing). I looked at the checkpoint metadata
structure. There is one heavy path there: checkpoint/source/0. A single
.compact file weighs 25 GB. I looked into its content and it contains all
entries since batch 0 (the current batch is around 25000). I tried a few
parameters to remove already processed data from the compact file, namely:
"spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not work.
As I've seen in the code, it relates to the previous version of streaming,
doesn't it?
"spark.sql.streaming.fileSource.log.deletion"=true and
"spark.sql.streaming.fileSink.log.deletion"=true don't work either.
The compact file stores the full history even though all the data (except
for the most recent checkpoint) has already been processed, so I would
expect most of the entries to be deleted. Is there any parameter to remove
entries from the compact file, or to drop the compact file gracefully from
time to time? Now I am testing a scenario where I stop the job, delete most
of the checkpoint/source/0/* files, keeping just a few recent (not
compacted) checkpoints, and rerun the job. The job recovers correctly from
the recent checkpoint. It looks like a possible workaround for my problem,
but this scenario with manual deletion of checkpoint files looks ugly, so I
would prefer something managed by Spark.
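
For reference, a minimal sketch of the setup described above (the source path,
sink path, checkpoint location, and schema are hypothetical placeholders;
spark.sql.streaming.minBatchesToRetain is the key mentioned in this thread, and
the compact-interval key shown is the standard file-source log setting for the
10-batch default. Note that neither of them prunes already-processed entries
from the source .compact file, which is exactly the problem described):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder()
  .appName("file-source-streaming")
  // Retain metadata for only the 5 most recent batches (default is 100).
  .config("spark.sql.streaming.minBatchesToRetain", "5")
  // Compact the file-source metadata log every 10 batches (the default).
  .config("spark.sql.streaming.fileSource.log.compactInterval", "10")
  .getOrCreate()

val schema = new StructType().add("value", StringType) // hypothetical schema

val query = spark.readStream
  .schema(schema)
  .json("s3a://bucket/input/")            // hypothetical file source
  .writeStream
  .format("parquet")
  .option("path", "s3a://bucket/output/") // hypothetical sink
  // Checkpoint metadata on S3; the file-source metadata log under this
  // location is where the large .compact files described above accumulate.
  .option("checkpointLocation", "s3a://bucket/checkpoint/")
  .start()

query.awaitTermination()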

--
Kind regards/ Pozdrawiam,
Wojciech Indyk


outdated documentation? SparkSession

2017-01-27 Thread Wojciech Indyk
Hi!
In this doc
http://spark.apache.org/docs/latest/programming-guide.html#initializing-spark
initialization is described using SparkContext. Do you think it is reasonable
to change it to SparkSession, or just mention it at the end? I can prepare
it and make a PR for this, but want to know your opinion first. The same
goes for the quickstart:
http://spark.apache.org/docs/latest/quick-start.html#self-contained-applications
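
For context, a minimal sketch of the SparkSession-based initialization the
updated docs could show in place of the SparkContext example (the app name and
master are placeholders):

import org.apache.spark.sql.SparkSession

// SparkSession has been the unified entry point since Spark 2.0; the
// underlying SparkContext is still available as spark.sparkContext.
val spark = SparkSession.builder()
  .appName("Simple Application") // placeholder app name
  .master("local[*]")            // placeholder master for local testing
  .getOrCreate()

val sc = spark.sparkContext // the SparkContext, if RDD APIs are still needed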

--
Kind regards/ Pozdrawiam,
Wojciech Indyk


Re: [ERROR]: Spark 1.5.2 + Hbase 1.1 + Hive 1.2 + HbaseIntegration

2016-04-08 Thread Wojciech Indyk
Hello Divya!
Have you solved the problem?
I suppose the log comes from the driver. You should also look at the logs on
the worker JVMs; there may be an exception or something similar there.
Do you have Kerberos on your cluster? It could be similar to the problem in
http://issues.apache.org/jira/browse/SPARK-14115

Based on your logs:
> 16/02/29 23:09:34 INFO ClientCnxn: Opening socket connection to server
> localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL
> (unknown error)
> 16/02/29 23:09:34 INFO ClientCnxn: Socket connection established to
> localhost/0:0:0:0:0:0:0:1:2181, initiating session
> 16/02/29 23:09:34 INFO ClientCnxn: Session establishment complete on
> server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x3532fb70ba20035,

Maybe there is a problem with the RPC calls to the regions over IPv6
(but I'm just guessing).

--
Kind regards/ Pozdrawiam,
Wojciech Indyk
http://datacentric.pl


2016-03-01 5:27 GMT+01:00 Divya Gehlot <divya.htco...@gmail.com>:
> Hi,
> I am getting an error when I try to connect to a Hive table (which was
> created through HbaseIntegration) in Spark
>
> Steps I followed :
> *Hive Table creation code  *:
> CREATE EXTERNAL TABLE IF NOT EXISTS TEST(NAME STRING,AGE INT)
> STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
> WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,0:AGE")
> TBLPROPERTIES ("hbase.table.name" = "TEST",
> "hbase.mapred.output.outputtable" = "TEST");
>
>
> *DESCRIBE TEST ;*
> col_name    data_type    comment
> name        string       from deserializer
> age         int          from deserializer
>
>
> *Spark Code :*
> import org.apache.spark._
> import org.apache.spark.sql._
>
> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> hiveContext.sql("from TEST SELECT  NAME").collect.foreach(println)
>
>
> *Starting Spark shell*
> spark-shell --jars
> /usr/hdp/2.3.4.0-3485/hive/lib/guava-14.0.1.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-client.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-common.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-protocol.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hive/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/2.3.4.0-3485/hive/lib/zookeeper-3.4.6.2.3.4.0-3485.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar
> --driver-class-path
> /usr/hdp/2.3.4.0-3485/hive/lib/guava-14.0.1.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-client.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-common.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-protocol.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hive/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/2.3.4.0-3485/hive/lib/zookeeper-3.4.6.2.3.4.0-3485.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar
> --packages com.databricks:spark-csv_2.10:1.3.0  --master yarn-client -i
> /TestDivya/Spark/InstrumentCopyToHDFSHive.scala
>
> *Stack Trace* :
>
>> SQL context available as sqlContext.
>> Loading /TestDivya/Spark/InstrumentCopyToHDFSHive.scala...
>> import org.apache.spark._
>> import org.apache.spark.sql._
>> 16/02/29 23:09:29 INFO HiveContext: Initializing execution hive, version
>> 1.2.1
>> 16/02/29 23:09:29 INFO ClientWrapper: Inspected Hadoop version:
>> 2.7.1.2.3.4.0-3485
>> 16/02/29 23:09:29 INFO ClientWrapper: Loaded
>> org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version
>> 2.7.1.2.3.4.0-3485
>> 16/02/29 23:09:29 INFO HiveContext: default warehouse location is
>> /user/hive/warehouse
>> 16/02/29 23:09:29 INFO HiveContext: Initializing HiveMetastoreConnection
>> version 1.2.1 using Spark classes.
>> 16/02/29 23:09:29 INFO ClientWrapper: Inspected Hadoop version:
>> 2.7.1.2.3.4.0-3485
>> 16/02/29 23:09:29 INFO ClientWrapper: Loaded
>> org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version
>> 2.7.1.2.3.4.0-3485
>> 16/02/29 23:09:30 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 16/02/29 23:09:30 INFO metastore: Trying to connect to metastore with URI
>> thrift://ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal:9083
>> 16/02/29 23:09:30 INFO metastore: Connected to metastore.
>> 16/02/29 23:09:30 WARN DomainSocketFactory: The short-circuit local reads
>> feature cannot be used because libhadoop cannot be loaded.
>> 16/02/29 23:09:31 INFO SessionState: Created local directory:
>> /tmp/1bf53785-f7c8-406d-a733-a5858ccb2d16_resources