Re: spark session jdbc performance

2017-10-25 Thread Gourav Sengupta
Hi Naveen,

Can you please copy and paste the lines in your original email again, and
perhaps then Lucas can go through it completely & kindly stop thinking that
others are responding by assuming things?

On the other hand, please do let me know how things go; there was
another post on this a few weeks back, and several other users find this
issue equally interesting to resolve.

I might just have the solution for this.

Regards,
Gourav Sengupta

On Wed, Oct 25, 2017 at 9:26 PM, lucas.g...@gmail.com 
wrote:

> Are we sure the UI is showing only one partition running the query?  The
> original poster hasn't replied yet.
>
> My assumption is that there's only one executor configured / deployed.
> But we only know what the OP stated, which wasn't enough to be sure of
> anything.
>
> Why are you suggesting that partitioning on the PK isn't prudent? And did
> you mean to say that 30 partitions were far too many for any system to
> handle?  (I'm assuming you misread the original code)
>
> Gary
>
> On 25 October 2017 at 13:21, Gourav Sengupta 
> wrote:
>
>> Hi Lucas,
>>
>> so if I am assuming things, can you please explain why the UI is showing
>> only one partition to run the query?
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Wed, Oct 25, 2017 at 6:03 PM, lucas.g...@gmail.com <
>> lucas.g...@gmail.com> wrote:
>>
>>> Gourav, I'm assuming you misread the code.  It's 30 partitions, which
>>> isn't a ridiculous value.  Maybe you misread the upperBound for the
>>> partitions?  (That would be ridiculous)
>>>
>>> Why not use the PK as the partition column?  Obviously it depends on the
>>> downstream queries.  If you're going to be performing joins (which I assume
>>> is the case) then partitioning on the join column would be advisable, but
>>> what about the case where the join column would be heavily skewed?
>>>
>>> Thanks!
>>>
>>> Gary
>>>
>>> On 24 October 2017 at 23:41, Gourav Sengupta 
>>> wrote:
>>>
 Hi Naveen,

 I do not think that it is prudent to use the PK as the partitionColumn;
 that would be too many partitions for any system to handle. numPartitions
 is applied very differently in the JDBC case.

 Please keep me updated on how things go.


 Regards,
 Gourav Sengupta

 On Tue, Oct 24, 2017 at 10:54 PM, Naveen Madhire  wrote:

>
> Hi,
>
>
>
> I am trying to fetch data from an Oracle DB using a subquery and am
> experiencing a lot of performance issues.
>
>
>
> Below is the query I am using,
>
>
>
> Using Spark 2.0.2
>
>
>
> val df = spark_session.read.format("jdbc")
>   .option("driver", "oracle.jdbc.OracleDriver")
>   .option("url", jdbc_url)
>   .option("user", user)
>   .option("password", pwd)
>   .option("dbtable", "subquery")
>   .option("partitionColumn", "id")  // primary key column, uniformly distributed
>   .option("lowerBound", "1")
>   .option("upperBound", "50")
>   .option("numPartitions", 30)
>   .load()
>
>
>
> The above query is configured with 30 partitions, but when I look at the
> UI it is only using 1 partition to run the query.
>
>
>
> Can anyone tell me if I am missing anything, or whether I need to do
> anything else to tune the performance of the query?
>
>  Thanks
>


>>>
>>
>
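
A sketch of one way to investigate this, not from the thread ("my_table" is hypothetical;
the other names follow the snippet above). lowerBound and upperBound only set the partition
stride, they do not filter rows, so if the real range of "id" is far larger than 50, almost
every row falls into the last partition's predicate and a single task ends up doing the
work, which would look like one partition in the UI. Deriving the bounds from the table
itself and checking the resulting partition count helps rule that out:

val bounds = spark_session.read.format("jdbc")
  .option("driver", "oracle.jdbc.OracleDriver")
  .option("url", jdbc_url)
  .option("user", user)
  .option("password", pwd)
  .option("dbtable", "(SELECT MIN(id) AS lo, MAX(id) AS hi FROM my_table) b")  // hypothetical table
  .load()
  .first()

val df = spark_session.read.format("jdbc")
  .option("driver", "oracle.jdbc.OracleDriver")
  .option("url", jdbc_url)
  .option("user", user)
  .option("password", pwd)
  .option("dbtable", "my_table")
  .option("partitionColumn", "id")
  .option("lowerBound", bounds.get(0).toString)  // real MIN(id)
  .option("upperBound", bounds.get(1).toString)  // real MAX(id)
  .option("numPartitions", 30)
  .load()

println(df.rdd.getNumPartitions)  // should print 30 if the options were picked up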


Re: Spark streaming for CEP

2017-10-25 Thread anna stax
Thanks very much, Mich, Thomas and Stephan. I will look into it.

On Tue, Oct 24, 2017 at 8:02 PM, lucas.g...@gmail.com 
wrote:

> This looks really interesting, thanks for linking!
>
> Gary Lucas
>
> On 24 October 2017 at 15:06, Mich Talebzadeh 
> wrote:
>
>> Great thanks Steve
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn:
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 24 October 2017 at 22:58, Stephen Boesch  wrote:
>>
>>> Hi Mich, the github link has a brief intro - including a link to the
>>> formal docs http://logisland.readthedocs.io/en/latest/index.html .
>>>  They have an architectural overview, developer guide, tutorial, and pretty
>>> comprehensive api docs.
>>>
>>> 2017-10-24 13:31 GMT-07:00 Mich Talebzadeh :
>>>
 thanks Thomas.

 do you have a summary write-up for this tool please?


 regards,




 Thomas

 Dr Mich Talebzadeh



 LinkedIn:
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



 http://talebzadehmich.wordpress.com


 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.



 On 24 October 2017 at 13:53, Thomas Bailet 
 wrote:

> Hi
>
> we (at Hurence) have released an open source middleware based on
> Spark Streaming over Kafka to do CEP and log mining, called logisland
> (https://github.com/Hurence/logisland/). It has been deployed in
> production for 2 years now and does a great job. You should have a look.
>
>
> bye
>
> Thomas Bailet
>
> CTO : hurence
>
> Le 18/10/17 à 22:05, Mich Talebzadeh a écrit :
>
> As you may be aware, the granularity that Spark Streaming offers is
> micro-batching, and that is limited to 0.5 second. So if you have continuous
> ingestion of data then Spark Streaming may not be granular enough for CEP.
> You may consider other products.
>
> Worth looking at this old thread of mine, "Spark support for Complex
> Event Processing (CEP)":
>
> https://mail-archives.apache.org/mod_mbox/spark-user/201604.mbox/%3CCAJ3fcbB8eaf0JV84bA7XGUK5GajC1yGT3ZgTNCi8arJg56=LbQ@mail.gmail.com%3E
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any loss, damage or destruction of data or any other property which may
> arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary damages
> arising from such loss, damage or destruction.
>
>
>
> On 18 October 2017 at 20:52, anna stax  wrote:
>
>> Hello all,
>>
>> Has anyone used Spark Streaming for CEP (Complex Event Processing)?
>> Any CEP libraries that works well with spark. I have a use case for CEP 
>> and
>> trying to see if spark streaming is a good fit.
>>
>> Currently we have a data pipeline using Kafka, Spark streaming and
>> Cassandra for data ingestion and near real time dashboard.
>>
>> Please share your experience.
>> Thanks much.
>> -Anna
>>
>>
>>
>
>

>>>
>>
>


Re: Structured Stream in Spark

2017-10-25 Thread Tathagata Das
Please do not confuse the old Spark Streaming (DStreams) with Structured
Streaming. Structured Streaming's offset and checkpoint management is far
more robust than that of DStreams.
Take a look at my talk -
https://spark-summit.org/2017/speakers/tathagata-das/

On Wed, Oct 25, 2017 at 9:29 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Thanks Subhash.
>
> Have you ever used the zero-data-loss approach with streaming? I am a bit
> worried about using streaming when it comes to data loss.
>
> https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/
>
>
> Does Structured Streaming handle it internally?
>
> On Wed, Oct 25, 2017 at 3:10 PM, Subhash Sriram 
> wrote:
>
>> No problem! Take a look at this:
>>
>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>
>> Thanks,
>> Subhash
>>
>> On Wed, Oct 25, 2017 at 4:08 PM, KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>>
>>> Hi Sriram,
>>>
>>> Thanks. This is what I was looking for.
>>>
>>> one question, where do we need to specify the checkpoint directory in
>>> case of structured streaming?
>>>
>>> Thanks,
>>> Asmath
>>>
>>> On Wed, Oct 25, 2017 at 2:52 PM, Subhash Sriram <
>>> subhash.sri...@gmail.com> wrote:
>>>
 Hi Asmath,

 Here is an example of using structured streaming to read from Kafka:

 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala

 In terms of parsing the JSON, there is a from_json function that you
 can use. The following might help:

 https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html

 I hope this helps.

 Thanks,
 Subhash

 On Wed, Oct 25, 2017 at 2:59 PM, KhajaAsmath Mohammed <
 mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> Could anyone provide suggestions on how to parse json data from kafka
> and load it back in hive.
>
> I have read about structured streaming but didn't find any examples.
> is there any best practise on how to read it and parse it with structured
> streaming for this use case?
>
> Thanks,
> Asmath
>


>>>
>>
>


Re: Dataset API Question

2017-10-25 Thread Wenchen Fan
It's because of a difference in API design.

RDD.checkpoint returns void, which means it mutates the RDD state, so you
need an RDD.isCheckpointed method to check if this RDD is checkpointed.

Dataset.checkpoint returns a new Dataset, which means there is no
isCheckpointed state in Dataset, and thus we don't need a
Dataset.isCheckpointed method.
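
A tiny sketch of the usage pattern that follows from this design (illustrative only;
the checkpoint directory is a hypothetical path):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dataset-checkpoint-demo").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  // required before checkpoint()

val ds = spark.range(0, 1000)
val checkpointed = ds.checkpoint()  // eager by default: materializes and returns a NEW Dataset

// Downstream code should use `checkpointed`; the original `ds` (and ds.rdd) is
// untouched, which is consistent with ds.rdd.isCheckpointed returning false.
println(checkpointed.count())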


On Wed, Oct 25, 2017 at 6:39 PM, Bernard Jesop 
wrote:

> Actually, I realized keeping that info would not be enough, as I would still
> need to locate the checkpoint files in order to delete them :/
>
> 2017-10-25 19:07 GMT+02:00 Bernard Jesop :
>
>> As far as I understand, Dataset.rdd is not the same as InternalRDD.
>> It is just another RDD representation of the same Dataset and is created
>> on demand (lazy val) when Dataset.rdd is called.
>> This totally explains the observed behavior.
>>
>> But how would it be possible to know that a Dataset has been
>> checkpointed?
>> Should I manually keep track of that info?
>>
>> 2017-10-25 15:51 GMT+02:00 Bernard Jesop :
>>
>>> Hello everyone,
>>>
>>> I have a question about checkpointing on dataset.
>>>
>>> It seems in 2.1.0 that there is a Dataset.checkpoint(), however unlike
>>> RDD there is no Dataset.isCheckpointed().
>>>
>>> I wonder if Dataset.checkpoint is a syntactic sugar for
>>> Dataset.rdd.checkpoint.
>>> When I do :
>>>
>>> Dataset.checkpoint; Dataset.count
>>> Dataset.rdd.isCheckpointed // result: false
>>>
>>> However, when I explicitly do:
>>> Dataset.rdd.checkpoint; Dataset.rdd.count
>>> Dataset.rdd.isCheckpointed // result: true
>>>
>>> Could someone explain this behavior to me, or provide some references?
>>>
>>> Best regards,
>>> Bernard
>>>
>>
>>
>


Re: Structured Stream in Spark

2017-10-25 Thread KhajaAsmath Mohammed
Thanks Subhash.

Have you ever used the zero-data-loss approach with streaming? I am a bit
worried about using streaming when it comes to data loss.

https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/


Does Structured Streaming handle it internally?

On Wed, Oct 25, 2017 at 3:10 PM, Subhash Sriram 
wrote:

> No problem! Take a look at this:
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
> Thanks,
> Subhash
>
> On Wed, Oct 25, 2017 at 4:08 PM, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Hi Sriram,
>>
>> Thanks. This is what I was looking for.
>>
>> one question, where do we need to specify the checkpoint directory in
>> case of structured streaming?
>>
>> Thanks,
>> Asmath
>>
>> On Wed, Oct 25, 2017 at 2:52 PM, Subhash Sriram > > wrote:
>>
>>> Hi Asmath,
>>>
>>> Here is an example of using structured streaming to read from Kafka:
>>>
>>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala
>>>
>>> In terms of parsing the JSON, there is a from_json function that you can
>>> use. The following might help:
>>>
>>> https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
>>>
>>> I hope this helps.
>>>
>>> Thanks,
>>> Subhash
>>>
>>> On Wed, Oct 25, 2017 at 2:59 PM, KhajaAsmath Mohammed <
>>> mdkhajaasm...@gmail.com> wrote:
>>>
 Hi,

 Could anyone provide suggestions on how to parse json data from kafka
 and load it back in hive.

 I have read about structured streaming but didn't find any examples. is
 there any best practise on how to read it and parse it with structured
 streaming for this use case?

 Thanks,
 Asmath

>>>
>>>
>>
>


Re: spark session jdbc performance

2017-10-25 Thread lucas.g...@gmail.com
Are we sure the UI is showing only one partition running the query?  The
original poster hasn't replied yet.

My assumption is that there's only one executor configured / deployed.  But
we only know what the OP stated, which wasn't enough to be sure of anything.

Why are you suggesting that partitioning on the PK isn't prudent? And did
you mean to say that 30 partitions were far too many for any system to
handle?  (I'm assuming you misread the original code)

Gary

On 25 October 2017 at 13:21, Gourav Sengupta 
wrote:

> Hi Lucas,
>
> so if I am assuming things, can you please explain why the UI is showing
> only one partition to run the query?
>
>
> Regards,
> Gourav Sengupta
>
> On Wed, Oct 25, 2017 at 6:03 PM, lucas.g...@gmail.com <
> lucas.g...@gmail.com> wrote:
>
>> Gourav, I'm assuming you misread the code.  It's 30 partitions, which
>> isn't a ridiculous value.  Maybe you misread the upperBound for the
>> partitions?  (That would be ridiculous)
>>
>> Why not use the PK as the partition column?  Obviously it depends on the
>> downstream queries.  If you're going to be performing joins (which I assume
>> is the case) then partitioning on the join column would be advisable, but
>> what about the case where the join column would be heavily skewed?
>>
>> Thanks!
>>
>> Gary
>>
>> On 24 October 2017 at 23:41, Gourav Sengupta 
>> wrote:
>>
>>> Hi Naveen,
>>>
>>> I do not think that it is prudent to use the PK as the partitionColumn;
>>> that would be too many partitions for any system to handle. numPartitions
>>> is applied very differently in the JDBC case.
>>>
>>> Please keep me updated on how things go.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Tue, Oct 24, 2017 at 10:54 PM, Naveen Madhire 
>>> wrote:
>>>

 Hi,



 I am trying to fetch data from an Oracle DB using a subquery and am
 experiencing a lot of performance issues.



 Below is the query I am using,



 Using Spark 2.0.2



 val df = spark_session.read.format("jdbc")
   .option("driver", "oracle.jdbc.OracleDriver")
   .option("url", jdbc_url)
   .option("user", user)
   .option("password", pwd)
   .option("dbtable", "subquery")
   .option("partitionColumn", "id")  // primary key column, uniformly distributed
   .option("lowerBound", "1")
   .option("upperBound", "50")
   .option("numPartitions", 30)
   .load()



 The above query is configured with 30 partitions, but when I look at the
 UI it is only using 1 partition to run the query.



 Can anyone tell me if I am missing anything, or whether I need to do
 anything else to tune the performance of the query?

  Thanks

>>>
>>>
>>
>


Re: spark session jdbc performance

2017-10-25 Thread Gourav Sengupta
Hi Lucas,

so if I am assuming things, can you please explain why the UI is showing
only one partition to run the query?


Regards,
Gourav Sengupta

On Wed, Oct 25, 2017 at 6:03 PM, lucas.g...@gmail.com 
wrote:

> Gourav, I'm assuming you misread the code.  It's 30 partitions, which
> isn't a ridiculous value.  Maybe you misread the upperBound for the
> partitions?  (That would be ridiculous)
>
> Why not use the PK as the partition column?  Obviously it depends on the
> downstream queries.  If you're going to be performing joins (which I assume
> is the case) then partitioning on the join column would be advisable, but
> what about the case where the join column would be heavily skewed?
>
> Thanks!
>
> Gary
>
> On 24 October 2017 at 23:41, Gourav Sengupta 
> wrote:
>
>> Hi Naveen,
>>
>> I do not think that it is prudent to use the PK as the partitionColumn;
>> that would be too many partitions for any system to handle. numPartitions
>> is applied very differently in the JDBC case.
>>
>> Please keep me updated on how things go.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, Oct 24, 2017 at 10:54 PM, Naveen Madhire 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> I am trying to fetch data from an Oracle DB using a subquery and am
>>> experiencing a lot of performance issues.
>>>
>>>
>>>
>>> Below is the query I am using,
>>>
>>>
>>>
>>> Using Spark 2.0.2
>>>
>>>
>>>
>>> val df = spark_session.read.format("jdbc")
>>>   .option("driver", "oracle.jdbc.OracleDriver")
>>>   .option("url", jdbc_url)
>>>   .option("user", user)
>>>   .option("password", pwd)
>>>   .option("dbtable", "subquery")
>>>   .option("partitionColumn", "id")  // primary key column, uniformly distributed
>>>   .option("lowerBound", "1")
>>>   .option("upperBound", "50")
>>>   .option("numPartitions", 30)
>>>   .load()
>>>
>>>
>>>
>>> The above query is configured with 30 partitions, but when I look at the
>>> UI it is only using 1 partition to run the query.
>>>
>>>
>>>
>>> Can anyone tell me if I am missing anything, or whether I need to do
>>> anything else to tune the performance of the query?
>>>
>>>  Thanks
>>>
>>
>>
>


Re: Structured Stream in Spark

2017-10-25 Thread Subhash Sriram
No problem! Take a look at this:

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing

Thanks,
Subhash
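
A minimal sketch of where that checkpoint directory is supplied in Structured Streaming
(illustrative only; the source, sink, and paths are hypothetical): it is set per query,
as an option on the stream writer.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-location-demo").getOrCreate()

val stream = spark.readStream
  .format("rate")  // built-in source that just generates rows, handy for testing
  .load()

val query = stream.writeStream
  .format("parquet")
  .option("path", "/tmp/output/rate-demo")                     // hypothetical output path
  .option("checkpointLocation", "/tmp/checkpoints/rate-demo")  // hypothetical checkpoint path
  .start()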

On Wed, Oct 25, 2017 at 4:08 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi Sriram,
>
> Thanks. This is what I was looking for.
>
> One question: where do we need to specify the checkpoint directory in the
> case of Structured Streaming?
>
> Thanks,
> Asmath
>
> On Wed, Oct 25, 2017 at 2:52 PM, Subhash Sriram 
> wrote:
>
>> Hi Asmath,
>>
>> Here is an example of using structured streaming to read from Kafka:
>>
>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala
>>
>> In terms of parsing the JSON, there is a from_json function that you can
>> use. The following might help:
>>
>> https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
>>
>> I hope this helps.
>>
>> Thanks,
>> Subhash
>>
>> On Wed, Oct 25, 2017 at 2:59 PM, KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Could anyone provide suggestions on how to parse json data from kafka
>>> and load it back in hive.
>>>
>>> I have read about structured streaming but didn't find any examples. is
>>> there any best practise on how to read it and parse it with structured
>>> streaming for this use case?
>>>
>>> Thanks,
>>> Asmath
>>>
>>
>>
>


Re: Structured Stream in Spark

2017-10-25 Thread KhajaAsmath Mohammed
Hi Sriram,

Thanks. This is what I was looking for.

One question: where do we need to specify the checkpoint directory in the
case of Structured Streaming?

Thanks,
Asmath

On Wed, Oct 25, 2017 at 2:52 PM, Subhash Sriram 
wrote:

> Hi Asmath,
>
> Here is an example of using structured streaming to read from Kafka:
>
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala
>
> In terms of parsing the JSON, there is a from_json function that you can
> use. The following might help:
>
> https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
>
> I hope this helps.
>
> Thanks,
> Subhash
>
> On Wed, Oct 25, 2017 at 2:59 PM, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Hi,
>>
>> Could anyone provide suggestions on how to parse json data from kafka and
>> load it back in hive.
>>
>> I have read about structured streaming but didn't find any examples. is
>> there any best practise on how to read it and parse it with structured
>> streaming for this use case?
>>
>> Thanks,
>> Asmath
>>
>
>


Re: Structured Stream in Spark

2017-10-25 Thread Subhash Sriram
Hi Asmath,

Here is an example of using structured streaming to read from Kafka:

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala

In terms of parsing the JSON, there is a from_json function that you can
use. The following might help:

https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html

I hope this helps.

Thanks,
Subhash
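
A compact sketch that puts those two pieces together (illustrative only; the brokers,
topic, schema, and paths are hypothetical, and the Kafka source needs the
spark-sql-kafka package on the classpath):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val spark = SparkSession.builder().appName("kafka-json-demo").getOrCreate()
import spark.implicits._

// Expected shape of the JSON payload carried in the Kafka "value" column.
val schema = new StructType()
  .add("id", LongType)
  .add("name", StringType)

val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // hypothetical brokers
  .option("subscribe", "events")                      // hypothetical topic
  .load()
  .select(from_json($"value".cast("string"), schema).as("data"))
  .select("data.*")

// Writing Parquet under a path that a Hive external table points at is one simple
// way to get the data back into Hive; both paths here are hypothetical.
val query = parsed.writeStream
  .format("parquet")
  .option("path", "/warehouse/events")
  .option("checkpointLocation", "/checkpoints/events")
  .start()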

On Wed, Oct 25, 2017 at 2:59 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> Could anyone provide suggestions on how to parse JSON data from Kafka and
> load it into Hive?
>
> I have read about Structured Streaming but didn't find any examples. Is
> there any best practice on how to read and parse the data with Structured
> Streaming for this use case?
>
> Thanks,
> Asmath
>


Structured Stream in Spark

2017-10-25 Thread KhajaAsmath Mohammed
Hi,

Could anyone provide suggestions on how to parse JSON data from Kafka and
load it into Hive?

I have read about Structured Streaming but didn't find any examples. Is
there any best practice on how to read and parse the data with Structured
Streaming for this use case?

Thanks,
Asmath


Re: Dataset API Question

2017-10-25 Thread Bernard Jesop
Actually, I realized keeping that info would not be enough, as I would still
need to locate the checkpoint files in order to delete them :/

2017-10-25 19:07 GMT+02:00 Bernard Jesop :

> As far as I understand, Dataset.rdd is not the same as InternalRDD.
> It is just another RDD representation of the same Dataset and is created
> on demand (lazy val) when Dataset.rdd is called.
> This totally explains the observed behavior.
>
> But how would it be possible to know that a Dataset has been
> checkpointed?
> Should I manually keep track of that info?
>
> 2017-10-25 15:51 GMT+02:00 Bernard Jesop :
>
>> Hello everyone,
>>
>> I have a question about checkpointing on dataset.
>>
>> It seems in 2.1.0 that there is a Dataset.checkpoint(), however unlike
>> RDD there is no Dataset.isCheckpointed().
>>
>> I wonder if Dataset.checkpoint is a syntactic sugar for
>> Dataset.rdd.checkpoint.
>> When I do :
>>
>> Dataset.checkpoint; Dataset.count
>> Dataset.rdd.isCheckpointed // result: false
>>
>> However, when I explicitly do:
>> Dataset.rdd.checkpoint; Dataset.rdd.count
>> Dataset.rdd.isCheckpointed // result: true
>>
>> Could someone explain this behavior to me, or provide some references?
>>
>> Best regards,
>> Bernard
>>
>
>


Re: Dataset API Question

2017-10-25 Thread Bernard Jesop
As far as I understand, Dataset.rdd is not the same as InternalRDD.
It is just another RDD representation of the same Dataset and is created on
demand (lazy val) when Dataset.rdd is called.
This totally explains the observed behavior.

But how would it be possible to know that a Dataset has been
checkpointed?
Should I manually keep track of that info?

2017-10-25 15:51 GMT+02:00 Bernard Jesop :

> Hello everyone,
>
> I have a question about checkpointing on dataset.
>
> It seems in 2.1.0 that there is a Dataset.checkpoint(), however unlike RDD
> there is no Dataset.isCheckpointed().
>
> I wonder if Dataset.checkpoint is a syntactic sugar for
> Dataset.rdd.checkpoint.
> When I do :
>
> Dataset.checkpoint; Dataset.count
> Dataset.rdd.isCheckpointed // result: false
>
> However, when I explicitly do:
> Dataset.rdd.checkpoint; Dataset.rdd.count
> Dataset.rdd.isCheckpointed // result: true
>
> Could someone explain this behavior to me, or provide some references?
>
> Best regards,
> Bernard
>


Re: Dataset API Question

2017-10-25 Thread Reynold Xin
It is a bit more than syntactic sugar, but not much more:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L533

BTW, this is basically writing all the data out and then creating a new
Dataset to load it back in.
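
Two practical notes that follow from that implementation, as a short sketch
(illustrative only; `df` and the path are hypothetical):

// The data is written under the SparkContext checkpoint directory, so set it first.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

val eager    = df.checkpoint()               // runs a job now; returns a new Dataset backed by the saved files
val deferred = df.checkpoint(eager = false)  // marks the plan; materialized on the first action on `deferred`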


On Wed, Oct 25, 2017 at 6:51 AM, Bernard Jesop 
wrote:

> Hello everyone,
>
> I have a question about checkpointing on dataset.
>
> It seems in 2.1.0 that there is a Dataset.checkpoint(), however unlike RDD
> there is no Dataset.isCheckpointed().
>
> I wonder if Dataset.checkpoint is a syntactic sugar for
> Dataset.rdd.checkpoint.
> When I do :
>
> Dataset.checkpoint; Dataset.count
> Dataset.rdd.isCheckpointed // result: false
>
> However, when I explicitly do:
> Dataset.rdd.checkpoint; Dataset.rdd.count
> Dataset.rdd.isCheckpointed // result: true
>
> Could someone explain this behavior to me, or provide some references?
>
> Best regards,
> Bernard
>


Re: spark session jdbc performance

2017-10-25 Thread lucas.g...@gmail.com
Gourav, I'm assuming you misread the code.  It's 30 partitions, which isn't
a ridiculous value.  Maybe you misread the upperBound for the partitions?
(That would be ridiculous)

Why not use the PK as the partition column?  Obviously it depends on the
downstream queries.  If you're going to be performing joins (which I assume
is the case) then partitioning on the join column would be advisable, but
what about the case where the join column would be heavily skewed?

Thanks!

Gary

On 24 October 2017 at 23:41, Gourav Sengupta 
wrote:

> Hi Naveen,
>
> I do not think that it is prudent to use the PK as the partitionColumn;
> that would be too many partitions for any system to handle. numPartitions
> is applied very differently in the JDBC case.
>
> Please keep me updated on how things go.
>
>
> Regards,
> Gourav Sengupta
>
> On Tue, Oct 24, 2017 at 10:54 PM, Naveen Madhire 
> wrote:
>
>>
>> Hi,
>>
>>
>>
>> I am trying to fetch data from an Oracle DB using a subquery and am
>> experiencing a lot of performance issues.
>>
>>
>>
>> Below is the query I am using,
>>
>>
>>
>> Using Spark 2.0.2
>>
>>
>>
>> val df = spark_session.read.format("jdbc")
>>   .option("driver", "oracle.jdbc.OracleDriver")
>>   .option("url", jdbc_url)
>>   .option("user", user)
>>   .option("password", pwd)
>>   .option("dbtable", "subquery")
>>   .option("partitionColumn", "id")  // primary key column, uniformly distributed
>>   .option("lowerBound", "1")
>>   .option("upperBound", "50")
>>   .option("numPartitions", 30)
>>   .load()
>>
>>
>>
>> The above query is configured with 30 partitions, but when I look at the UI
>> it is only using 1 partition to run the query.
>>
>>
>>
>> Can anyone tell me if I am missing anything, or whether I need to do anything
>> else to tune the performance of the query?
>>
>>  Thanks
>>
>
>


Structured Stream equivalent of reduceByKey

2017-10-25 Thread Piyush Mukati
Hi,
we are migrating some jobs from Dstream to Structured Stream.

Currently, to handle aggregations, we call map and reduceByKey on each RDD,
like:
rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))

The final output of each RDD is merged to the sink with support for
aggregation at the sink( Like co-processor at HBase ).

In the new Dataset API, I am not finding any suitable API to aggregate over
the micro-batch.
Most of the aggregation APIs use the state store and provide global
aggregations (with append mode they do not give the change in existing
buckets).
Problems we are suspecting are:
 1) The state store is tightly linked to the job definition, while in our case
we may want to edit the job while keeping the previously calculated aggregates
as they are.

The desired result can be achieved with the Dataset API below:
dataset.groupByKey(a => a._1).mapGroups((key, valueItr) => merge(valueItr))
but on inspecting the physical plan, it does not apply any merge before the sort.

 Is anyone aware of an API or other workaround to get the desired result?
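
For reference, a batch-mode sketch of the closest Dataset counterpart of
rdd.map(e => (e._1, e)).reduceByKey(merge) is groupByKey followed by reduceGroups
(illustrative only; Event and merge are hypothetical, and on a streaming Dataset this
becomes a stateful global aggregation rather than a per-micro-batch one, which is exactly
the limitation described above):

import org.apache.spark.sql.SparkSession

case class Event(key: String, count: Long)

// Hypothetical merge function for two events with the same key.
def merge(a: Event, b: Event): Event = Event(a.key, a.count + b.count)

val spark = SparkSession.builder().appName("reduce-groups-demo").getOrCreate()
import spark.implicits._

val events = Seq(Event("a", 1), Event("a", 2), Event("b", 5)).toDS()

val reduced = events
  .groupByKey(_.key)                    // KeyValueGroupedDataset[String, Event]
  .reduceGroups((a, b) => merge(a, b))  // Dataset[(String, Event)]

reduced.show()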


Dataset API Question

2017-10-25 Thread Bernard Jesop
Hello everyone,

I have a question about checkpointing on dataset.

It seems in 2.1.0 that there is a Dataset.checkpoint(), however unlike RDD
there is no Dataset.isCheckpointed().

I wonder if Dataset.checkpoint is syntactic sugar for
Dataset.rdd.checkpoint.
When I do:

Dataset.checkpoint; Dataset.count
Dataset.rdd.isCheckpointed // result: false

However, when I explicitly do:
Dataset.rdd.checkpoint; Dataset.rdd.count
Dataset.rdd.isCheckpointed // result: true

Could someone explain this behavior to me, or provide some references?

Best regards,
Bernard


[ANNOUNCE] Apache Spark 2.1.2

2017-10-25 Thread Holden Karau
We are happy to announce the availability of Spark 2.1.2!

Apache Spark 2.1.2 is a maintenance release, based on the branch-2.1
maintenance
branch of Spark. We strongly recommend all 2.1.x users to upgrade to this
stable release.

To download Apache Spark 2.1.2 visit http://spark.apache.org/downloads.html.
This version of Spark is also available on Maven, CRAN archive* & PyPI.

We would like to acknowledge all community members for contributing patches
to this release.

* SparkR can be manually downloaded from CRAN archive, and there are a few
more minor packaging issues to be fixed to have SparkR fully available in
CRAN, see SPARK-22344 for details.

-- 
Twitter: https://twitter.com/holdenkarau


Re: spark session jdbc performance

2017-10-25 Thread Gourav Sengupta
Hi Naveen,

I do not think that it is prudent to use the PK as the partitionColumn;
that would be too many partitions for any system to handle. numPartitions
is applied very differently in the JDBC case.

Please keep me updated on how things go.


Regards,
Gourav Sengupta

On Tue, Oct 24, 2017 at 10:54 PM, Naveen Madhire 
wrote:

>
> Hi,
>
>
>
> I am trying to fetch data from an Oracle DB using a subquery and am
> experiencing a lot of performance issues.
>
>
>
> Below is the query I am using,
>
>
>
> Using Spark 2.0.2
>
>
>
> val df = spark_session.read.format("jdbc")
>   .option("driver", "oracle.jdbc.OracleDriver")
>   .option("url", jdbc_url)
>   .option("user", user)
>   .option("password", pwd)
>   .option("dbtable", "subquery")
>   .option("partitionColumn", "id")  // primary key column, uniformly distributed
>   .option("lowerBound", "1")
>   .option("upperBound", "50")
>   .option("numPartitions", 30)
>   .load()
>
>
>
> The above query is configured with 30 partitions, but when I look at the UI
> it is only using 1 partition to run the query.
>
>
>
> Can anyone tell me if I am missing anything, or whether I need to do anything
> else to tune the performance of the query?
>
>  Thanks
>


Using Spark 2.2.0 SparkSession extensions to optimize file filtering

2017-10-25 Thread Chris Luby
I have an external catalog that has additional information on my Parquet files 
that I want to match up with the parsed filters from the plan to prune the list 
of files included in the scan.  I’m looking at doing this using the Spark 2.2.0 
SparkSession extensions similar to the built in partition pruning:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala

and this other project that is along the lines of what I want:

https://github.com/lightcopy/parquet-index/blob/master/src/main/scala/org/apache/spark/sql/execution/datasources/IndexSourceStrategy.scala

but isn’t caught up to 2.2.0, but I’m struggling to understand what type of 
extension I would use to do something like the above:

https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.SparkSessionExtensions

and if this is the appropriate strategy for this.

Are there any examples out there for using the new extension hooks to alter the 
files included in the plan?

Thanks.
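
A rough sketch of how an extension of that kind gets wired in with Spark 2.2.0
(illustrative only; the rule itself is a hypothetical pass-through, and the real pruning
logic against the external catalog would go inside it):

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical optimizer rule; a real one would rewrite file-source scans using the
// filters in the plan matched against the external catalog.
case class ExternalCatalogPruneFiles(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan  // placeholder: no-op
}

val spark = SparkSession.builder()
  .appName("extensions-demo")
  .withExtensions { extensions: SparkSessionExtensions =>
    extensions.injectOptimizerRule(session => ExternalCatalogPruneFiles(session))
  }
  .getOrCreate()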


Re: Is Spark suited for this use case?

2017-10-25 Thread Gourav Sengupta
Hi Saravanan,

Spark may be free, but making it run with the same level of performance,
consistency, and reliability will show you that Spark, Hadoop, or anything
else is essentially not free. With Informatica you pay for the licensing
and have almost no headaches as far as stability, upgrades, and reliability
are concerned.

If you want to deliver the same with SPARK, then the costs will start
escalating as you will have to go with SPARK vendors.

As with everything else, my best analogy is: don't use a fork for drinking
soup. Spark works wonderfully at huge data scales, but Spark cannot read
Oracle binary log files or provide change data capture capability. For
your use case, with such low volumes and with solutions like Informatica and
Golden Gate in place, I think that you are already using optimal solutions.

Of course, I am presuming that you DO NOT replicate your entire 6 TB MDM
platform every day, and instead just use CDC to transfer data to your data
center for reporting purposes.

In case you are interested in super fast reporting using AWS Redshift then
please do let me know, I have delivered several end-to-end hybrid data
warehouse solutions and will be happy to help you with the same.


Regards,
Gourav Sengupta

On Mon, Oct 16, 2017 at 5:32 AM, van den Heever, Christian CC <
christian.vandenhee...@standardbank.co.za> wrote:

> Hi,
>
>
>
> We basically have the same scenario, but worldwide; as we have bigger
> datasets we use OGG -> local -> Sqoop into Hadoop.
>
> By all means you can have Spark read the Oracle tables and then make whatever
> changes to the data you need that would not be done in the Sqoop query, e.g.
> fraud detection on transaction records.
>
>
>
> But sometimes the simplest way is the best. Unless you need a change or
> need more, I would advise not using another hop.
>
> I would rather move away from files, as OGG can do files and direct table
> loading, and then Sqoop can handle the rest.
>
>
>
> Simpler is better.
>
>
>
> Hope this helps.
>
> C.
>
>
>
> *From:* Saravanan Thirumalai [mailto:saravanan.thiruma...@gmail.com]
> *Sent:* Monday, 16 October 2017 4:29 AM
> *To:* user@spark.apache.org
> *Subject:* Is Spark suited for this use case?
>
>
>
> We are an investment firm and have an MDM platform in Oracle at a vendor
> location, and we use Oracle Golden Gate to replicate data to our data center
> for reporting needs.
>
> Our data is not big data (total size 6 TB including 2 TB of archive data).
> Moreover our data doesn't get updated often, nightly once (around 50 MB)
> and some correction transactions during the day (<10 MB). We don't have
> external users and hence data doesn't grow real-time like e-commerce.
>
>
>
> When we replicate data from source to target, we transfer data through
> files. So, if there are DML operations (corrections) during day time on a
> source table, the corresponding file would have probably 100 lines of table
> data that needs to be loaded into the target database. Due to low volume of
> data we designed this through Informatica and this works in less than 2-5
> minutes. Can Spark be used in this case or would it be an overkill of
> technology use?
>
>
>
>
>
>
>
>
> Standard Bank email disclaimer and confidentiality note
> Please go to www.standardbank.co.za/site/homepage/emaildisclaimer.html to 
> read our email disclaimer and confidentiality note. Kindly email 
> disclai...@standardbank.co.za (no content or subject line necessary) if you 
> cannot view that page and we will email our email disclaimer and 
> confidentiality note to you.
>
>
>
>


Re: spark session jdbc performance

2017-10-25 Thread Srinivasa Reddy Tatiredidgari
Hi, is the subquery a user-defined SQL statement or a table name in the DB? If
it is a user-defined SQL statement, make sure your partition column is in the
main select clause.

Sent from Yahoo Mail on Android 
 
  On Wed, Oct 25, 2017 at 3:25, Naveen Madhire wrote:   

Hi,

 

I am trying to fetch data from an Oracle DB using a subquery and am experiencing
a lot of performance issues.

 

Below is the query I am using,

 

Using Spark 2.0.2

 

val df = spark_session.read.format("jdbc")
  .option("driver", "oracle.jdbc.OracleDriver")
  .option("url", jdbc_url)
  .option("user", user)
  .option("password", pwd)
  .option("dbtable", "subquery")
  .option("partitionColumn", "id")  // primary key column, uniformly distributed
  .option("lowerBound", "1")
  .option("upperBound", "50")
  .option("numPartitions", 30)
  .load()

 

The above query is configured with 30 partitions, but when I look at the UI it is
only using 1 partition to run the query.

 

Can anyone tell me if I am missing anything, or whether I need to do anything
else to tune the performance of the query?

 Thanks
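
A sketch of the subquery form Srinivasa refers to above (illustrative only; the table,
columns, and bounds are hypothetical): the query is wrapped in parentheses, given an
alias, and the partition column must appear in its select list.

val df = spark_session.read.format("jdbc")
  .option("driver", "oracle.jdbc.OracleDriver")
  .option("url", jdbc_url)
  .option("user", user)
  .option("password", pwd)
  .option("dbtable", "(SELECT id, col_a, col_b FROM my_schema.my_table WHERE active = 1) q")
  .option("partitionColumn", "id")  // must be selected by the subquery
  .option("lowerBound", "1")        // ideally the real MIN(id)
  .option("upperBound", "5000000")  // ideally the real MAX(id)
  .option("numPartitions", 30)
  .load()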