Structured Streaming - HDFS State Store Performance Issues

2020-01-14 Thread William Briggs
Hi all, I've got a problem that really has me stumped. I'm running a
Structured Streaming query that reads from Kafka, performs some
transformations and stateful aggregations (using flatMapGroupsWithState),
and outputs any updated aggregates to another Kafka topic.
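
For reference, a minimal sketch of a query with the same shape as described above (not the original job; brokers, topics, the checkpoint path, and the counting state function are all illustrative):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

val spark = SparkSession.builder.appName("stateful-agg").getOrCreate()
import spark.implicits._

// Read key/value pairs from Kafka as strings.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Illustrative state function: keep a running count per key, emit it on every update.
def updateCounts(key: String, values: Iterator[(String, String)],
                 state: GroupState[Long]): Iterator[(String, String)] = {
  val count = state.getOption.getOrElse(0L) + values.size
  state.update(count)
  Iterator((key, count.toString))
}

events
  .groupByKey(_._1)
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout())(updateCounts)
  .toDF("key", "value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "hdfs:///checkpoints/stateful-agg")
  .outputMode("update")
  .start()
```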

I'm running this job using Spark 2.4.4 on Amazon EMR 5.28.1.
Semi-regularly, all the tasks except one will complete, and the one
remaining task will take 1-2 minutes instead of 1-2 seconds to complete.
I've checked the number of input records (and overall size) for that task,
and everything seems in line with all the other tasks - there's no visible
skew.

The only thing I have to go on at the moment is that the thread dump on the
executor that is hung shows a 'state-store-maintenance-task' thread, which
is blocked on an "Executor task launch worker" thread - that second thread
shows as TIMED_WAITING, with the following locks:

Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1569026152}),
> Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171}),
> Monitor(org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@235686316}),
> Monitor(org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider@1633346777
> })
>

And a stack of:

java.lang.Object.wait(Native Method)
>
> org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:877)
>
> org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:736)
> org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:846)
> => holding Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171})
> org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805) =>
> holding Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171})
>
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:145)
> => holding
> Monitor(org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@235686316
> })
> net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:193)
> java.io.FilterOutputStream.close(FilterOutputStream.java:159)
>
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:417)
>
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:287)
> => holding
> Monitor(org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider@1633346777
> })
>
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:132)
>
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)
>

Based on this, I'm guessing that there's some kind of delay happening with
the HDFSStateStore, but my NameNode and DataNode metrics all look good (no
large GCs, plenty of free memory, network bandwidth isn't saturated, no
under-replicated blocks).

Has anyone run into a problem like this before? Any help would be greatly
appreciated!

Regards,
Will


Exactly-Once delivery with Structured Streaming and Kafka

2019-01-31 Thread William Briggs
I noticed that Spark 2.4.0 implemented support for reading only committed
messages in Kafka, and was excited. Are there currently any plans to update
the Kafka output sink to support exactly-once delivery?

Thanks,
Will
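
For reference, a minimal sketch of the read side mentioned above (broker and topic names are illustrative; the option is the standard Kafka consumer setting passed through with the kafka. prefix, and it only affects reading - it does not make the Kafka sink transactional):

```
val committedOnly = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "input-topic")
  // Passed through to the underlying Kafka consumer; skips aborted/uncommitted records.
  .option("kafka.isolation.level", "read_committed")
  .load()
```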


Change in configuration settings?

2018-06-08 Thread William Briggs
I recently upgraded a Structured Streaming application from Spark 2.2.1 ->
Spark 2.3.0. This application runs in yarn-cluster mode, and it made use of
the spark.yarn.{driver|executor}.memoryOverhead properties. I noticed the
job started crashing unexpectedly, and after doing a bunch of digging, it
seems that these properties were migrated to simply be
"spark.driver.memoryOverhead" and "spark.executor.memoryOverhead" - I see
that they existed in the 2.2.1 configuration documentation, but not the
2.3.0 docs.

However, I can't find anything in the release notes between versions that
references this change - should the old spark.yarn.* settings still work,
or were they completely removed in favor of the new settings?

Regards,
Will
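
For reference, a minimal sketch of the renamed settings (values are illustrative):

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.memoryOverhead", "1024")     // formerly spark.yarn.driver.memoryOverhead
  .set("spark.executor.memoryOverhead", "2048")   // formerly spark.yarn.executor.memoryOverhead
```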


Structured Streaming + Kafka - Corrupted Checkpoint Offsets / Commits

2018-01-04 Thread William Briggs
I am running a Structured Streaming job (Spark 2.2.0) using EMR 5.9. The
job sources data from a Kafka topic, performs a variety of filters and
transformations, and sinks data back into a different Kafka topic.

Once per day, we stop the query in order to merge the namenode edit logs
with the fsimage, because Structured Streaming creates and destroys a
significant number of HDFS files, and EMR doesn't support a secondary or HA
namenode for fsimage compaction (AWS support directed us to do this, as
Namenode edit logs were filling the disk).

Occasionally, the Structured Streaming query will not restart because the
most recent file in the "commits" or "offsets" checkpoint subdirectory is
empty. This seems like an undesirable behavior, as it requires manual
intervention to remove the empty files in order to force the job to fall
back onto the last good values. Has anyone run into this behavior? The only
similar issue I can find is SPARK-21760, which appears to have no fix or
workaround.

Any assistance would be greatly appreciated!

Regards,
Will
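
For reference, a minimal sketch of the manual check described above (the checkpoint root path is illustrative, and a SparkSession named spark is assumed):

```
import org.apache.hadoop.fs.{FileSystem, Path}

val checkpointRoot = "hdfs:///checkpoints/my-query"   // illustrative
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// List zero-length files under offsets/ and commits/ before restarting the query.
Seq("offsets", "commits").foreach { dir =>
  fs.listStatus(new Path(s"$checkpointRoot/$dir"))
    .filter(_.getLen == 0)
    .foreach(status => println(s"Empty checkpoint file: ${status.getPath}"))
}
```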


Re: Fw: Spark + Parquet + IBM Block Storage at Bluemix

2016-09-25 Thread Mario Ds Briggs

Hi Daniel,

can you give it a try in IBM's Analytics for Spark? The fix has been in
for a week now.


thanks
Mario



From:   Daniel Lopes <dan...@onematch.com.br>
To: Mario Ds Briggs/India/IBM@IBMIN
Cc: Adam Roberts <arobe...@uk.ibm.com>, user
<user@spark.apache.org>, Steve Loughran
<ste...@hortonworks.com>, Sachin Aggarwal4/India/IBM@IBMIN
Date:   14/09/2016 01:19 am
Subject:Re: Fw: Spark + Parquet + IBM Block Storage at Bluemix



Hi Mario,

Thanks for your help, so I will keep using CSVs.

Best,

Daniel Lopes
Chief Data and Analytics Officer | OneMatch
c: +55 (18) 99764-2733 | https://www.linkedin.com/in/dslopes

www.onematch.com.br

On Mon, Sep 12, 2016 at 3:39 PM, Mario Ds Briggs <mario.bri...@in.ibm.com>
wrote:
  Daniel,

  I believe it is related to
  https://issues.apache.org/jira/browse/SPARK-13979 and happens only when a
  task fails in an executor (probably for some other reason you hit the latter
  in Parquet and not CSV).

  The PR in there should be available shortly in IBM's Analytics for
  Spark.


  thanks
  Mario


  From: Adam Roberts/UK/IBM
  To: Mario Ds Briggs/India/IBM@IBMIN
  Date: 12/09/2016 09:37 pm
  Subject: Fw: Spark + Parquet + IBM Block Storage at Bluemix


  Mario, in case you've not seen this...

  Adam Roberts
  IBM Spark Team Lead
  Runtime Technologies - Hursley


  - Forwarded by Adam Roberts/UK/IBM on 12/09/2016 17:06 -

  From: Daniel Lopes <dan...@onematch.com.br>
  To: Steve Loughran <ste...@hortonworks.com>
  Cc: user <user@spark.apache.org>
  Date: 12/09/2016 13:05
  Subject: Re: Spark + Parquet + IBM Block Storage at Bluemix




  Thanks Steve,

  But this error occurs only with Parquet files; CSVs work.

  Best,

  Daniel Lopes
  Chief Data and Analytics Officer | OneMatch
  c: +55 (18) 99764-2733 | https://www.linkedin.com/in/dslopes

  www.onematch.com.br

  On Sun, Sep 11, 2016 at 3:28 PM, Steve Loughran <ste...@hortonworks.com>
  wrote:
On 9 Sep 2016, at 17:56, Daniel Lopes <
dan...@onematch.com.br> wrote:

Hi, can someone help?

I'm trying to use Parquet with IBM Block Storage in Spark,
but when I try to load I get this error:

using this config

credentials = {
  "name": "keystone",
  "auth_url": "https://identity.open.softlayer.com;,
  "project":
"object_storage_23f274c1_d11XXXe634",
  "projectId": "XXd9c4aa39b7c7eb",
  "region": "dallas",
  "userId": "X64087180b40X2b909",
  "username": "admin_9dd810f8901d48778XX",
  "password": "chX6_",
  "domainId": "c1ddad17cfcX41",
  "domainName": "10XX",
  "role": "admin"
}

def set_hadoop_config(cred

Re: Fw: Spark + Parquet + IBM Block Storage at Bluemix

2016-09-12 Thread Mario Ds Briggs


Daniel,

I believe it is related to
https://issues.apache.org/jira/browse/SPARK-13979 and happens only when a
task fails in an executor (probably for some other reason you hit the latter
in Parquet and not CSV).

The PR in there should be available shortly in IBM's Analytics for Spark.


thanks
Mario



From:   Adam Roberts/UK/IBM
To: Mario Ds Briggs/India/IBM@IBMIN
Date:   12/09/2016 09:37 pm
Subject:Fw: Spark + Parquet + IBM Block Storage at Bluemix


Mario, in case you've not seen this...

Adam Roberts
IBM Spark Team Lead
Runtime Technologies - Hursley




- Forwarded by Adam Roberts/UK/IBM on 12/09/2016 17:06 -

From:   Daniel Lopes <dan...@onematch.com.br>
To: Steve Loughran <ste...@hortonworks.com>
Cc: user <user@spark.apache.org>
Date:   12/09/2016 13:05
Subject:Re: Spark + Parquet + IBM Block Storage at Bluemix



Thanks Steve,

But this error occurs only with Parquet files; CSVs work.

Best,

Daniel Lopes
Chief Data and Analytics Officer | OneMatch
c: +55 (18) 99764-2733 | https://www.linkedin.com/in/dslopes

www.onematch.com.br

On Sun, Sep 11, 2016 at 3:28 PM, Steve Loughran <ste...@hortonworks.com>
wrote:

On 9 Sep 2016, at 17:56, Daniel Lopes <dan...@onematch.com.br>
wrote:

Hi, can someone help?

I'm trying to use Parquet with IBM Block Storage in Spark, but when I
try to load I get this error:

using this config

credentials = {
  "name": "keystone",
  "auth_url": "https://identity.open.softlayer.com;,
  "project": "object_storage_23f274c1_d11XXXe634",
  "projectId": "XXd9c4aa39b7c7eb",
  "region": "dallas",
  "userId": "X64087180b40X2b909",
  "username": "admin_9dd810f8901d48778XX",
  "password": "chX6_",
  "domainId": "c1ddad17cfcX41",
  "domainName": "10XX",
  "role": "admin"
}

def set_hadoop_config(credentials):
    """This function sets the Hadoop configuration with given
credentials,
    so it is possible to access data using SparkContext"""

    prefix = "fs.swift.service." + credentials['name']
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + ".auth.url", credentials
['auth_url']+'/v3/auth/tokens')
    hconf.set(prefix + ".auth.endpoint.prefix", "endpoints")
    hconf.set(prefix + ".tenant", credentials['projectId'])
    hconf.set(prefix + ".username", credentials['userId'])
    hconf.set(prefix + ".password", credentials['password'])
    hconf.setInt(prefix + ".http.port", 8080)
    hconf.set(prefix + ".region", credentials['region'])
    hconf.setBoolean(prefix + ".public", True)

set_hadoop_config(credentials)

-

Py4JJavaErrorTraceback (most recent call last)
 in ()
> 1 train.groupby('Acordo').count().show()

Py4JJavaError: An error occurred wh

Re: Spark support for Complex Event Processing (CEP)

2016-04-21 Thread Mario Ds Briggs

Googling "java error 'is not a member of package'" and even its related
searches seemed to suggest it is not a missing-jar problem, though I
couldn't put a finger on what exactly it is in your case.

Some hits cover spark-shell specifically as well -
http://spark-packages.org/package/databricks/spark-csv


thanks
Mario



From:   Mich Talebzadeh <mich.talebza...@gmail.com>
To: Mario Ds Briggs/India/IBM@IBMIN
Cc: Alonso Isidoro Roman <alons...@gmail.com>, Luciano Resende
<luckbr1...@gmail.com>, "user @spark" <user@spark.apache.org>
Date:   21/04/2016 08:34 pm
Subject:Re: Spark support for Complex Event Processing (CEP)



Hi,

Following example in

https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532

Does anyone know which jar file this belongs to?

I use scalatest_2.11-2.2.6.jar in my spark-shell

 spark-shell --master spark://50.140.197.217:7077 --jars ,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar, /home/hduser/jars/scalatest_2.11-2.2.6.jar'

scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
:28: error: object scalatest is not a member of package org
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

Thanks


Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com




On 20 April 2016 at 10:28, Mario Ds Briggs <mario.bri...@in.ibm.com> wrote:
  I did see your earlier post about Stratio Decision. Will read up on it.


  thanks
  Mario


  From: Alonso Isidoro Roman <alons...@gmail.com>
  To: Mich Talebzadeh <mich.talebza...@gmail.com>
  Cc: Mario Ds Briggs/India/IBM@IBMIN, Luciano Resende <
  luckbr1...@gmail.com>, "user @spark" <user@spark.apache.org>
  Date: 20/04/2016 02:24 pm
  Subject: Re: Spark support for Complex Event Processing (CEP)



  Stratio decision could do the job

  https://github.com/Stratio/Decision



  Alonso Isidoro Roman.

  Mis citas preferidas (de hoy) :
  "Si depurar es el proceso de quitar los errores de software, entonces
  programar debe ser el proceso de introducirlos..."
   -  Edsger Dijkstra

  My favorite quotes (today):
  "If debugging is the process of removing software bugs, then programming
  must be the process of putting ..."
    - Edsger Dijkstra

  "If you pay peanuts you get monkeys"


  2016-04-20 7:55 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
Thanks a lot Mario. Will have a look.

Regards,


Dr Mich Talebzadeh

LinkedIn

https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw


http://talebzadehmich.wordpress.com




On 20 April 2016 at 06:53, Mario Ds Briggs <mario.bri...@in.ibm.com
> wrote:
Hi Mich,

Info is here - https://issues.apache.org/jira/browse/SPARK-14745

overview is in the pdf -

https://issues.apache.org/jira/secure/attachment/12799670/SparkStreamingCEP.pdf


Usage examples not in the best place for now (will make it better)
-

https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532


Your feedback is appreciated.


thanks
Mario


From: Mich Talebzadeh <mich.talebza...@gmail.com>
To: Mario Ds Briggs/India/IBM@IBMIN
Cc: "user @spark" <user@spark.apache.org>, Luciano Resende <
luckbr1...@gmail.com>
Date: 19/04/2016 12:45 am
Subject: Re: Spark support for Complex Event Processing (CEP)




great stuff Mario. Much appreciated.

Mich

Dr Mich Talebzadeh

LinkedIn

https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw


http://talebzadehmich.wordpress.com




On 18 April 2016 at 20:08, Mario Ds Briggs <mario.bri...@in.ibm.com
> wrote:
Hey Mich, Luciano

Will provide links with docs by tomorrow

thanks
Mario

- Message from Mich Talebzadeh <
mich.talebza...@gmail.com> on Sun, 17 Ap

Re: Spark support for Complex Event Processing (CEP)

2016-04-20 Thread Mario Ds Briggs

I did see your earlier post about Stratio Decision. Will read up on it.


thanks
Mario



From:   Alonso Isidoro Roman <alons...@gmail.com>
To: Mich Talebzadeh <mich.talebza...@gmail.com>
Cc:     Mario Ds Briggs/India/IBM@IBMIN, Luciano Resende
<luckbr1...@gmail.com>, "user @spark" <user@spark.apache.org>
Date:   20/04/2016 02:24 pm
Subject:Re: Spark support for Complex Event Processing (CEP)



Stratio decision could do the job

https://github.com/Stratio/Decision



Alonso Isidoro Roman.

Mis citas preferidas (de hoy) :
"Si depurar es el proceso de quitar los errores de software, entonces
programar debe ser el proceso de introducirlos..."
 -  Edsger Dijkstra

My favorite quotes (today):
"If debugging is the process of removing software bugs, then programming
must be the process of putting ..."
  - Edsger Dijkstra

"If you pay peanuts you get monkeys"


2016-04-20 7:55 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
  Thanks a lot Mario. Will have a look.

  Regards,


  Dr Mich Talebzadeh

  LinkedIn
  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

  http://talebzadehmich.wordpress.com




  On 20 April 2016 at 06:53, Mario Ds Briggs <mario.bri...@in.ibm.com>
  wrote:
   Hi Mich,

   Info is here - https://issues.apache.org/jira/browse/SPARK-14745

   overview is in the pdf -
   
https://issues.apache.org/jira/secure/attachment/12799670/SparkStreamingCEP.pdf


   Usage examples not in the best place for now (will make it better) -
   
https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532


   Your feedback is appreciated.


   thanks
   Mario


   From: Mich Talebzadeh <mich.talebza...@gmail.com>
   To: Mario Ds Briggs/India/IBM@IBMIN
   Cc: "user @spark" <user@spark.apache.org>, Luciano Resende <
   luckbr1...@gmail.com>
   Date: 19/04/2016 12:45 am
   Subject: Re: Spark support for Complex Event Processing (CEP)




   great stuff Mario. Much appreciated.

   Mich

   Dr Mich Talebzadeh

   LinkedIn
   
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw


   http://talebzadehmich.wordpress.com




   On 18 April 2016 at 20:08, Mario Ds Briggs <mario.bri...@in.ibm.com>
   wrote:


 Hey Mich, Luciano

 Will provide links with docs by tomorrow

 thanks
 Mario

 - Message from Mich Talebzadeh <mich.talebza...@gmail.com> on Sun, 17 Apr 2016 19:17:38 +0100 -
 To: Luciano Resende <luckbr1...@gmail.com>
 cc: "user @spark" <user@spark.apache.org>
 Subject: Re: Spark support for Complex Event Processing (CEP)

 Thanks Luciano. Appreciated.

 Regards

 Dr Mich Talebzadeh

 LinkedIn
 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw


 http://talebzadehmich.wordpress.com




 On 17 April 2016 at 17:32, Luciano Resende <luckbr1...@gmail.com>
 wrote:


 Hi Mitch,

 I know some folks that were investigating/prototyping
 on this area, let me see if I can get them to reply
 here with more details.

 On Sun, Apr 17, 2016 at 1:54 AM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:
 Hi,

 Has Spark got libraries for CEP using Spark Streaming
 with Kafka by any chance?

 I am looking at Flink that supposed to have these
 libraries for CEP but I find Flink itself very much
 work in progress.

 Thanks

 Dr Mich Talebzadeh

 LinkedIn
 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw


 http://talebzadehmich.wordpress.com






 --
 Luciano Resende
 http://twitter.com/lresende1975
 http://lresende.blogspot.com/

















Re: Spark support for Complex Event Processing (CEP)

2016-04-19 Thread Mario Ds Briggs

Hi Mich,

Info is here - https://issues.apache.org/jira/browse/SPARK-14745

overview is in the pdf -
https://issues.apache.org/jira/secure/attachment/12799670/SparkStreamingCEP.pdf

Usage examples not in the best place for now (will make it better) -
https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532

Your feedback is appreciated.


thanks
Mario



From:   Mich Talebzadeh <mich.talebza...@gmail.com>
To: Mario Ds Briggs/India/IBM@IBMIN
Cc: "user @spark" <user@spark.apache.org>, Luciano Resende
<luckbr1...@gmail.com>
Date:   19/04/2016 12:45 am
Subject:Re: Spark support for Complex Event Processing (CEP)



great stuff Mario. Much appreciated.

Mich

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com




On 18 April 2016 at 20:08, Mario Ds Briggs <mario.bri...@in.ibm.com> wrote:
  Hey Mich, Luciano

  Will provide links with docs by tomorrow

  thanks
  Mario

  - Message from Mich Talebzadeh <mich.talebza...@gmail.com> on Sun, 17 Apr 2016 19:17:38 +0100 -
  To: Luciano Resende <luckbr1...@gmail.com>
  cc: "user @spark" <user@spark.apache.org>
  Subject: Re: Spark support for Complex Event Processing (CEP)
 

  Thanks Luciano. Appreciated.

  Regards

  Dr Mich Talebzadeh

  LinkedIn
  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw


  http://talebzadehmich.wordpress.com




  On 17 April 2016 at 17:32, Luciano Resende <luckbr1...@gmail.com> wrote:
Hi Mitch,

I know some folks that were investigating/prototyping on this area,
let me see if I can get them to reply here with more details.

On Sun, Apr 17, 2016 at 1:54 AM, Mich Talebzadeh <
mich.talebza...@gmail.com> wrote:
Hi,

Has Spark got libraries for CEP using Spark Streaming with Kafka by
any chance?

I am looking at Flink that supposed to have these libraries for CEP
but I find Flink itself very much work in progress.

Thanks

Dr Mich Talebzadeh

LinkedIn

https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw


http://talebzadehmich.wordpress.com






--
Luciano Resende
http://twitter.com/lresende1975
http://lresende.blogspot.com/













Re: Spark support for Complex Event Processing (CEP)

2016-04-18 Thread Mario Ds Briggs

Hey Mich, Luciano

 Will provide links with docs by tomorrow

thanks
Mario

- Message from Mich Talebzadeh on Sun, 17 Apr 2016 19:17:38 +0100 -
  To: Luciano Resende
  cc: "user @spark"
  Subject: Re: Spark support for Complex Event Processing (CEP)
 

Thanks Luciano. Appreciated.

Regards

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com




On 17 April 2016 at 17:32, Luciano Resende  wrote:
  Hi Mitch,

  I know some folks that were investigating/prototyping on this area, let
  me see if I can get them to reply here with more details.

  On Sun, Apr 17, 2016 at 1:54 AM, Mich Talebzadeh <
  mich.talebza...@gmail.com> wrote:
   Hi,

   Has Spark got libraries for CEP using Spark Streaming with Kafka by any
   chance?

   I am looking at Flink that supposed to have these libraries for CEP but
   I find Flink itself very much work in progress.

   Thanks

   Dr Mich Talebzadeh

   LinkedIn
   
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

   http://talebzadehmich.wordpress.com






  --
  Luciano Resende
  http://twitter.com/lresende1975
  http://lresende.blogspot.com/




Re: How to automatically relaunch a Driver program after crashes?

2015-08-19 Thread William Briggs
When submitting to YARN, you can specify two different operation modes for
the driver with the --master parameter: yarn-client or yarn-cluster. For
more information on submitting to YARN, see this page in the Spark docs:
http://spark.apache.org/docs/latest/running-on-yarn.html

yarn-cluster mode will run the driver inside of the Application Master,
which will be retried on failure. The number of retries is dependent on the
yarn.resourcemanager.am.max-attempts configuration setting for the YARN
ResourceManager.
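
For reference, a minimal sketch of capping the retries from the Spark side (the value is illustrative; the effective limit is still bounded by YARN's yarn.resourcemanager.am.max-attempts):

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.maxAppAttempts", "4")   // application (driver/AM) restart attempts on YARN
```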

Regards,
Will

On Wed, Aug 19, 2015 at 2:55 AM, Spark Enthusiast sparkenthusi...@yahoo.in
wrote:

 Folks,

 As I see, the Driver program is a single point of failure. Now, I have
 seen ways as to how to make it recover from failures on a restart (using
 Checkpointing) but I have not seen anything as to how to restart it
 automatically if it crashes.

 Will running the Driver as a Hadoop Yarn Application do it? Can someone
 educate me as to how?



Re: Scala: How to match a java object????

2015-08-18 Thread William Briggs
Could you share your pattern matching expression that is failing?

On Tue, Aug 18, 2015, 3:38 PM  saif.a.ell...@wellsfargo.com wrote:

 Hi all,

 I am trying to run a spark job, in which I receive java.math.BigDecimal
 objects, instead of the Scala equivalents, and I am trying to convert them
 into Doubles.
 If I try to match-case this object class, I get: "error: object
 java.math.BigDecimal is not a value"

 How could I get around matching java objects? I would like to avoid a
 multiple try-catch on ClassCastExceptions for all my checks.

 Thank you,
 Saif
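
For reference, a minimal sketch of matching on the type (with a binding) rather than on the class name itself, which is what usually triggers the "is not a value" error:

```
def toDouble(x: Any): Option[Double] = x match {
  case d: java.math.BigDecimal => Some(d.doubleValue)   // type pattern, binds d
  case d: Double               => Some(d)
  case _                       => None
}
```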




Re: Why Kryo Serializer is slower than Java Serializer in TeraSort

2015-07-05 Thread Will Briggs
That code doesn't appear to be registering classes with Kryo, which means the 
fully-qualified classname is stored with every Kryo record. The Spark 
documentation has more on this: 
https://spark.apache.org/docs/latest/tuning.html#data-serialization
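
For reference, a minimal sketch of registering classes (the classes listed are illustrative of TeraSort-style byte-array records, not taken from that branch):

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registered classes are written as small numeric IDs instead of full class names.
  .registerKryoClasses(Array(
    classOf[Array[Byte]],
    classOf[(Array[Byte], Array[Byte])]
  ))
```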

Regards,
Will

On July 5, 2015, at 2:31 AM, Gavin Liu ilovesonsofanar...@gmail.com wrote:

Hi,

I am using the TeraSort benchmark from ehiggs's branch
https://github.com/ehiggs/spark-terasort . Then I noticed that in
TeraSort.scala, it is using Kryo Serializer. So I made a small change from
org.apache.spark.serializer.KryoSerializer to
org.apache.spark.serializer.JavaSerializer to see the time difference.

Curiously, using Java Serializer is much quicker than using Kryo and there
is no error reported when I run the program. Here is the record from history
server, first one is kryo. second one is java default. 

1.
http://apache-spark-user-list.1001560.n3.nabble.com/file/n23621/kryo.png 

2.
http://apache-spark-user-list.1001560.n3.nabble.com/file/n23621/java.png 

I am wondering if I did something wrong or there is any other reason behind
this result.

Thanks for any help,
Gavin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-Kryo-Serializer-is-slower-than-Java-Serializer-in-TeraSort-tp23621.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kryo fails to serialise output

2015-07-03 Thread Will Briggs
Kryo serialization is used internally by Spark for spilling or shuffling 
intermediate results, not for writing out an RDD as an action. Look at Sandy 
Ryza's examples for some hints on how to do this: 
https://github.com/sryza/simplesparkavroapp

Regards,
Will

On July 3, 2015, at 2:45 AM, Dominik Hübner cont...@dhuebner.com wrote:

I have a rather simple avro schema to serialize Tweets (message, username, 
timestamp).
Kryo and twitter chill are used to do so.

For my dev environment the Spark context is configured as below

val conf: SparkConf = new SparkConf()
conf.setAppName("kryo_test")
conf.setMaster("local[4]")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "co.feeb.TweetRegistrator")

Serialization is setup with

override def registerClasses(kryo: Kryo): Unit = {
kryo.register(classOf[Tweet], 
AvroSerializer.SpecificRecordBinarySerializer[Tweet])
}

(This method gets called)


Using this configuration to persist some object fails with 
java.io.NotSerializableException: co.feeb.avro.Tweet 
(which seems to be ok as this class is not Serializable)

I used the following code:

val ctx: SparkContext = new SparkContext(conf)
val tweets: RDD[Tweet] = ctx.parallelize(List(
    new Tweet("a", "b", 1L),
    new Tweet("c", "d", 2L),
    new Tweet("e", "f", 3L)
  )
)

tweets.saveAsObjectFile("file:///tmp/spark")

Using saveAsTextFile works, but persisted files are not binary but JSON

cat /tmp/spark/part-0
{"username": "a", "text": "b", "timestamp": 1}
{"username": "c", "text": "d", "timestamp": 2}
{"username": "e", "text": "f", "timestamp": 3}

Is this intended behaviour, a configuration issue, avro serialisation not 
working in local mode or something else?





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using Accumulators in Streaming

2015-06-21 Thread Will Briggs
It sounds like accumulators are not necessary in Spark Streaming - see this 
post ( 
http://apache-spark-user-list.1001560.n3.nabble.com/Shared-variable-in-Spark-Streaming-td11762.html)
 for more details.

On June 21, 2015, at 7:31 PM, anshu shukla anshushuk...@gmail.com wrote:

In Spark Streaming, since we already have a StreamingContext, which does
not allow us to create accumulators, we have to get a SparkContext for
initializing the accumulator value.

But having 2 Spark contexts will not solve the problem.


Please Help !!


-- 

Thanks  Regards,
Anshu Shukla
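
For reference, a minimal sketch of reusing the StreamingContext's underlying SparkContext instead of creating a second context (app name, batch interval, and the stream itself are illustrative):

```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("accumulator-sketch")
val ssc = new StreamingContext(conf, Seconds(10))

// The StreamingContext already wraps a SparkContext; use it for the accumulator.
val eventCount = ssc.sparkContext.accumulator(0L, "events")

// `stream` is assumed to be an already-created DStream, e.g. from Kafka or a socket:
// stream.foreachRDD { rdd => rdd.foreach(_ => eventCount += 1) }
```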



Re: Submitting Spark Applications using Spark Submit

2015-06-16 Thread Will Briggs
In general, you should avoid making direct changes to the Spark source code. If 
you are using Scala, you can seamlessly blend your own methods on top of the 
base RDDs using implicit conversions.
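
For reference, a minimal sketch of that pattern (the method name topByTree and its implementation are illustrative, not Spark API):

```
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object RddExtensions {
  implicit class RichRDD[T: ClassTag](rdd: RDD[T]) {
    // Illustrative: a tree-style "top N" built only from existing RDD operations.
    def topByTree(num: Int)(implicit ord: Ordering[T]): Array[T] =
      rdd.mapPartitions(it => Iterator.single(it.toArray.sorted(ord.reverse).take(num)))
         .treeReduce((a, b) => (a ++ b).sorted(ord.reverse).take(num))
  }
}

// Usage: import RddExtensions._ and then call myRdd.topByTree(10) as if it were built in.
```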

Regards,
Will

On June 16, 2015, at 7:53 PM, raggy raghav0110...@gmail.com wrote:

I am trying to submit a spark application using the command line. I used the
spark submit command for doing so. I initially setup my Spark application on
Eclipse and have been making changes on there. I recently obtained my own
version of the Spark source code and added a new method to RDD.scala. I
created a new spark core jar using mvn, and I added it to my eclipse build
path. My application ran perfectly fine. 

Now, I would like to submit it through the command line. I submitted my
application like this:

bin/spark-submit --master local[2] --class SimpleApp
/Users/XXX/Desktop/spark2.jar

The spark-submit command is within the spark project that I modified by
adding new methods.
When I do so, I get this error:

java.lang.NoSuchMethodError:
org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
at SimpleApp$.main(SimpleApp.scala:12)
at SimpleApp.main(SimpleApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

When I use spark submit, where does the jar come from? How do I make sure it
uses the jars that have built? 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark or Storm

2015-06-16 Thread Will Briggs
The programming models for the two frameworks are conceptually rather 
different; I haven't worked with Storm for quite some time, but based on my old 
experience with it, I would equate Spark Streaming more with Storm's Trident 
API, rather than with the raw Bolt API. Even then, there are significant 
differences, but it's a bit closer.

If you can share your use case, we might be able to provide better guidance.

Regards,
Will

On June 16, 2015, at 9:46 PM, asoni.le...@gmail.com wrote:

Hi All,

I am evaluating Spark vs. Storm (Spark Streaming) and I am not able to see
what the equivalent of a Bolt in Storm is inside Spark.

Any help will be appreciated on this ? 

Thanks ,
Ashish
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Submitting Spark Applications using Spark Submit

2015-06-16 Thread Will Briggs
If this is research-only, and you don't want to have to worry about updating 
the jars installed by default on the cluster, you can add your custom Spark jar 
using the spark.driver.extraLibraryPath configuration property when running 
spark-submit, and then use the experimental  spark.driver.userClassPathFirst 
config to force it to use yours.

See here for more details and options: 
https://spark.apache.org/docs/1.4.0/configuration.html

On June 16, 2015, at 10:12 PM, Raghav Shankar raghav0110...@gmail.com wrote:

I made the change so that I could implement top() using treeReduce(). A member 
on here suggested I make the change in RDD.scala to accomplish that. Also, this 
is for a research project, and not for commercial use. 

So, any advice on how I can get the spark submit to use my custom built jars 
would be very useful.

Thanks,
Raghav

 On Jun 16, 2015, at 6:57 PM, Will Briggs wrbri...@gmail.com wrote:
 
 In general, you should avoid making direct changes to the Spark source code. 
 If you are using Scala, you can seamlessly blend your own methods on top of 
 the base RDDs using implicit conversions.
 
 Regards,
 Will
 
 On June 16, 2015, at 7:53 PM, raggy raghav0110...@gmail.com wrote:
 
 I am trying to submit a spark application using the command line. I used the
 spark submit command for doing so. I initially setup my Spark application on
 Eclipse and have been making changes on there. I recently obtained my own
 version of the Spark source code and added a new method to RDD.scala. I
 created a new spark core jar using mvn, and I added it to my eclipse build
 path. My application ran perfectly fine. 
 
 Now, I would like to submit it through the command line. I submitted my
 application like this:
 
 bin/spark-submit --master local[2] --class SimpleApp
 /Users/XXX/Desktop/spark2.jar
 
 The spark-submit command is within the spark project that I modified by
 adding new methods.
 When I do so, I get this error:
 
 java.lang.NoSuchMethodError:
 org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
   at SimpleApp$.main(SimpleApp.scala:12)
   at SimpleApp.main(SimpleApp.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
   at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
   at java.lang.reflect.Method.invoke(Method.java:597)
   at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
   at 
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 
 When I use spark submit, where does the jar come from? How do I make sure it
 uses the jars that have built? 
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 



Re: Does spark performance really scale out with multiple machines?

2015-06-15 Thread William Briggs
There are a lot of variables to consider. I'm not an expert on Spark, and
my ML knowledge is rudimentary at best, but here are some questions whose
answers might help us to help you:

   - What type of Spark cluster are you running (e.g., Stand-alone, Mesos,
   YARN)?
   - What does the HTTP UI tell you in terms of number of stages / tasks,
    number of executors, and task execution time / memory used / amount of data
   shuffled over the network?

As I said, I'm not all that familiar with the ML side of Spark, but in
general, if I were adding more resources, and not seeing an improvement,
here are a few things I would consider:

   1. Is your data set partitioned to allow the parallelism you are
   seeking? Spark's parallelism comes from processing RDD partitions in
   parallel, not processing individual RDD items in parallel; if you don't
   have enough partitions to take advantage of the extra hardware, you will
   see no benefit from adding capacity to your cluster.
   2. Do you have enough Spark executors to process your partitions in
   parallel? This depends on  your configuration and on your cluster type
   (doubtful this is an issue here, since you are adding more executors and
   seeing very little benefit).
   3. Are your partitions small enough (and/or your executor memory
   configuration large enough) so that each partition fits into the memory of
   an executor? If not, you will be constantly spilling to disk, which will
   have a severe impact on performance.
   4. Are you shuffling over the network? If so, how frequently and how
   much? Are you using efficient serialization (e.g., Kryo) and registering
   your serialized classes in order to minimize shuffle overhead?

There are plenty more variables, and some very good performance tuning
documentation https://spark.apache.org/docs/latest/tuning.html is
available. Without any more information to go on, my best guess would be
that you hit your maximum level of parallelism with the addition of the
second node (and even that was not fully utilized), and thus you see no
difference when adding a third node.
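
For reference, a minimal sketch of the knobs involved (paths and numbers are illustrative):

```
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().set("spark.default.parallelism", "48")    // default for shuffles
val sc = new SparkContext(conf)

val data = sc.textFile("hdfs:///path/to/input", minPartitions = 48)  // more input splits
val repartitioned = data.repartition(96)                             // explicit reshuffle
println(repartitioned.partitions.length)
```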

Regards,
Will


On Mon, Jun 15, 2015 at 1:29 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  I try to measure how Spark standalone cluster performance scales out with
 multiple machines. I did a test of training the SVM model, which is heavy in
 memory computation. I measured the run time for a Spark standalone cluster of
 1 – 3 nodes; the result is as follows:



 1 node: 35 minutes

 2 nodes: 30.1 minutes

 3 nodes: 30.8 minutes



  So the speed does not seem to increase much with more machines. I know
 there is overhead for coordinating tasks among different machines. It seems to
 me the overhead is over 30% of the total run time.



 Is this typical? Does anybody see significant performance increase with
 more machines? Is there anything I can tune my spark cluster to make it
 scale out with more machines?



 Thanks

 Ningjun





Re: Does spark performance really scale out with multiple machines?

2015-06-15 Thread William Briggs
I just wanted to clarify - when I said you hit your maximum level of
parallelism, I meant that the default number of partitions might not be
large enough to take advantage of more hardware, not that there was no way
to increase your parallelism - the documentation I linked gives a few
suggestions on how to increase the number of partitions.

-Will

On Mon, Jun 15, 2015 at 5:00 PM, William Briggs wrbri...@gmail.com wrote:

 There are a lot of variables to consider. I'm not an expert on Spark, and
 my ML knowledge is rudimentary at best, but here are some questions whose
 answers might help us to help you:

- What type of Spark cluster are you running (e.g., Stand-alone,
Mesos, YARN)?
- What does the HTTP UI tell you in terms of number of stages / tasks,
number of executors, and task execution time / memory used / amount of data
shuffled over the network?

 As I said, I'm not all that familiar with the ML side of Spark, but in
 general, if I were adding more resources, and not seeing an improvement,
 here are a few things I would consider:

1. Is your data set partitioned to allow the parallelism you are
seeking? Spark's parallelism comes from processing RDD partitions in
parallel, not processing individual RDD items in parallel; if you don't
have enough partitions to take advantage of the extra hardware, you will
see no benefit from adding capacity to your cluster.
2. Do you have enough Spark executors to process your partitions in
parallel? This depends on  your configuration and on your cluster type
(doubtful this is an issue here, since you are adding more executors and
seeing very little benefit).
3. Are your partitions small enough (and/or your executor memory
configuration large enough) so that each partition fits into the memory of
an executor? If not, you will be constantly spilling to disk, which will
have a severe impact on performance.
4. Are you shuffling over the network? If so, how frequently and how
much? Are you using efficient serialization (e.g., Kryo) and registering
your serialized classes in order to minimize shuffle overhead?

 There are plenty more variables, and some very good performance tuning
 documentation https://spark.apache.org/docs/latest/tuning.html is
 available. Without any more information to go on, my best guess would be
 that you hit your maximum level of parallelism with the addition of the
 second node (and even that was not fully utilized), and thus you see no
 difference when adding a third node.

 Regards,
 Will


 On Mon, Jun 15, 2015 at 1:29 PM, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:

  I try to measure how spark standalone cluster performance scale out
 with multiple machines. I did a test of training the SVM model which is
 heavy in memory computation. I measure the run time for spark standalone
 cluster of 1 – 3 nodes, the result is following



 1 node: 35 minutes

 2 nodes: 30.1 minutes

 3 nodes: 30.8 minutes



 So the speed does not seems to increase much with more machines. I know
 there are overhead for coordinating tasks among different machines. Seem to
 me the overhead is over 30% of the total run time.



 Is this typical? Does anybody see significant performance increase with
 more machines? Is there anything I can tune my spark cluster to make it
 scale out with more machines?



 Thanks

 Ningjun







Re: creation of RDD from a Tree

2015-06-14 Thread Will Briggs
If you are working on large structures, you probably want to look at the GraphX 
extension to Spark: 
https://spark.apache.org/docs/latest/graphx-programming-guide.html

On June 14, 2015, at 10:50 AM, lisp lispra...@gmail.com wrote:

Hi there,

I have a large amount of objects, which I have to partition into chunks with
the help of a binary tree: after each object has been run through the tree,
the leaves of that tree contain the chunks. Next I have to process each of
those chunks in the same way with a function f(chunk). So I thought if I
could make the list of chunks into an RDD listOfChunks, I could use Spark by
calling listOfChunks.map(f) and do the processing in parallel.

What would you recommend how I create the RDD? Is it possible to start with
an RDD that is a list of empty chunks and then to add my objects one by one
to the belonging chunks? Or would you recommend something else?

Thanks!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/creation-of-RDD-from-a-Tree-tp23310.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to split log data into different files according to severity

2015-06-13 Thread Will Briggs
Check out this recent post by Cheng Lian regarding dynamic partitioning in 
Spark 1.4: https://www.mail-archive.com/user@spark.apache.org/msg30204.html

On June 13, 2015, at 5:41 AM, Hao Wang bill...@gmail.com wrote:

Hi,


I have a bunch of large log files on Hadoop. Each line contains a log and its 
severity. Is there a way that I can use Spark to split the entire data set into 
different files on Hadoop according to the severity field? Thanks. Below is an 
example of the input and output.


Input:

[ERROR] log1

[INFO] log2

[ERROR] log3

[INFO] log4


Output:

error_file

[ERROR] log1

[ERROR] log3


info_file

[INFO] log2

[INFO] log4



Best,

Hao Wang
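
For reference, a minimal sketch of the DataFrame-based dynamic partitioning Will pointed to (Spark 1.4+; paths and the parsing logic are illustrative):

```
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val logs = sc.textFile("hdfs:///logs/input")
  .map { line =>
    val severity = line.takeWhile(_ != ']').stripPrefix("[")   // "[ERROR] log1" -> "ERROR"
    (severity, line)
  }
  .toDF("severity", "line")

// Writes one directory per severity value, e.g. .../severity=ERROR/ and .../severity=INFO/
logs.write.partitionBy("severity").parquet("hdfs:///logs/by_severity")
```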



Re: Spark distinct() returns incorrect results for some types?

2015-06-11 Thread Will Briggs
To be fair, this is a long-standing issue due to optimizations for object reuse 
in the Hadoop API, and isn't necessarily a failing in Spark - see this blog 
post 
(https://cornercases.wordpress.com/2011/08/18/hadoop-object-reuse-pitfall-all-my-reducer-values-are-the-same/)
 from 2011 documenting a similar issue.
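
For reference, a minimal Scala sketch of the workaround discussed in this thread - copy the reused Text into an immutable value before calling distinct() (idList is the input path from the quoted code):

```
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.lib.NLineInputFormat

val distinctIds = sc
  .hadoopFile(idList, classOf[NLineInputFormat], classOf[LongWritable], classOf[Text])
  .map { case (_, text) => text.toString }   // materialize a String copy per record
  .distinct()
```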



On June 11, 2015, at 3:17 PM, Sean Owen so...@cloudera.com wrote:

Yep you need to use a transformation of the raw value; use toString for 
example. 


On Thu, Jun 11, 2015, 8:54 PM Crystal Xing crystalxin...@gmail.com wrote:

That is a little scary. 
 So you mean in general, we shouldn't use hadoop's writable as Key in RDD? 


Zheng zheng


On Thu, Jun 11, 2015 at 6:44 PM, Sean Owen so...@cloudera.com wrote:

Guess: it has something to do with the Text object being reused by Hadoop? You 
can't in general keep around refs to them since they change. So you may have a 
bunch of copies of one object at the end that become just one in each 
partition. 


On Thu, Jun 11, 2015, 8:36 PM Crystal Xing crystalxin...@gmail.com wrote:

I load a list of ids from a text file as NLineInputFormat, and when I do
distinct(), it returns an incorrect number.

 JavaRDD<Text> idListData = jvc
                .hadoopFile(idList, NLineInputFormat.class,
                        LongWritable.class, Text.class).values().distinct()


I should have 7000K distinct values; however, it only returns 7000 values,
which is the same as the number of tasks.  The type I am using is
org.apache.hadoop.io.Text.



However, if I switch to use String instead of Text, it works correctly.

I think the Text class should have correct implementations of equals() and
hashCode() functions since it is a Hadoop class.

Does anyone have clue what is going on? 

I am using spark 1.2. 

Zheng zheng






Re: Can a Spark App run with spark-submit write pdf files to HDFS

2015-06-09 Thread William Briggs
I don't know anything about your use case, so take this with a grain of
salt, but typically if you are operating at a scale that benefits from
Spark, then you likely will not want to write your output records as
individual files into HDFS. Spark has built-in support for the Hadoop
SequenceFile container format, which is a more scalable way to handle
writing out your results; you could write your Spark RDD transformations in
such a way that your final RDD is a PairRDD with a unique key (possibly
what would normally have been the standalone file name) and the value (in
this case, likely the byte array of the PDF you generated).

It looks like PDFBox's PDDocument class allows you to save the document
to an OutputStream
https://pdfbox.apache.org/docs/1.8.9/javadocs/org/apache/pdfbox/pdmodel/PDDocument.html#save(java.io.OutputStream),
so you could probably get away with saving to a ByteArrayOutputStream, and
snagging the bytes that comprise the final document. You can see more about
how to write SequenceFiles from Spark here
https://spark.apache.org/docs/latest/programming-guide.html#actions.

As an aside, one hint that I have found helpful since I starting working
with Spark is that if your transformation requires classes that are
expensive to instantiate, you may want to look into mapPartitions, which
allows you to do the setup once per partition instead of once per record. I
haven't used PDFBox, but it wouldn't surprise me to learn that there's some
non-negligible overhead involved.
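
For reference, a minimal sketch of the approach described above (the PDFBox save-to-OutputStream call is the one linked earlier; records, buildPdf, and the id field are illustrative placeholders):

```
import java.io.ByteArrayOutputStream
import org.apache.hadoop.io.{BytesWritable, Text}

val pdfBytes = records.mapPartitions { iter =>   // per-partition setup, per the hint above
  iter.map { rec =>
    val doc = buildPdf(rec)                      // illustrative: returns a PDFBox PDDocument
    val out = new ByteArrayOutputStream()
    doc.save(out)
    doc.close()
    (new Text(rec.id), new BytesWritable(out.toByteArray))
  }
}

pdfBytes.saveAsSequenceFile("hdfs:///output/pdfs")
```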

Hope that helps,
Will

On Tue, Jun 9, 2015 at 5:57 PM, Richard Catlin richard.m.cat...@gmail.com
wrote:

 I would like to write pdf files using pdfbox to HDFS from my Spark
 application.  Can this be done?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Can-a-Spark-App-run-with-spark-submit-write-pdf-files-to-HDFS-tp23233.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: SparkContext Threading

2015-06-06 Thread Will Briggs
Hi Lee, it's actually not related to threading at all - you would still have 
the same problem even if you were using a single thread. See this section ( 
https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark)
 of the Spark docs. 

On June 5, 2015, at 5:12 PM, Lee McFadden splee...@gmail.com wrote:

On Fri, Jun 5, 2015 at 2:05 PM Will Briggs wrbri...@gmail.com wrote:

Your lambda expressions on the RDDs in the SecondRollup class are closing 
around the context, and Spark has special logic to ensure that all variables in 
a closure used on an RDD are Serializable - I hate linking to Quora, but 
there's a good explanation here: 
http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark


Ah, I see!  So if I broke out the lambda expressions into a method on an object 
it would prevent this issue.  Essentially, don't use lambda expressions when 
using threads.


Thanks again, I appreciate the help. 
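
For reference, a generic sketch of that pattern (not the original job; names are illustrative): the functions applied to the RDD live on a standalone object, so the task closure never captures the Runnable or its SparkContext field.

```
import org.apache.spark.SparkContext

object LineFunctions {
  def length(line: String): Int = line.length   // illustrative stand-in for the real transformation
}

class RollupJob(sc: SparkContext) extends Runnable {
  override def run(): Unit = {
    // sc is only used here on the driver; the shipped function references the
    // standalone object, not `this`, so nothing non-serializable is captured.
    val total = sc.textFile("hdfs:///input").map(LineFunctions.length _).reduce(_ + _)
    println(total)
  }
}
```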



Re: SparkContext Threading

2015-06-06 Thread William Briggs
Hi Lee, I'm stuck with only mobile devices for correspondence right now, so
I can't get to shell to play with this issue - this is all supposition; I
think that the lambdas are closing over the context because it's a
constructor parameter to your Runnable class, which is why inlining the
lambdas into your main method doesn't show this issue.

On Sat, Jun 6, 2015, 10:55 AM Lee McFadden splee...@gmail.com wrote:

 Hi Will,

 That doesn't seem to be the case and was part of the source of my
 confusion. The code currently in the run method of the runnable works
 perfectly fine with the lambda expressions when it is invoked from the main
 method. They also work when they are invoked from within a separate method
 on the Transforms object.

 It was only when putting that same code into another thread that the
 serialization exception occurred.

 Examples throughout the spark docs also use lambda expressions a lot -
 surely those examples also would not work if this is always an issue with
 lambdas?

 On Sat, Jun 6, 2015, 12:21 AM Will Briggs wrbri...@gmail.com wrote:

 Hi Lee, it's actually not related to threading at all - you would still
 have the same problem even if you were using a single thread. See this
 section (
 https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark)
 of the Spark docs.


 On June 5, 2015, at 5:12 PM, Lee McFadden splee...@gmail.com wrote:


 On Fri, Jun 5, 2015 at 2:05 PM Will Briggs wrbri...@gmail.com wrote:

 Your lambda expressions on the RDDs in the SecondRollup class are
 closing around the context, and Spark has special logic to ensure that all
 variables in a closure used on an RDD are Serializable - I hate linking to
 Quora, but there's a good explanation here:
 http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark


 Ah, I see!  So if I broke out the lambda expressions into a method on an
 object it would prevent this issue.  Essentially, don't use lambda
 expressions when using threads.

 Thanks again, I appreciate the help.




Re: write multiple outputs by key

2015-06-06 Thread Will Briggs
I believe groupByKey currently requires that all items for a specific key fit 
into a single executor's memory: 
http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

This previous discussion has some pointers if you must use groupByKey, 
including adding a low-cardinality hash to your key: 
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-td11427.html

Another option I didn't see mentioned would be to persist / cache the initial 
RDD, calculate the set of distinct key values out of it, and then derive a set 
of filtered RDDs from the cached dataset, one for each key. For this to work, 
your set of unique keys would need to fit into your driver's memory.
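
For reference, a minimal sketch of that last alternative (rawRdd is an illustrative RDD of key/value pairs, and the output path is illustrative):

```
import org.apache.spark.storage.StorageLevel

val data = rawRdd.persist(StorageLevel.MEMORY_AND_DISK)
val keys = data.map(_._1).distinct().collect()   // must fit in driver memory

keys.foreach { k =>
  data.filter(_._1 == k)
      .map(_._2)
      .saveAsTextFile(s"hdfs:///output/key=$k")
}
```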

Regards,
Will

On June 6, 2015, at 11:07 AM, patcharee patcharee.thong...@uni.no wrote:

Hi,

How can I write to multiple outputs for each key? I tried to create a
custom partitioner and to define the number of partitions, but it does not
work. Only a few tasks/partitions (which equals the number of all key
combinations) get large datasets; the data is not split across all
tasks/partitions. The job failed as those few tasks handled far too large
datasets. Below is my code snippet.

val varWFlatRDD =
  varWRDD.map(FlatMapUtilClass().flatKeyFromWrf).groupByKey() // keys are (zone, z, year, month)
    .foreach(
      x => {
        val z = x._1._1
        val year = x._1._2
        val month = x._1._3
        val df_table_4dim = x._2.toList.toDF()
        df_table_4dim.registerTempTable("table_4Dim")
        hiveContext.sql("INSERT OVERWRITE table 4dim partition " +
          "(zone=" + ZONE + ",z=" + z + ",year=" + year + ",month=" + month + ") " +
          " select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, " +
          "qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim")
      })


 From the Spark history UI, at groupByKey there are 1000 tasks (equal
to the parent's # partitions); at foreach there are 1000 tasks as
well, but 50 tasks (same as the # of all key combinations) get datasets.

How can I fix this problem? Any suggestions are appreciated.

BR,
Patcharee



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkContext Threading

2015-06-05 Thread Will Briggs
Your lambda expressions on the RDDs in the SecondRollup class are closing 
around the context, and Spark has special logic to ensure that all variables in 
a closure used on an RDD are Serializable - I hate linking to Quora, but 
there's a good explanation here: 
http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark


On June 5, 2015, at 4:14 PM, Lee McFadden splee...@gmail.com wrote:



On Fri, Jun 5, 2015 at 12:58 PM Marcelo Vanzin van...@cloudera.com wrote:

You didn't show the error so the only thing we can do is speculate. You're 
probably sending the object that's holding the SparkContext reference over  the 
network at some point (e.g. it's used by a task run in an executor), and that's 
why you were getting that exception.


Apologies - the full error is as follows.  All I did here was remove the 
@transient annotation from the sc variable in my class constructor.  In 
addition, the full code for the classes and launching process is included below.


Error traceback:

```

Exception in thread "pool-5-thread-1" java.lang.Error: org.apache.spark.SparkException: Task not serializable
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
        at org.apache.spark.rdd.RDD.map(RDD.scala:288)
        at io.icebrg.analytics.spark.SecondRollup.run(ConnTransforms.scala:33)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        ... 2 more
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 7 more

```


Code:

```

class SecondRollup(sc: SparkContext, connector: CassandraConnector, scanPoint: DateTime)
  extends Runnable with Serializable {

  def run {
    val conn = sc.cassandraTable("alpha_test", "sensor_readings")
      .select("data")
      .where("timep = ?", scanPoint)
      .where("sensorid IN ?", System.sensors)
      .map(r => Json.parse(r.getString("data")))
      .cache()

    conn.flatMap(AffectedIp.fromJson)
      .map(a => (AffectedIp.key(a), a))
      .reduceByKey(AffectedIp.reduce)
      .map(_._2)
      .map(a => AffectedIp.reduceWithCurrent(connector, a))
      .saveToCassandra("alpha_test", "affected_hosts")

    conn.flatMap(ServiceSummary.fromJson)
      .map(s => (ServiceSummary.key(s), s))
      .reduceByKey(ServiceSummary.reduce)
      .map(_._2)
      .saveToCassandra("alpha_test", "service_summary_rollup")
  }
}

object Transforms {
  private val appNameBase = "Transforms%s"
  private val dtFormatter = DateTimeFormat.forPattern("MMddHH")

  def main(args: Array[String]) {
    if (args.size < 2) {
      println(
        """Usage: ConnTransforms start end
          |  start     DateTime to start processing at. Format: MMddHH
          |  end       DateTime to end processing at.  Format: MMddHH""".stripMargin)
      sys.exit(1)
    }

    // withZoneRetainFields gives us a UTC time as specified on the command line.
    val start = dtFormatter.parseDateTime(args(0)).withZoneRetainFields(DateTimeZone.UTC)
    val end = dtFormatter.parseDateTime(args(1)).withZoneRetainFields(DateTimeZone.UTC)

    println("Processing rollups from %s to %s".format(start, end))

    // Create the spark context.
    val conf = new SparkConf()
      .setAppName(appNameBase.format("Test"))

    val connector = CassandraConnector(conf)

    val sc = new SparkContext(conf)

    // Set up the threadpool for
```
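
For context, here is a minimal sketch of the thread-pool pattern this main method appears to be setting up: sharing the single SparkContext across a fixed-size pool of worker Runnables. The pool size, the hour-by-hour loop, and the shutdown handling are all assumptions for illustration, not the poster's actual code.

```
import java.util.concurrent.{Executors, TimeUnit}

// Assumes sc, connector, start and end exist as in the main method above.
val pool = Executors.newFixedThreadPool(4) // pool size is an arbitrary choice here

// Submit one rollup per hour in the requested range; SparkContext is safe to use
// for submitting jobs from multiple threads, so the Runnables can share it.
var current = start
while (current.isBefore(end)) {
  pool.submit(new SecondRollup(sc, connector, current))
  current = current.plusHours(1)
}

pool.shutdown()
pool.awaitTermination(1, TimeUnit.HOURS) // wait for all rollups before stopping the context
sc.stop()
```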

Re: Deduping events using Spark

2015-06-04 Thread William Briggs
Hi Lee,

You should be able to create a PairRDD using the Nonce as the key, and the
AnalyticsEvent as the value. I'm very new to Spark, but here is some
uncompilable pseudo code that may or may not help:

events.map(event => (event.getNonce, event)).reduceByKey((a, b) => a).map(_._2)

The above code is more Scala-like, since that's the syntax with which I
have more familiarity - it looks like the Spark Java 8 API is similar, but
you won't get implicit conversion to a PairRDD when you use a 2-Tuple as
the mapped value. Instead, you will need to use the mapToPair function -
there's a good example in the Spark Programming Guide under Working With
Key-Value Pairs
https://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs
.
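
If it helps, here is a self-contained sketch of the same idea in Scala; the Event case class, the sample data, and the local[*] master are made up purely for illustration:

```
import org.apache.spark.{SparkConf, SparkContext}

object DedupeByNonce {
  // Stand-in for AnalyticsEvent; only the nonce matters for deduplication.
  case class Event(nonce: String, payload: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DedupeByNonce").setMaster("local[*]"))

    val events = sc.parallelize(Seq(
      Event("n1", "first"), Event("n1", "duplicate"), Event("n2", "other")))

    // Key each event by its nonce, keep one event per key, then drop the key.
    val deduped = events
      .map(e => (e.nonce, e))
      .reduceByKey((a, _) => a)
      .map(_._2)

    deduped.collect().foreach(println) // prints one event per nonce
    sc.stop()
  }
}
```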

Hope this helps!

Regards,
Will

On Thu, Jun 4, 2015 at 1:10 PM, lbierman leebier...@gmail.com wrote:

 I'm still a bit new to Spark and am struggling to figure out the best way to
 dedupe my events.

 I load my Avro files from HDFS and then I want to dedupe events that have
 the same nonce.

 For example my code so far:

  JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
 context.newAPIHadoopRDD(
     context.hadoopConfiguration(),
     AvroKeyInputFormat.class,
     AvroKey.class,
     NullWritable.class
 ).keys())
 .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
 .filter(key -> { return
 Optional.ofNullable(key.getStepEventKey()).isPresent(); })

 Now I want to get back an RDD of AnalyticsEvents that are unique. So I
 basically want to do:
 if AnalyticsEvent.getNonce() == AnalyticsEvent2.getNonce() only return 1 of
 them.

 I'm not sure how to do this? If I do reduceByKey it reduces by
 AnalyticsEvent not by the values inside?

 Any guidance on how I can walk this list of events and only return a filtered
 version with unique nonces would be much appreciated.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Deduping-events-using-Spark-tp23153.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Make HTTP requests from within Spark

2015-06-03 Thread William Briggs
Hi Kaspar,

This is definitely doable, but in my opinion, it's important to remember
that, at its core, Spark is based around a functional programming paradigm
- you're taking input sets of data and, by applying various
transformations, you end up with a dataset that represents your answer.
Without knowing more about your use case, and keeping in mind that I'm very
new to Spark, here are a few things I would want to think about if I were
writing this as a non-Streaming Spark application:

   1. What is your starting dataset? Do you have an initial set of
   parameters or a data source that is used to define each of the millions of
   requests? If so, then that should comprise your first RDD and you can
   perform subsequent transformations to prepare your HTTP requests (e.g.,
   start with the information that drives the generation of the requests, and
   use map/flatMap to create an RDD that has the full list of requests you
   want to run).
   2. Are the HTTP requests read-only, and/or idempotent (are you only
   looking up data, or are you performing requests that cause some sort of
   side effect)? Spark operations against RDDs work by defining a lineage
   graph, and transformations will be re-run if a partition in the lineage
   needs to be recalculated for any reason. If your HTTP requests are causing
   side-effects that should not be repeated, then Spark may not be the best
   fit for that portion of the job, and you might want to use something else,
   pipe the results into HDFS, and then analyze those using Spark.
   3. If your web service requests are lookups or are idempotent, then
   we're on the right track. Keep in mind that your web service probably will
   not scale as well as the Spark job - a naive first-pass implementation
   could easily overwhelm many services, particularly if/when partitions need
   to be recalculated. There are a few mechanisms you can use to mitigate this
   - one is to use mapPartitions rather than map when transforming the set of
   requests to the set of results, initialize an HTTP connection for each
   partition, and transform the data that defines the request into your
   desired dataset by invoking the web service. Using mapPartitions allows you
   to limit the number of concurrent HTTP connections to one per partition
   (although this may be too slow if your service is slow... there is
   obviously a bit of analysis, testing and profiling that would need to be
   done on the entire job). Another consideration would be to look at
   persisting or caching the intermediate results after you've successfully
   retrieved your results from the service, to reduce the likelihood of
   hitting the web service more than necessary.
   4. Just realized you might be looking for help invoking an HTTP service
   programmatically from Scala / Spark - if so, you might want to look at the
   spray-client http://spray.io/documentation/1.2.3/spray-client/ library.
   5. With millions of web service requests, it's highly likely some will
   fail, for a variety of reasons. Look into using Scala's Try
   http://www.scala-lang.org/api/2.11.5/index.html#scala.util.Try or
   Either
   http://www.scala-lang.org/api/2.11.5/index.html#scala.util.Either monads
   to encode success / failure, and treat failed requests as first-class
   citizens in your RDD of results (by retrying them, filtering them, logging
   them, etc., based on your specific needs and use case). Make sure you are
   setting reasonable timeouts on your service calls to prevent the Spark job
   from getting stuck if the service turns into a black hole. A rough sketch
   combining points 3 and 5 follows this list.
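
Here is that rough sketch, purely for illustration: it assumes the requests have already been turned into an RDD[String] of URLs (per point 1), uses only the JDK's HttpURLConnection rather than any particular HTTP library, and wraps each call in a Try so failures stay in the result set (per point 5):

```
import java.net.{HttpURLConnection, URL}

import scala.io.Source
import scala.util.Try

// Assumes `requests: RDD[String]` holds the URLs to call.
val responses = requests.mapPartitions { urls =>
  // Each partition makes its calls sequentially, so concurrency against the
  // service is bounded by the number of partitions rather than by record count.
  urls.map { urlStr =>
    val attempt = Try {
      val conn = new URL(urlStr).openConnection().asInstanceOf[HttpURLConnection]
      conn.setConnectTimeout(5000)  // fail fast instead of hanging the task
      conn.setReadTimeout(10000)
      try Source.fromInputStream(conn.getInputStream).mkString
      finally conn.disconnect()
    }
    (urlStr, attempt) // failed requests stay in the RDD as Failure values
  }
}

// Cache so that downstream recomputation doesn't hit the service again (point 3).
responses.cache()
```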

As I said above, I'm pretty new to Spark, so others may have some better
advice, or even tell you to ignore mine completely (no hard feelings, I
promise - this is all very new to me).

Good luck!

Regards,
Will

On Wed, Jun 3, 2015 at 3:49 AM, kasparfischer kaspar.fisc...@dreizak.com
wrote:

 Hi everybody,

 I'm new to Spark, apologies if my question is very basic.

 I have a need to send millions of requests to a web service and analyse and
 store the responses in an RDD. I can easy express the analysing part using
 Spark's filter/map/etc. primitives but I don't know how to make the
 requests. Is that something I can do from within Spark? Or Spark Streaming?
 Or does it conflict with the way Spark works?

 I've found a similar question but am not sure whether the answer applies
 here:



 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Spark-Streaming-from-an-HTTP-api-tp12330.html

 Any clarifications or pointers would be super helpful!

 Thanks,
 Kaspar



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Make-HTTP-requests-from-within-Spark-tp23129.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: