Re: an OOM while persist as DISK_ONLY

2016-03-03 Thread Eugen Cepoi
We are in the process of upgrading to Spark 1.6 from 1.4, and had a hard
time getting some of our more memory/join intensive jobs to work (RDD
caching + a lot of shuffling). Most of the time they were getting killed by
YARN.

Increasing the memory overhead was of course an option, but the value needed
to make the job pass was far higher than the overhead we had for Spark 1.4,
which is way too much to be OK.

Playing with the configs discussed below reduced the GC time, but the problem
still persisted.
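
For reference, these are the knobs being discussed in this thread, as they
would look set via SparkConf (the values are placeholders, not
recommendations):

import org.apache.spark.SparkConf

// Values are placeholders; the right numbers depend on the job.
val conf = new SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "2048") // MB of non-heap headroom per executor
  .set("spark.memory.fraction", "0.6")               // share of the heap managed by Spark (1.6+)
  .set("spark.memory.storageFraction", "0.5")        // portion of that protected for storage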

In the end it turned out we were hitting this issue:
https://issues.apache.org/jira/browse/SPARK-12961.
What ended up working was overriding the Snappy version that comes with
EMR and disabling off-heap memory.

We still need to test the upgrade against our Spark Streaming jobs...
hopefully this issue https://issues.apache.org/jira/browse/SPARK-13288 is
also due to Snappy...

Cheers,
Eugen


2016-03-03 16:14 GMT-08:00 Ted Yu <yuzhih...@gmail.com>:

> bq. that solved some problems
>
> Is there any problem that was not solved by the tweak ?
>
> Thanks
>
> On Thu, Mar 3, 2016 at 4:11 PM, Eugen Cepoi <cepoi.eu...@gmail.com> wrote:
>
>> You can limit the amount of memory Spark uses for shuffle even in 1.6,
>> by tweaking spark.memory.fraction and spark.memory.storageFraction. For
>> example, if you want to have almost no shuffle cache at all, you can set
>> the storage fraction to 1 or something close to it, leaving only a small
>> slice for the shuffle cache and using the rest for storage. And if you
>> don't persist/broadcast data, then you can reduce the whole
>> memory.fraction.
>>
>> Though I'm not sure how good it is to tweak those values, as it assumes
>> Spark is mostly using that memory for caching stuff... I have used similar
>> tweaks in Spark 1.4 and tried them on Spark 1.6, and that solved some
>> problems...
>>
>> Eugen
>>
>> 2016-03-03 15:59 GMT-08:00 Andy Dang <nam...@gmail.com>:
>>
>>> Spark's shuffling algorithm is very aggressive about storing everything in
>>> RAM, and the behavior is worse in 1.6 with the UnifiedMemoryManager. At
>>> least in previous versions you could limit the shuffle memory, but Spark 1.6
>>> will use as much memory as it can get. What I see is that Spark seems to
>>> underestimate the amount of memory that objects take up, and thus doesn't
>>> spill frequently enough. There's a dirty workaround (legacy mode), but the
>>> common advice is to increase your parallelism (and keep in mind that
>>> operations such as join have an implicit parallelism, so you'll want to be
>>> explicit about it).
>>>
>>> ---
>>> Regards,
>>> Andy
>>>
>>> On Mon, Feb 22, 2016 at 2:12 PM, Alex Dzhagriev <dzh...@gmail.com>
>>> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I'm using Spark 1.6 and trying to cache a dataset which is 1.5 TB. I
>>>> have only ~800GB RAM in total, so I am choosing the DISK_ONLY storage
>>>> level. Unfortunately, I'm running into the memory overhead limit:
>>>>
>>>>
>>>> Container killed by YARN for exceeding memory limits. 27.0 GB of 27 GB 
>>>> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>>>>
>>>>
>>>> I'm giving 6GB of overhead memory and using 10 cores per executor.
>>>> Apparently, that's not enough. Without persisting the data, and instead
>>>> computing the dataset (twice in my case), the job works fine. Can anyone
>>>> please explain what overhead consumes that much memory when persisting to
>>>> disk, and how I can estimate how much extra memory I should give to the
>>>> executors in order to make it not fail?
>>>>
>>>> Thanks, Alex.
>>>>
>>>
>>>
>>
>


Re: Bad Digest error while doing aws s3 put

2016-02-08 Thread Eugen Cepoi
I had similar problems with multipart uploads. In my case the real error
was something else which was being masked by this issue:
https://issues.apache.org/jira/browse/SPARK-6560. In the end this bad
digest exception was a side effect and not the original issue. For me it
was some library version conflict on EMR.

Depending on the size of the output files, you might try to just disable
multipart uploads using fs.s3n.multipart.uploads.enabled.
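
For example, assuming an existing SparkContext sc (whether EMR's own S3
filesystem honors this property may depend on the EMR version):

// Disable S3 multipart uploads for the s3n filesystem.
sc.hadoopConfiguration.set("fs.s3n.multipart.uploads.enabled", "false")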

Cheers,
Eugen

2016-02-07 15:05 GMT-08:00 Steve Loughran :

>
> > On 7 Feb 2016, at 07:57, Dhimant  wrote:
> >
> >at
> >
> com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.uploadSinglePart(MultipartUploadOutputStream.java:245)
> >... 15 more
> > Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The
> > Content-MD5 you specified did not match what we received. (Service:
> Amazon
> > S3; Status Code: 400; Error Code: BadDigest; Request ID:
> 5918216A5901FCC8),
> > S3 Extended Request ID:
> >
> QSxtYln/yXqHYpdr4BWosin/TAFsGlK1FlKfE5PcuJkNrgoblGzTNt74kEhuNcrJCRZ3mXq0oUo=
> >at
> >
> com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1182)
> >at
> >
> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:770)
> >at
> >
> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
> >at
> > com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
> >at
> > com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3796)
> >at
> >
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1482)
> >at
> >
> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.storeFile(Jets3tNativeFileSystemStore.java:140)
> >... 22 more
> >
>
> This is Amazon's own S3 client, nothing in the Apache Hadoop source tree.
> Normally I'd say "use s3a to make s3n problems go away", but I don't know
> what that does with Amazon's own EMR libraries.
>
>
>


Re: What is the relationship between reduceByKey and spark.driver.maxResultSize?

2015-12-11 Thread Eugen Cepoi
Do you have a large number of tasks? This can happen if you have a large
number of tasks and a small driver, or if you use accumulators of list-like
data structures.
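
For context: each task ships its (possibly accumulator-laden) result back to
the driver, so with enough tasks the total can exceed
spark.driver.maxResultSize even without an explicit collect(). The relevant
settings, with placeholder values:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.maxResultSize", "2g") // cap on the total serialized results of all tasks
// spark.driver.memory usually has to be set at submit time instead, e.g. --driver-memory 4g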

2015-12-11 11:17 GMT-08:00 Zhan Zhang :

> I think you are fetching too many results to the driver. Typically, it is
> not recommended to collect much data to the driver. But if you have to, you
> can increase the driver memory when submitting jobs.
>
> Thanks.
>
> Zhan Zhang
>
> On Dec 11, 2015, at 6:14 AM, Tom Seddon  wrote:
>
> I have a job that is running into intermittent errors with [SparkDriver]
> java.lang.OutOfMemoryError: Java heap space. Before I was getting this
> error, I was getting errors saying the result size exceeded
> spark.driver.maxResultSize.
> This does not make any sense to me, as there are no actions in my job that
> send data to the driver - just a pull of data from S3, a map and
> reduceByKey, and then a conversion to a dataframe and a saveAsTable action
> that puts the results back on S3.
>
> I've found a few references to reduceByKey and spark.driver.maxResultSize
> having some importance, but cannot fathom how this setting could be related.
>
> Would greatly appreciate any advice.
>
> Thanks in advance,
>
> Tom
>
>
>


Mllib explain feature for tree ensembles

2015-10-28 Thread Eugen Cepoi
Hey,

Is there some kind of "explain" feature implemented in MLlib for the
algorithms based on tree ensembles?
Some method to which you would feed a single feature vector and which would
return/print what features contributed to the decision, or how much each
feature contributed "negatively" and "positively" to the decision.

This can be very useful to debug a model on some specific samples and for
feature engineering.

Thanks,
Eugen


Re: Mllib explain feature for tree ensembles

2015-10-28 Thread Eugen Cepoi
I guess I will have to upgrade to Spark 1.5, thanks!
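
For reference, a minimal sketch of the featureImportances API that Yanbo
points to below (Spark 1.5+). Note it gives one global importance score per
feature for the fitted model, not a per-sample explanation; the toy data is
made up.

import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SQLContext

// Assumes an existing SparkContext `sc` (e.g. in spark-shell).
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val training = Seq(
  (0.0, Vectors.dense(1.0, 0.1)),
  (1.0, Vectors.dense(0.0, 2.3)),
  (1.0, Vectors.dense(0.2, 2.1)),
  (0.0, Vectors.dense(1.2, 0.0))
).toDF("label", "features")

// Tree classifiers need label metadata, which StringIndexer attaches.
val indexed = new StringIndexer()
  .setInputCol("label").setOutputCol("indexedLabel")
  .fit(training).transform(training)

val model = new RandomForestClassifier()
  .setLabelCol("indexedLabel").setNumTrees(10)
  .fit(indexed)

// One global weight per feature, e.g. (2,[0,1],[0.45,0.55])
println(model.featureImportances)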

2015-10-28 11:50 GMT+01:00 Yanbo Liang <yblia...@gmail.com>:

> Spark ML/MLlib has provided featureImportances
> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala#L213>
>  to
> estimate the importance of each feature.
>
> 2015-10-28 18:29 GMT+08:00 Eugen Cepoi <cepoi.eu...@gmail.com>:
>
>> Hey,
>>
>> Is there some kind of "explain" feature implemented in mllib for the
>> algorithms based on tree ensembles?
>> Some method to which you would feed in a single feature vector and it
>> would return/print what features contributed to the decision or how much
>> each feature contributed "negatively" and "positively" to the decision.
>>
>> This can be very useful to debug a model on some specific samples and for
>> feature engineering.
>>
>> Thanks,
>> Eugen
>>
>
>


Re: spark streaming failing to replicate blocks

2015-10-23 Thread Eugen Cepoi
Fixing the port to the same values as in the stack trace works too.
The network config of the slaves seems correct.
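
For reference, pinning the block manager port comes down to something like
this (the port value is a placeholder and must be reachable between the
nodes):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.blockManager.port", "50000") // fixed port, open between all executors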

Thanks,
Eugen

2015-10-23 8:30 GMT+02:00 Akhil Das <ak...@sigmoidanalytics.com>:

> Mostly a network issue. You need to check your network configuration from
> the AWS console and make sure the ports are accessible within the cluster.
>
> Thanks
> Best Regards
>
> On Thu, Oct 22, 2015 at 8:53 PM, Eugen Cepoi <cepoi.eu...@gmail.com>
> wrote:
>
>> Huh indeed this worked, thanks. Do you know why this happens, is that
>> some known issue?
>>
>> Thanks,
>> Eugen
>>
>> 2015-10-22 19:08 GMT+07:00 Akhil Das <ak...@sigmoidanalytics.com>:
>>
>>> Can you try fixing spark.blockManager.port to specific port and see if
>>> the issue exists?
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Oct 19, 2015 at 6:21 PM, Eugen Cepoi <cepoi.eu...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am running spark streaming 1.4.1 on EMR (AMI 3.9) over YARN.
>>>> The job is reading data from Kinesis and the batch size is of 30s (I
>>>> used the same value for the kinesis checkpointing).
>>>> In the executor logs I can see every 5 seconds a sequence of
>>>> stacktraces indicating that the block replication failed. I am using the
>>>> default storage level MEMORY_AND_DISK_SER_2.
>>>> WAL is not enabled nor checkpointing (the checkpoint dir is configured
>>>> for the spark context but not for the streaming context).
>>>>
>>>> Here is an example of those logs for ip-10-63-160-18. They occur in
>>>> every executor while trying to replicate to any other executor.
>>>>
>>>>
>>>> 15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to 
>>>> [ip-10-63-160-18.ec2.internal/10.63.160.18:50929]
>>>> 15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing connection 
>>>> to ip-10-63-160-18.ec2.internal/10.63.160.18:50929
>>>> java.net.ConnectException: Connection refused
>>>>at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>>at 
>>>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>>>at 
>>>> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>>>>at 
>>>> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>>>>at 
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>at 
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>at java.lang.Thread.run(Thread.java:745)
>>>> 15/10/19 03:11:55 ERROR nio.ConnectionManager: Exception while sending 
>>>> message.
>>>> java.net.ConnectException: Connection refused
>>>>at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>>at 
>>>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>>>at 
>>>> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>>>>at 
>>>> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>>>>at 
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>at 
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>at java.lang.Thread.run(Thread.java:745)
>>>> 15/10/19 03:11:55 INFO nio.ConnectionManager: Notifying 
>>>> ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
>>>> 15/10/19 03:11:55 INFO nio.ConnectionManager: Handling connection error on 
>>>> connection to ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
>>>> 15/10/19 03:11:55 WARN storage.BlockManager: Failed to replicate 
>>>> input-1-144524231 to BlockManagerId(3, ip-10-159-151-22.ec2.internal, 
>>>> 50929), failure #0
>>>> java.net.ConnectException: Connection refused
>>>>at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>>at 
>>>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>>>at 
>>>> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>>>>at 
>>>> org.apache.spark.networ

Re: spark streaming failing to replicate blocks

2015-10-22 Thread Eugen Cepoi
Huh, indeed this worked, thanks. Do you know why this happens? Is that some
known issue?

Thanks,
Eugen

2015-10-22 19:08 GMT+07:00 Akhil Das <ak...@sigmoidanalytics.com>:

> Can you try fixing spark.blockManager.port to specific port and see if the
> issue exists?
>
> Thanks
> Best Regards
>
> On Mon, Oct 19, 2015 at 6:21 PM, Eugen Cepoi <cepoi.eu...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am running spark streaming 1.4.1 on EMR (AMI 3.9) over YARN.
>> The job is reading data from Kinesis and the batch size is of 30s (I used
>> the same value for the kinesis checkpointing).
>> In the executor logs I can see every 5 seconds a sequence of stacktraces
>> indicating that the block replication failed. I am using the default
>> storage level MEMORY_AND_DISK_SER_2.
>> WAL is not enabled nor checkpointing (the checkpoint dir is configured
>> for the spark context but not for the streaming context).
>>
>> Here is an example of those logs for ip-10-63-160-18. They occur in every
>> executor while trying to replicate to any other executor.
>>
>>
>> 15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to 
>> [ip-10-63-160-18.ec2.internal/10.63.160.18:50929]
>> 15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing connection to 
>> ip-10-63-160-18.ec2.internal/10.63.160.18:50929
>> java.net.ConnectException: Connection refused
>>  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>  at 
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>  at 
>> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>>  at 
>> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>> 15/10/19 03:11:55 ERROR nio.ConnectionManager: Exception while sending 
>> message.
>> java.net.ConnectException: Connection refused
>>  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>  at 
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>  at 
>> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>>  at 
>> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>> 15/10/19 03:11:55 INFO nio.ConnectionManager: Notifying 
>> ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
>> 15/10/19 03:11:55 INFO nio.ConnectionManager: Handling connection error on 
>> connection to ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
>> 15/10/19 03:11:55 WARN storage.BlockManager: Failed to replicate 
>> input-1-144524231 to BlockManagerId(3, ip-10-159-151-22.ec2.internal, 
>> 50929), failure #0
>> java.net.ConnectException: Connection refused
>>  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>  at 
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>  at 
>> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>>  at 
>> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>> 15/10/19 03:11:55 INFO nio.ConnectionManager: Removing SendingConnection to 
>> ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
>> 15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to 
>> [ip-10-63-160-18.ec2.internal/10.63.160.18:39506]
>> 15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing connection to 
>> ip-10-63-160-18.ec2.internal/10.63.160.18:39506
>> java.net.ConnectException: Connection refused
>>  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>  at 
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>  at 
>> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>>  at 

spark streaming failing to replicate blocks

2015-10-19 Thread Eugen Cepoi
Hi,

I am running Spark Streaming 1.4.1 on EMR (AMI 3.9) over YARN.
The job is reading data from Kinesis and the batch size is 30s (I used
the same value for the Kinesis checkpointing).
In the executor logs I can see, every 5 seconds, a sequence of stack traces
indicating that the block replication failed. I am using the default
storage level MEMORY_AND_DISK_SER_2.
Neither WAL nor checkpointing is enabled (the checkpoint dir is configured for
the Spark context but not for the streaming context).

Here is an example of those logs for ip-10-63-160-18. They occur in every
executor while trying to replicate to any other executor.


15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to
[ip-10-63-160-18.ec2.internal/10.63.160.18:50929]
15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing
connection to ip-10-63-160-18.ec2.internal/10.63.160.18:50929
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
at 
org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/19 03:11:55 ERROR nio.ConnectionManager: Exception while sending message.
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
at 
org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/19 03:11:55 INFO nio.ConnectionManager: Notifying
ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
15/10/19 03:11:55 INFO nio.ConnectionManager: Handling connection
error on connection to
ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
15/10/19 03:11:55 WARN storage.BlockManager: Failed to replicate
input-1-144524231 to BlockManagerId(3,
ip-10-159-151-22.ec2.internal, 50929), failure #0
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
at 
org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/19 03:11:55 INFO nio.ConnectionManager: Removing
SendingConnection to
ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to
[ip-10-63-160-18.ec2.internal/10.63.160.18:39506]
15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing
connection to ip-10-63-160-18.ec2.internal/10.63.160.18:39506
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
at 
org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/19 03:11:55 ERROR nio.ConnectionManager: Exception while sending message.
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
at 
org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/19 03:11:55 INFO 

Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Eugen Cepoi
Hey,

A quick update on other things that have been tested.

When looking at the compiled code of the spark-streaming-kinesis-asl jar
everything looks normal (there is a class that implements SyncMap and it is
used inside the receiver).
Starting a spark shell and using introspection to instantiate a receiver
and check that blockIdToSeqNumRanges implements SyncMap works too. So
obviously it has the correct type according to that.

Another thing to test could be to do the same introspection stuff but
inside a spark job to make sure it is not a problem in the way the jobs are
run.
The other idea would be that this is a problem related to ser/de. For
example, if the receiver is being serialized and then deserialized, it could
definitely happen, depending on the library used and its configuration, that
the concrete type just isn't preserved. It would then deserialize using the
compile-time type instead of the runtime type.

Cheers,
Eugen


2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré :

> Thanks for the update Phil.
>
> I'm preparing an environment to reproduce it.
>
> I keep you posted.
>
> Thanks again,
> Regards
> JB
>
> On 10/15/2015 08:36 AM, Phil Kallos wrote:
>
>> Not a dumb question, but yes I updated all of the library references to
>> 1.5, including  (even tried 1.5.1).
>>
>> // Versions.spark set elsewhere to "1.5.0"
>> "org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark %
>> "provided"
>>
>> I am experiencing the issue in my own spark project, but also when I try
>> to run the spark streaming kinesis example that comes in spark/examples
>>
>> Tried running the streaming job locally, and also in EMR with release
>> 4.1.0 that includes Spark 1.5
>>
>> Very strange!
>>
>> -- Forwarded message --
>>
>> From: "Jean-Baptiste Onofré"
>> To: user@spark.apache.org 
>>
>> Cc:
>> Date: Thu, 15 Oct 2015 08:03:55 +0200
>> Subject: Re: Spark 1.5 Streaming and Kinesis
>> Hi Phil,
>> KinesisReceiver is part of extra. Just a dumb question: did you
>> update all, including the Spark Kinesis extra containing the
>> KinesisReceiver ?
>> I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we
>> see:
>> blockIdToSeqNumRanges.clear()
>> which is a:
>> private val blockIdToSeqNumRanges = new
>> mutable.HashMap[StreamBlockId, SequenceNumberRanges]
>>  with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
>> So, it doesn't look fully correct to me.
>> Let me investigate a bit this morning.
>> Regards
>> JB
>> On 10/15/2015 07:49 AM, Phil Kallos wrote:
>> We are trying to migrate from Spark 1.4 to Spark 1.5 for our Kinesis
>> streaming applications, to take advantage of the new Kinesis
>> checkpointing improvements in 1.5.
>> However, after upgrading, we are consistently seeing the following
>> error:
>> java.lang.ClassCastException: scala.collection.mutable.HashMap cannot
>> be
>> cast to scala.collection.mutable.SynchronizedMap
>> at
>>
>> org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
>> at
>>
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
>> at
>>
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
>> at
>>
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
>> at
>>
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
>> at
>>
>> org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>> at
>>
>> org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> I even get this when running the Kinesis examples :
>>
>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>> with
>> bin/run-example streaming.KinesisWordCountASL
>> Am I doing something incorrect?
>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org 
>> http://blog.nanthrax.net 
>> Talend - http://www.talend.com 
>>
>> Hi,
>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
> 

Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Eugen Cepoi
So running it using spark-submit doesn't change anything; it still works.

When reading the code
https://github.com/apache/spark/blob/branch-1.5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L100
it looks like the receivers are definitely being serialized/deserialized. I
think this is the issue; I need to find a way to confirm that now...

2015-10-15 16:12 GMT+07:00 Eugen Cepoi <cepoi.eu...@gmail.com>:

> Hey,
>
> A quick update on other things that have been tested.
>
> When looking at the compiled code of the spark-streaming-kinesis-asl jar
> everything looks normal (there is a class that implements SyncMap and it is
> used inside the receiver).
> Starting a spark shell and using introspection to instantiate a receiver
> and check that blockIdToSeqNumRanges implements SyncMap works too. So
> obviously it has the correct type according to that.
>
> Another thing to test could be to do the same introspection stuff but
> inside a spark job to make sure it is not a problem in the way the jobs are
> run.
> The other idea would be that this is a problem related to ser/de. For
> example if the receiver was being serialized and then deserialized it could
> definitely happen depending on the lib used and its configuration that it
> just doesn't preserve the concrete type. So it would deserialize using the
> compile type instead of the runtime type.
>
> Cheers,
> Eugen
>
>
> 2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>
>> Thanks for the update Phil.
>>
>> I'm preparing an environment to reproduce it.
>>
>> I keep you posted.
>>
>> Thanks again,
>> Regards
>> JB
>>
>> On 10/15/2015 08:36 AM, Phil Kallos wrote:
>>
>>> Not a dumb question, but yes I updated all of the library references to
>>> 1.5, including  (even tried 1.5.1).
>>>
>>> // Versions.spark set elsewhere to "1.5.0"
>>> "org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark %
>>> "provided"
>>>
>>> I am experiencing the issue in my own spark project, but also when I try
>>> to run the spark streaming kinesis example that comes in spark/examples
>>>
>>> Tried running the streaming job locally, and also in EMR with release
>>> 4.1.0 that includes Spark 1.5
>>>
>>> Very strange!
>>>
>>> -- Forwarded message --
>>>
>>> From: "Jean-Baptiste Onofré" <j...@nanthrax.net>
>>> To: user@spark.apache.org <mailto:user@spark.apache.org>
>>>
>>> Cc:
>>> Date: Thu, 15 Oct 2015 08:03:55 +0200
>>> Subject: Re: Spark 1.5 Streaming and Kinesis
>>> Hi Phil,
>>> KinesisReceiver is part of extra. Just a dumb question: did you
>>> update all, including the Spark Kinesis extra containing the
>>> KinesisReceiver ?
>>> I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we
>>> see:
>>> blockIdToSeqNumRanges.clear()
>>> which is a:
>>> private val blockIdToSeqNumRanges = new
>>> mutable.HashMap[StreamBlockId, SequenceNumberRanges]
>>>  with mutable.SynchronizedMap[StreamBlockId,
>>> SequenceNumberRanges]
>>> So, it doesn't look fully correct to me.
>>> Let me investigate a bit this morning.
>>> Regards
>>> JB
>>> On 10/15/2015 07:49 AM, Phil Kallos wrote:
>>> We are trying to migrate from Spark1.4 to Spark1.5 for our Kinesis
>>> streaming applications, to take advantage of the new Kinesis
>>> checkpointing improvements in 1.5.
>>> However after upgrading, we are consistently seeing the following
>>> error:
>>> java.lang.ClassCastException: scala.collection.mutable.HashMap
>>> cannot be
>>> cast to scala.collection.mutable.SynchronizedMap
>>> at
>>>
>>> org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
>>> at
>>>
>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
>>> at
>>>
>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
>>> at
>>>
>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
>>> at
>>>
>>> org.apache.spark.streaming.scheduler.ReceiverTrac

Re: map vs foreach for sending data to external system

2015-07-02 Thread Eugen Cepoi
*The thing is that foreach forces materialization of the RDD and it seems
to be executed on the driver program*
What makes you think that? No, foreach runs on the executors
(distributed), not on the driver.
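
For illustration, a minimal runnable sketch of this pattern; foreachPartition
is usually preferred so one connection is opened per partition rather than
per record, and ExternalClient is a hypothetical stand-in for whatever
sendToWhtv talks to:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-in for the external system client.
class ExternalClient {
  def send(record: String): Unit = println(s"sent: $record")
  def close(): Unit = ()
}

object ForeachExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("foreach-example").setMaster("local[2]"))
    val myRdd = sc.parallelize(Seq("a", "b", "c"), 2)

    // foreach/foreachPartition run on the executors; nothing is sent back to the driver.
    myRdd.foreachPartition { records =>
      val client = new ExternalClient()                      // created on the executor
      try records.foreach(client.send) finally client.close()
    }
    sc.stop()
  }
}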

2015-07-02 18:32 GMT+02:00 Alexandre Rodrigues 
alex.jose.rodrig...@gmail.com:

 Hi Spark devs,

 I'm coding a spark job and at a certain point in execution I need to send
 some data present in an RDD to an external system.

 val myRdd = 

 myRdd.foreach { record =>
   sendToWhtv(record)
 }

 The thing is that foreach forces materialization of the RDD and it seems
 to be executed on the driver program, which is not very beneficial in my
 case. So I changed the logic to a map (mapPartitions, but it's the
 same).

 val newRdd = myRdd.map { record =>
   sendToWhtv(record)
 }
 newRdd.count()

 My understanding is that map is a transformation operation and then I have
 to force materialization by invoking some action (such as count). Is this
 the correct way to do this kind of distributed foreach, or is there any
 other function to achieve this that doesn't necessarily imply a data
 transformation or a returned RDD?


 Thanks,
 Alex




Re: map vs foreach for sending data to external system

2015-07-02 Thread Eugen Cepoi
Heh, an action (or materialization) means that it will trigger the
computation over the RDD. A transformation like map means that it will
build up the transformation chain that must be applied to the data, but it is
not actually executed. It is executed only when an action is triggered over
that RDD. That's why you have the impression the map is so fast: actually
it doesn't do anything :)
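
A tiny illustration of that, assuming an existing SparkContext sc:

val rdd = sc.parallelize(1 to 1000)
val mapped = rdd.map { x => Thread.sleep(1); x * 2 } // returns immediately: nothing runs yet
mapped.count()                                       // the action triggers the actual work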

2015-07-02 18:59 GMT+02:00 Alexandre Rodrigues 
alex.jose.rodrig...@gmail.com:

 Foreach is listed as an action[1]. I guess an *action* just means that it
 forces materialization of the RDD.

 I just noticed much faster executions with map although I don't like the
 map approach. I'll look at it with new eyes if foreach is the way to go.

 [1] – https://spark.apache.org/docs/latest/programming-guide.html#actions

 Thanks guys!




 --
 Alexandre Rodrigues

 On Thu, Jul 2, 2015 at 5:37 PM, Eugen Cepoi cepoi.eu...@gmail.com wrote:



 *The thing is that foreach forces materialization of the RDD and it
 seems to be executed on the driver program*
 What makes you think that? No, foreach is run in the executors
 (distributed) and not in the driver.

 2015-07-02 18:32 GMT+02:00 Alexandre Rodrigues 
 alex.jose.rodrig...@gmail.com:

 Hi Spark devs,

 I'm coding a spark job and at a certain point in execution I need to
 send some data present in an RDD to an external system.

 val myRdd = 

 myRdd.foreach { record =>
   sendToWhtv(record)
 }

 The thing is that foreach forces materialization of the RDD and it seems
 to be executed on the driver program, which is not very benefitial in my
 case. So I changed the logic to a Map (mapWithParititons, but it's the
 same).

 val newRdd = myRdd.map { record =>
   sendToWhtv(record)
 }
 newRdd.count()

 My understanding is that map is a transformation operation and then I
 have to force materialization by invoking some action (such as count). Is
 this the correct way to do this kind of distributed foreach or is there any
 other function to achieve this that doesn't necessarily imply a data
 transformation or a returned RDD ?


 Thanks,
 Alex






Re: Multiple dir support : newApiHadoopFile

2015-06-26 Thread Eugen Cepoi
Comma-separated paths work only with Spark 1.4 and up.
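
For example, assuming an existing SparkContext sc (the paths below are
placeholders, and the method name is newAPIHadoopFile):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Comma-separated list of directories (Spark 1.4+)...
val multi = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs:///logs/2015/06/25,hdfs:///logs/2015/06/26")

// ...or a glob pattern, which also works on older versions.
val glob = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs:///logs/2015/06/*")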

2015-06-26 18:56 GMT+02:00 Eugen Cepoi cepoi.eu...@gmail.com:

 You can comma separate them or use globbing patterns

 2015-06-26 18:54 GMT+02:00 Ted Yu yuzhih...@gmail.com:

 See this related thread:
 http://search-hadoop.com/m/q3RTtiYm8wgHego1

 On Fri, Jun 26, 2015 at 9:43 AM, Bahubali Jain bahub...@gmail.com
 wrote:


 Hi,
 How do we read files from multiple directories using newApiHadoopFile ()
 ?

 Thanks,
 Baahu
 --
 Twitter:http://twitter.com/Baahu






Re: Multiple dir support : newApiHadoopFile

2015-06-26 Thread Eugen Cepoi
You can comma separate them or use globbing patterns

2015-06-26 18:54 GMT+02:00 Ted Yu yuzhih...@gmail.com:

 See this related thread:
 http://search-hadoop.com/m/q3RTtiYm8wgHego1

 On Fri, Jun 26, 2015 at 9:43 AM, Bahubali Jain bahub...@gmail.com wrote:


 Hi,
 How do we read files from multiple directories using newApiHadoopFile () ?

 Thanks,
 Baahu
 --
 Twitter:http://twitter.com/Baahu





Re: What are the likely causes of org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle?

2015-06-26 Thread Eugen Cepoi
Are you using YARN?
If yes, increase the YARN memory overhead option
(spark.yarn.executor.memoryOverhead). YARN is probably killing your
executors.
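
For reference, it can be set like this (the value is a placeholder, in MB):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "2048") // extra room on top of the executor heap
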
On 26 June 2015 at 20:43, XianXing Zhang xianxing.zh...@gmail.com wrote:

 Do we have any update on this thread? Has anyone met and solved similar
 problems before?

 Any pointers will be greatly appreciated!

 Best,
 XianXing

 On Mon, Jun 15, 2015 at 11:48 PM, Jia Yu jia...@asu.edu wrote:

 Hi Peng,

 I got exactly same error! My shuffle data is also very large. Have you
 figured out a method to solve that?

 Thanks,
 Jia

 On Fri, Apr 24, 2015 at 7:59 AM, Peng Cheng pc...@uow.edu.au wrote:

 I'm deploying a Spark data processing job on an EC2 cluster. The job is
 small for the cluster (16 cores with 120G RAM in total), and the largest
 RDD has only 76k+ rows, but it is heavily skewed in the middle (thus
 requiring repartitioning) and each row has around 100k of data after
 serialization. The job always gets stuck in repartitioning. Namely, the job
 will constantly get the following errors and retries:

 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle

 org.apache.spark.shuffle.FetchFailedException: Error in opening
 FileSegmentManagedBuffer

 org.apache.spark.shuffle.FetchFailedException:
 java.io.FileNotFoundException: /tmp/spark-...
 I've tried to identify the problem but it seems like both memory and disk
 consumption of the machine throwing these errors are below 50%. I've also
 tried different configurations, including:

 let driver/executor memory use 60% of total memory.
 let netty prioritize the JVM shuffling buffer.
 increase shuffling streaming buffer to 128m.
 use KryoSerializer and max out all buffers
 increase shuffling memoryFraction to 0.4
 But none of them works. The small job always triggers the same series of
 errors and maxes out retries (up to 1000 times). How do I troubleshoot this
 kind of situation?

 Thanks a lot if you have any clue.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/What-are-the-likely-causes-of-org-apache-spark-shuffle-MetadataFetchFailedException-Missing-an-outpu-tp22646.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.







Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-18 Thread Eugen Cepoi
Hey,

I am not 100% sure, but from my understanding accumulators are per partition
(so per task, as it's the same thing) and are sent back to the driver with
the task result and merged there. When a task needs to be run n times
(multiple RDDs depend on this one, some partition is lost later in the chain,
etc.), the accumulator will count the values from that task n times.
So in short, I don't think you'd win anything from using an accumulator over
what you are doing right now.

You could maybe coalesce your RDD to num-executors without a shuffle and
then update the sketches. You should end up with one partition per executor,
and thus one sketch per executor. You could then increase the number of
threads per task if you can use the sketches concurrently.

Eugen

2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  Hi,

 I'm trying to figure out the smartest way to implement a global
 count-min-sketch on accumulators. For now, we are doing that with RDDs. It
 works well, but with one sketch per partition, merging takes too long.

 As you probably know, a count-min sketch is a big mutable array of arrays
 of ints. To distribute it, all sketches must have the same size. Since it
 can be big, and since merging is not free, I would like to minimize the
 number of sketches and maximize reuse and concurrent use of the sketches.
 Ideally, I would like to just have one sketch per worker.

 I think accumulables might be the right structures for that, but it seems
 that they are not shared between executors, or even between tasks.


 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
 (289)
   /**
    * This thread-local map holds per-task copies of accumulators; it is used
    * to collect the set of accumulator updates to send back to the driver
    * when tasks complete. After tasks complete, this map is cleared by
    * `Accumulators.clear()` (see Executor.scala).
    */
   private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
     override protected def initialValue() = Map[Long, Accumulable[_, _]]()
   }
 The localAccums map stores an accumulator for each task (it's thread-local,
 so I assume each task has a unique thread on executors).

 If I understand correctly, each time a task starts, an accumulator is
 initialized locally, updated, then sent back to the driver for merging?

 So I guess accumulators may not be the way to go, finally.

 Any advice ?
 Guillaume
 --
  *Guillaume PITEL, Président*
 +33(0)626 222 431

 eXenSa S.A.S. http://www.exensa.com/
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-18 Thread Eugen Cepoi
BTW I suggest this instead of using thread locals, as I am not sure in which
situations Spark will reuse them or not. For example, if an error happens
inside a thread, will Spark then create a new one, or is the error caught
inside the thread, preventing it from stopping? So in short, does Spark
guarantee that the threads are started at the beginning and will last until
the end of the JVM?

2015-06-18 15:32 GMT+02:00 Eugen Cepoi cepoi.eu...@gmail.com:



 2015-06-18 15:17 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  I was thinking exactly the same. I'm going to try it, It doesn't really
 matter if I lose an executor, since its sketch will be lost, but then
 reexecuted somewhere else.


 I mean that between the action that will update the sketches and the
 action to collect/merge them you can loose an executor. So outside of
 sparks control. But it's probably an acceptable risk.


 And anyway, it's an approximate data structure, and what matters are
 ratios, not exact values.

 I mostly need to take care of the concurrency problem for my sketch.


 I think you could do something like:
   - Have this singleton that holds N sketch instances (one for each
 executor core)
   - Inside an operation over partitions (like
 forEachPartition/mapPartitions)
 - at the begin you ask the singleton to provide you with one instance
 to use, in a sync. fashion and pick it out from the N available instances
 or mark it as in use
 - when the iterator over the partition doesn't have more elements then
 you release this sketch
   - Then you can do something like
 sc.parallelize(Seq(...)).coalesce(numExecutors).map(pickTheSketches).reduce(blabla),
 but you will have to make sure that this will be executed over each
 executor (not sure if a dataset  than executor num, will trigger an action
 on every executor)

 Please let me know what you end up doing, sounds interesting :)

 Eugen



 Guillaume

   Yeah thats the problem. There is probably some perfect num of
 partitions that provides the best balance between partition size and memory
 and merge overhead. Though it's not an ideal solution :(

  There could be another way but very hacky... for example if you store
 one sketch in a singleton per jvm (so per executor). Do a first pass over
 your data and update those. Then you trigger some other dummy operation
 that will just retrieve the sketches.
 Thats kind of a hack but should work.

  Note that if you loose an executor in between, then that doesn't work
 anymore, probably you could detect it and recompute the sketches, but it
 would become over complicated.



 2015-06-18 14:27 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  Hi,

 Thank you for this confirmation.

 Coalescing is what we do now. It creates, however, very big partitions.

 Guillaume

   Hey,

 I am not 100% sure but from my understanding accumulators are per
 partition (so per task as its the same) and are sent back to the driver
 with the task result and merged. When a task needs to be run n times
 (multiple rdds depend on this one, some partition loss later in the chain
 etc) then the accumulator will count n times the values from that task.
  So in short I don't think you'd win from using an accumulator over what
 you are doing right now.

  You could maybe coalesce your rdd to num-executors without a shuffle
 and then update the sketches. You should endup with 1 partition per
 executor thus 1 sketch per executor. You could then increase the number of
 threads per task if you can use the sketches concurrently.

  Eugen

 2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  Hi,

 I'm trying to figure out the smartest way to implement a global
 count-min-sketch on accumulators. For now, we are doing that with RDDs. It
 works well, but with one sketch per partition, merging takes too long.

 As you probably know, a count-min sketch is a big mutable array of
 array of ints. To distribute it, all sketches must have the same size.
 Since it can be big, and since merging is not free, I would like to
 minimize the number of sketches and maximize reuse and conccurent use of
 the sketches. Ideally, I would like to just have one sketch per worker.

 I think accumulables might be the right structures for that, but it
 seems that they are not shared between executors, or even between tasks.


 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
 (289)
   /**
  * This thread-local map holds per-task copies of accumulators; it is
 used to collect the set
  * of accumulator updates to send back to the driver when tasks
 complete. After tasks complete,
  * this map is cleared by `Accumulators.clear()` (see Executor.scala).
  */
  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_,
 _]]]() {
  override protected def initialValue() = Map[Long, Accumulable[_, _]]()
  }
 The localAccums stores an accumulator for each task (it's thread local,
 so I assume each task have

Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-18 Thread Eugen Cepoi
Yeah, that's the problem. There is probably some perfect number of partitions
that provides the best balance between partition size, memory, and merge
overhead. Though it's not an ideal solution :(

There could be another way, but it's very hacky... for example, if you store
one sketch in a singleton per JVM (so per executor). Do a first pass over
your data and update those. Then you trigger some other dummy operation that
will just retrieve the sketches.
That's kind of a hack but should work.

Note that if you lose an executor in between, then that doesn't work
anymore; probably you could detect it and recompute the sketches, but it
would become overcomplicated.



2015-06-18 14:27 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  Hi,

 Thank you for this confirmation.

 Coalescing is what we do now. It creates, however, very big partitions.

 Guillaume

   Hey,

 I am not 100% sure but from my understanding accumulators are per
 partition (so per task as its the same) and are sent back to the driver
 with the task result and merged. When a task needs to be run n times
 (multiple rdds depend on this one, some partition loss later in the chain
 etc) then the accumulator will count n times the values from that task.
  So in short I don't think you'd win from using an accumulator over what
 you are doing right now.

  You could maybe coalesce your rdd to num-executors without a shuffle and
 then update the sketches. You should endup with 1 partition per executor
 thus 1 sketch per executor. You could then increase the number of threads
 per task if you can use the sketches concurrently.

  Eugen

 2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  Hi,

 I'm trying to figure out the smartest way to implement a global
 count-min-sketch on accumulators. For now, we are doing that with RDDs. It
 works well, but with one sketch per partition, merging takes too long.

 As you probably know, a count-min sketch is a big mutable array of array
 of ints. To distribute it, all sketches must have the same size. Since it
 can be big, and since merging is not free, I would like to minimize the
 number of sketches and maximize reuse and conccurent use of the sketches.
 Ideally, I would like to just have one sketch per worker.

 I think accumulables might be the right structures for that, but it seems
 that they are not shared between executors, or even between tasks.


 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
 (289)
   /**
  * This thread-local map holds per-task copies of accumulators; it is
 used to collect the set
  * of accumulator updates to send back to the driver when tasks
 complete. After tasks complete,
  * this map is cleared by `Accumulators.clear()` (see Executor.scala).
  */
  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_,
 _]]]() {
  override protected def initialValue() = Map[Long, Accumulable[_, _]]()
  }
 The localAccums stores an accumulator for each task (it's thread local,
 so I assume each task have a unique thread on executors)

 If I understand correctly, each time a task starts, an accumulator is
 initialized locally, updated, then sent back to the driver for merging ?

 So I guess, accumulators may not be the way to go, finally.

 Any advice ?
 Guillaume
 --
  *Guillaume PITEL, Président*
 +33(0)626 222 431

 eXenSa S.A.S. http://www.exensa.com/
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)184 163 677 / Fax +33(0)972 283 705




 --
  *Guillaume PITEL, Président*
 +33(0)626 222 431

 eXenSa S.A.S. http://www.exensa.com/
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-18 Thread Eugen Cepoi
2015-06-18 15:17 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  I was thinking exactly the same. I'm going to try it. It doesn't really
 matter if I lose an executor, since its sketch will be lost but then
 re-executed somewhere else.


I mean that between the action that updates the sketches and the action
to collect/merge them, you can lose an executor. So it's outside of Spark's
control. But it's probably an acceptable risk.


 And anyway, it's an approximate data structure, and what matters are
 ratios, not exact values.

 I mostly need to take care of the concurrency problem for my sketch.


I think you could do something like this (see the sketch below):
  - Have a singleton that holds N sketch instances (one for each
executor core)
  - Inside an operation over partitions (like
foreachPartition/mapPartitions):
    - at the beginning you ask the singleton to provide you with one instance
to use, in a synchronized fashion, and pick it out from the N available
instances or mark it as in use
    - when the iterator over the partition has no more elements, you
release this sketch
  - Then you can do something like
sc.parallelize(Seq(...)).coalesce(numExecutors).map(pickTheSketches).reduce(blabla),
but you will have to make sure that this is executed on every
executor (not sure if a dataset smaller than the number of executors will
trigger a task on every executor)
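
For what it's worth, a rough runnable sketch of that idea. The "sketch" below
is just a placeholder array of counters rather than a real count-min sketch,
and the pool size, numExecutors value and local master are assumptions for
illustration only:

import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkContext}

// JVM-wide singleton holding one placeholder "sketch" per executor core.
object LocalSketches {
  private val pool = mutable.Queue.fill(4)(new Array[Long](1024)) // 4 cores/executor assumed
  private val inUse = mutable.ArrayBuffer.empty[Array[Long]]

  def acquire(): Array[Long] = synchronized {
    val s = if (pool.nonEmpty) pool.dequeue() else new Array[Long](1024)
    inUse += s; s
  }
  def release(s: Array[Long]): Unit = synchronized { inUse -= s; pool.enqueue(s) }
  def all: Seq[Array[Long]] = synchronized { (pool ++ inUse).toList }
}

object SketchPerExecutor {
  def merge(a: Array[Long], b: Array[Long]): Array[Long] =
    a.indices.map(i => a(i) + b(i)).toArray

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("sketch-per-executor").setMaster("local[2]"))
    val numExecutors = 1 // placeholder; derive from the cluster setup in a real job

    // First pass: update the per-JVM sketches; nothing is shipped back to the driver.
    sc.parallelize(1 to 100000, 8).foreachPartition { it =>
      val sketch = LocalSketches.acquire()
      try it.foreach(x => sketch(x % sketch.length) += 1L)
      finally LocalSketches.release(sketch)
    }

    // Second pass (the "dummy" job): pick up and merge one result per executor JVM.
    val merged = sc.parallelize(1 to numExecutors, numExecutors)
      .mapPartitions(_ => Iterator(LocalSketches.all.reduce(merge)))
      .reduce(merge)

    println(merged.sum) // total count observed across all partitions (100000 here)
    sc.stop()
  }
}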

Please let me know what you end up doing, sounds interesting :)

Eugen



 Guillaume

   Yeah thats the problem. There is probably some perfect num of
 partitions that provides the best balance between partition size and memory
 and merge overhead. Though it's not an ideal solution :(

  There could be another way but very hacky... for example if you store one
 sketch in a singleton per jvm (so per executor). Do a first pass over your
 data and update those. Then you trigger some other dummy operation that
 will just retrieve the sketches.
 Thats kind of a hack but should work.

  Note that if you loose an executor in between, then that doesn't work
 anymore, probably you could detect it and recompute the sketches, but it
 would become over complicated.



 2015-06-18 14:27 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  Hi,

 Thank you for this confirmation.

 Coalescing is what we do now. It creates, however, very big partitions.

 Guillaume

   Hey,

 I am not 100% sure but from my understanding accumulators are per
 partition (so per task as its the same) and are sent back to the driver
 with the task result and merged. When a task needs to be run n times
 (multiple rdds depend on this one, some partition loss later in the chain
 etc) then the accumulator will count n times the values from that task.
  So in short I don't think you'd win from using an accumulator over what
 you are doing right now.

  You could maybe coalesce your rdd to num-executors without a shuffle and
 then update the sketches. You should endup with 1 partition per executor
 thus 1 sketch per executor. You could then increase the number of threads
 per task if you can use the sketches concurrently.

  Eugen

 2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  Hi,

 I'm trying to figure out the smartest way to implement a global
 count-min-sketch on accumulators. For now, we are doing that with RDDs. It
 works well, but with one sketch per partition, merging takes too long.

 As you probably know, a count-min sketch is a big mutable array of array
 of ints. To distribute it, all sketches must have the same size. Since it
 can be big, and since merging is not free, I would like to minimize the
 number of sketches and maximize reuse and conccurent use of the sketches.
 Ideally, I would like to just have one sketch per worker.

 I think accumulables might be the right structures for that, but it
 seems that they are not shared between executors, or even between tasks.


 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
 (289)
   /**
  * This thread-local map holds per-task copies of accumulators; it is
 used to collect the set
  * of accumulator updates to send back to the driver when tasks
 complete. After tasks complete,
  * this map is cleared by `Accumulators.clear()` (see Executor.scala).
  */
  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_,
 _]]]() {
  override protected def initialValue() = Map[Long, Accumulable[_, _]]()
  }
 The localAccums stores an accumulator for each task (it's thread local,
 so I assume each task have a unique thread on executors)

 If I understand correctly, each time a task starts, an accumulator is
 initialized locally, updated, then sent back to the driver for merging ?

 So I guess, accumulators may not be the way to go, finally.

 Any advice ?
 Guillaume
 --
  *Guillaume PITEL, Président*
 +33(0)626 222 431

 eXenSa S.A.S. http://www.exensa.com/
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)184 163 677 / Fax +33(0)972 283 705




 --
 

Re: Intermedate stage will be cached automatically ?

2015-06-17 Thread Eugen Cepoi
Cache is more general. reduceByKey involves a shuffle step where the data
will be kept in memory and on disk (for whatever doesn't fit in memory). The
shuffle files will remain around until the end of the job. The blocks in
memory will be dropped if memory is needed for other things. This is an
optimisation so that other RDDs that depend on the result of this shuffle
don't have to go through the whole chain; they just fetch the shuffle blocks
from memory/disk.

Calling cache in this example gives nearly the same result (I guess there are
some implementation-specific differences). But if there weren't a shuffle
step, then cache would explicitly persist this dataset, though not on disk
unless you tell it to.
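
For example, using the snippet from this thread, spilling the persisted data
to disk is opt-in via the storage level (assumes the same sc as below):

import org.apache.spark.storage.StorageLevel

val data = sc.parallelize(1 to 10, 2).map(e => (e % 2, 2)).reduceByKey(_ + _, 2)
data.persist(StorageLevel.MEMORY_AND_DISK) // cache() is shorthand for MEMORY_ONLY
println(data.count())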

Eugen

2015-06-17 15:10 GMT+02:00 canan chen ccn...@gmail.com:

 Yes, actually on the Storage UI there's no data cached. But the behavior
 confuses me. If I call the cache method as follows, the behavior is the
 same as without calling cache; why is that?


 val data = sc.parallelize(1 to 10, 2).map(e => (e % 2, 2)).reduceByKey(_ + _, 2)
 data.cache()
 println(data.count())
 println(data.count())



 On Wed, Jun 17, 2015 at 8:45 PM, ayan guha guha.a...@gmail.com wrote:

 It's not cached per se. For example, you will not see this in the Storage
 tab in the UI. However, Spark has read the data and it's in memory right
 now. So the next count call should be very fast.


 Best
 Ayan

 On Wed, Jun 17, 2015 at 10:21 PM, Mark Tse mark@d2l.com wrote:

  I think
 https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
 might shed some light on the behaviour you’re seeing.



 Mark



 *From:* canan chen [mailto:ccn...@gmail.com]
 *Sent:* June-17-15 5:57 AM
 *To:* spark users
 *Subject:* Intermedate stage will be cached automatically ?



 Here's one simple Spark example where I call RDD#count 2 times. The first
 time it invokes 2 stages, but the second one needs only 1 stage. It seems
 the first stage is cached. Is that true? Is there any flag with which I can
 control whether the intermediate stage is cached?


 val data = sc.parallelize(1 to 10, 2).map(e => (e % 2, 2)).reduceByKey(_ +
 _, 2)
 println(data.count())
 println(data.count())




 --
 Best Regards,
 Ayan Guha





Re: Spark on EMR

2015-06-17 Thread Eugen Cepoi
It looks like it is a wrapper around
https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark
So basically adding an option -v,1.4.0.a should work.

https://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-spark-configure.html

2015-06-17 15:32 GMT+02:00 Hideyoshi Maeda hideyoshi.ma...@gmail.com:

 Any ideas what version of Spark is underneath?

 i.e. is it 1.4? and is SparkR supported on Amazon EMR?

 On Wed, Jun 17, 2015 at 12:06 AM, ayan guha guha.a...@gmail.com wrote:

 That's great news. Can I assume spark on EMR supports kinesis to hbase
 pipeline?
 On 17 Jun 2015 05:29, kamatsuoka ken...@gmail.com wrote:

 Spark is now officially supported on Amazon Elastic Map Reduce:
 http://aws.amazon.com/elasticmapreduce/details/spark/








Re: How to set KryoRegistrator class in spark-shell

2015-06-11 Thread Eugen Cepoi
Or launch the spark-shell with --conf spark.kryo.registrator=foo.bar.MyClass
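
For reference, a minimal (untested) sketch of such a registrator; the foo.bar
package and the MyRegistrator name are just placeholders:

 package foo.bar

 import com.esotericsoftware.kryo.Kryo
 import org.apache.spark.serializer.KryoRegistrator

 class MyRegistrator extends KryoRegistrator {
   override def registerClasses(kryo: Kryo) {
     // replace with the classes you actually serialize
     kryo.register(classOf[Array[String]])
   }
 }

and then:

 ./bin/spark-shell --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
   --conf spark.kryo.registrator=foo.bar.MyRegistrator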

2015-06-11 14:30 GMT+02:00 Igor Berman igor.ber...@gmail.com:

 Another option would be to close sc and open new context with your custom
 configuration
 On Jun 11, 2015 01:17, bhomass bhom...@gmail.com wrote:

 you need to register using spark-defaults.conf as explained here


 https://books.google.com/books?id=WE_GBwAAQBAJ&pg=PA239&lpg=PA239&dq=spark+shell+register+kryo+serialization&source=bl&ots=vCxgEfz1-2&sig=dHU8FY81zVoBqYIJbCFuRwyFjAw&hl=en&sa=X&ved=0CEwQ6AEwB2oVChMIn_iujpCGxgIVDZmICh3kYADW#v=onepage&q=spark%20shell%20register%20kryo%20serialization&f=false







Re: Optimisation advice for Avro-Parquet merge job

2015-06-04 Thread Eugen Cepoi
Hi

2015-06-04 15:29 GMT+02:00 James Aley james.a...@swiftkey.com:

 Hi,

 We have a load of Avro data coming into our data systems in the form of
 relatively small files, which we're merging into larger Parquet files with
 Spark. I've been following the docs and the approach I'm taking seemed
 fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's
 not the most optimal approach.

 I was wondering if anyone on this list might have some advice to make to
 make this job as efficient as possible. Here's some code:

 DataFrame dfInput = sqlContext.load(inputPaths.get(0), "com.databricks.spark.avro");
 long totalSize = getDirSize(inputPaths.get(0));

 for (int i = 1; i < inputPaths.size(); ++i) {
     dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i), "com.databricks.spark.avro"));
     totalSize += getDirSize(inputPaths.get(i));
 }

 int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
 DataFrame outputFrame;

 // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
 // the synchronized block below. Suggestions welcome here too! :-)
 synchronized (this) {
     RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
     outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
 }

 outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);

 Here are some things bothering me:

- Conversion to an RDD and back again so that we can use coalesce() to
reduce the number of partitions. This is because we read that repartition()
is not as efficient as coalesce(), and local micro benchmarks seemed to
somewhat confirm that this was faster. Is this really a good idea though?
Should we be doing something else?

 Repartition uses coalesce but with a forced shuffle step. It's just a
shortcut for coalesce(xxx, true).
Doing a coalesce sounds correct, I'd do the same :) Note that if you add
the shuffle step, then your partitions should be better balanced.
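
To make the equivalence concrete, a small sketch (rdd being any existing RDD):

 val narrowed = rdd.coalesce(100)                  // no shuffle, partitions may stay unbalanced
 val shuffled = rdd.coalesce(100, shuffle = true)  // forces a shuffle, better balanced
 val same     = rdd.repartition(100)               // equivalent to the line above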


- Usage of unionAll() - this is the only way I could find to join the
separate data sets into a single data frame to save as Parquet. Is there a
better way?

 When using the input formats directly you can do this with
FileInputFormat.addInputPath; it should perform at least as well as union.


- Do I need to be using the DataFrame API at all? I'm not querying any
data here, so the nice API for SQL-like transformations of the data isn't
being used. The DataFrame API just seemed like the path of least resistance
for working with Avro and Parquet. Would there be any advantage to using
hadoopRDD() with the appropriate Input/Output formats?



Using the input/output formats directly sounds viable. But the snippet you
show seems clean enough, and I am not sure there would be much value in
making something (maybe) slightly faster but harder to understand.


Eugen

Any advice or tips greatly appreciated!


 James.





Re: How to give multiple directories as input ?

2015-05-27 Thread Eugen Cepoi
Try something like that:


 def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date, endDate: Date) = {

   // assuming a list of paths
   val paths: Seq[String] = getInputPaths(inputDir, startDate, endDate)

   val job = Job.getInstance(new Configuration(sc.hadoopConfiguration))

   paths.drop(1).foreach(p => FileInputFormat.addInputPath(job, new Path(p)))

   sc.newAPIHadoopFile(paths.head, classOf[AvroKeyInputFormat[GenericRecord]],
     classOf[NullWritable], classOf[GenericRecord], job.getConfiguration())
 }

2015-05-27 10:55 GMT+02:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:


  def readGenericRecords(sc: SparkContext, inputDir: String, startDate:
 Date, endDate: Date) = {

 val path = getInputPaths(inputDir, startDate, endDate)

 sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
   AvroKeyInputFormat[GenericRecord]]("/A/B/C/D/D/2015/05/22/out-r-*.avro")

 }


 This is my method, can you show me what I should modify to use
 FileInputFormat? If you add the paths there, what should you pass when
 invoking newAPIHadoopFile?

 On Wed, May 27, 2015 at 2:20 PM, Eugen Cepoi cepoi.eu...@gmail.com
 wrote:

 You can do that using FileInputFormat.addInputPath

 2015-05-27 10:41 GMT+02:00 ayan guha guha.a...@gmail.com:

 What about /blah/*/blah/out*.avro?
 On 27 May 2015 18:08, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I am doing that now.
 Is there no other way ?

 On Wed, May 27, 2015 at 12:40 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 How about creating two and union [ sc.union(first, second) ] them?

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 11:51 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I have this piece

 sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
 AvroKeyInputFormat[GenericRecord]](
 /a/b/c/d/exptsession/2015/05/22/out-r-*.avro)

 that takes (/a/b/c/d/exptsession/2015/05/22/out-r-*.avro) this as
 input.

 I want to give a second directory as input but this is an invalid
 syntax


 that takes (/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro,
 /a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro)

 OR

 (/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro,
 /a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro)


 Please suggest.



 --
 Deepak





 --
 Deepak





 --
 Deepak




Re: How to give multiple directories as input ?

2015-05-27 Thread Eugen Cepoi
You can do that using FileInputFormat.addInputPath

2015-05-27 10:41 GMT+02:00 ayan guha guha.a...@gmail.com:

 What about /blah/*/blah/out*.avro?
 On 27 May 2015 18:08, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I am doing that now.
 Is there no other way ?

 On Wed, May 27, 2015 at 12:40 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 How about creating two and union [ sc.union(first, second) ] them?

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 11:51 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I have this piece

 sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
 AvroKeyInputFormat[GenericRecord]](
 /a/b/c/d/exptsession/2015/05/22/out-r-*.avro)

 that takes (/a/b/c/d/exptsession/2015/05/22/out-r-*.avro) this as
 input.

 I want to give a second directory as input but this is an invalid syntax


 that takes (/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro,
 /a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro)

 OR

 (/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro,
 /a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro)


 Please suggest.



 --
 Deepak





 --
 Deepak




Re: Questions about Accumulators

2015-05-03 Thread Eugen Cepoi
Yes that's it. If a partition is lost, to recompute it, some steps will
need to be re-executed. Perhaps the map function in which you update the
accumulator.

I think you can do it more safely in a transformation near the action,
where it is less likely that an error will occur (not always true...). You
can also checkpoint the RDD after the step that updates the accumulator, so
your transformation doesn't get applied again if some task fails. But this
is kind of expensive considering you only want to update some counter...

Another idea could be to implement a custom accumulator that holds a map of
partition index -> value and then, on the driver side, merge the values in the
map, but I never tried this, not sure if it would really work.
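
Something along those lines, as an untested sketch (the names are made up and I
haven't verified the retry semantics end to end):

 import org.apache.spark.{AccumulableParam, TaskContext}

 // per-partition counters: a re-executed task overwrites its partition's slot
 // on the driver instead of adding to it a second time
 object PartitionCounterParam extends AccumulableParam[Map[Int, Long], Long] {
   // called inside tasks: add one increment to this task's partition slot
   def addAccumulator(acc: Map[Int, Long], inc: Long): Map[Int, Long] = {
     val pid = TaskContext.get.partitionId
     acc.updated(pid, acc.getOrElse(pid, 0L) + inc)
   }
   // called on the driver when merging task results: last value per partition wins
   def addInPlace(m1: Map[Int, Long], m2: Map[Int, Long]): Map[Int, Long] = m1 ++ m2
   def zero(initial: Map[Int, Long]): Map[Int, Long] = Map.empty
 }

 val acc = sc.accumulable(Map.empty[Int, Long])(PartitionCounterParam)
 rdd.foreach(_ => acc += 1L)  // or do the += inside your existing map step
 println(acc.value.values.sum)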

Cheers,
Eugen

2015-05-03 14:08 GMT+02:00 xiazhuchang hk8...@163.com:

 The official documentation says: "In transformations, users should be aware
 that each task's update may be applied more than once if tasks or job stages
 are re-executed."
 I don't quite understand what this means. Does it mean that if I use the
 accumulator in transformations (i.e. a map() operation), this operation will
 be executed more than once if the task restarts? And then the final result
 will be many times the real result?







Re: Multipart upload to S3 fails with Bad Digest Exceptions

2015-04-13 Thread Eugen Cepoi
I think I found where the problem comes from.

I am writing lzo compressed thrift records using elephant-bird, my guess is
that perhaps one side is computing the checksum based on the uncompressed
data and the other on the compressed data, thus getting a mismatch.

When writing the data as strings using a plain TextOutputFormat, the multipart
upload works, which confirms that the lzo compression is probably the
problem... but it is not a solution :(
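
For anyone hitting the same thing, the workaround I mention can also be applied
from the job itself; a sketch (the exact property name may depend on your
Hadoop/EMR version):

 // disable S3 multipart uploads for this job only
 sc.hadoopConfiguration.set("fs.s3n.multipart.uploads.enabled", "false")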

2015-04-13 18:46 GMT+02:00 Eugen Cepoi cepoi.eu...@gmail.com:

 Hi,

 I am not sure my problem is relevant to spark, but perhaps someone else
 had the same error. When I try to write files that need multipart upload to
 S3 from a job on EMR I always get this error:

 com.amazonaws.services.s3.model.AmazonS3Exception: The Content-MD5 you
 specified did not match what we received.

 If I disable multipart upload via fs.s3n.multipart.uploads.enabled (or
 output smaller files that don't require multi part upload), then everything
 works fine.

 I've seen an old thread on the ML where someone has the same error, but in
 my case I don't have any other errors on the worker nodes.

 I am using spark 1.2.1 and hadoop 2.4.0.

 Thanks,
 Eugen



Multipart upload to S3 fails with Bad Digest Exceptions

2015-04-13 Thread Eugen Cepoi
Hi,

I am not sure my problem is relevant to spark, but perhaps someone else had
the same error. When I try to write files that need multipart upload to S3
from a job on EMR I always get this error:

com.amazonaws.services.s3.model.AmazonS3Exception: The Content-MD5 you
specified did not match what we received.

If I disable multipart upload via fs.s3n.multipart.uploads.enabled (or
output smaller files that don't require multi part upload), then everything
works fine.

I've seen an old thread on the ML where someone has the same error, but in
my case I don't have any other errors on the worker nodes.

I am using spark 1.2.1 and hadoop 2.4.0.

Thanks,
Eugen


Re: Hanging tasks in spark 1.2.1 while working with 1.1.1

2015-03-18 Thread Eugen Cepoi
Hey Dmitriy, thanks for sharing your solution.

I have some more updates.

The problem comes out when a shuffle is involved. Using coalesce with shuffle=true
behaves like reduceByKey + a smaller number of partitions, except that the whole
save stage hangs. I am not sure yet if it only happens with UnionRDD or
also for cogroup-like operations.

Changing spark.shuffle.blockTransferService to use nio (the default pre 1.2)
solves the problem.
So it looks like this problem arises with the new netty-based impl.
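
For reference, a sketch of the workaround (set it before creating the context,
or pass the same key with --conf to spark-submit):

 import org.apache.spark.SparkConf

 val conf = new SparkConf()
   .set("spark.shuffle.blockTransferService", "nio")  // revert to the pre-1.2 transfer service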




2015-03-18 1:26 GMT+01:00 Dmitriy Lyubimov dlie...@gmail.com:

 FWIW observed similar behavior in similar situation. Was able to work
 around by forcefully committing one of the rdds right before the union
 into cache, and forcing that by executing take(1). Nothing else ever
 helped.

 Seems like yet-undiscovered 1.2.x thing.

 On Tue, Mar 17, 2015 at 4:21 PM, Eugen Cepoi cepoi.eu...@gmail.com
 wrote:
  Doing the reduceByKey without changing the number of partitions and then
 do
  a coalesce works.
  But the other version still hangs, without any information (while working
  with spark 1.1.1). The previous logs don't seem to be related to what
  happens.
  I don't think this is a memory issue as the GC time remains low and the
  shuffle read is small. My guess is that it might be related to a high
 number
  of initial partitions, but in that case shouldn't it fail for coalesce
  too...?
 
  Does anyone have an idea where to look at to find what the source of the
  problem is?
 
  Thanks,
  Eugen
 
  2015-03-13 19:18 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com:
 
  Hum increased it to 1024 but doesn't help still have the same problem :(
 
  2015-03-13 18:28 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com:
 
  The one by default 0.07 of executor memory. I'll try increasing it and
  post back the result.
 
  Thanks
 
  2015-03-13 18:09 GMT+01:00 Ted Yu yuzhih...@gmail.com:
 
  Might be related: what's the value for
  spark.yarn.executor.memoryOverhead ?
 
  See SPARK-6085
 
  Cheers
 
  On Fri, Mar 13, 2015 at 9:45 AM, Eugen Cepoi cepoi.eu...@gmail.com
  wrote:
 
  Hi,
 
  I have a job that hangs after upgrading to spark 1.2.1 from 1.1.1.
  Strange thing, the exact same code does work (after upgrade) in the
  spark-shell. But this information might be misleading as it works
 with
  1.1.1...
 
 
  The job takes as input two data sets:
   - rdd A of +170gb (with less it is hard to reproduce) and more than
  11K partitions
   - rdd B of +100mb and 32 partitions
 
  I run it via EMR over YARN and use 4*m3.xlarge computing nodes. I am
  not sure the executor config is relevant here. Anyway I tried with
 multiple
  small executors with fewer ram and the inverse.
 
 
  The job basically does this:
  A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save
 
  After the flatMap rdd A size is much smaller similar to B.
 
  Configs I used to run this job:
 
  storage.memoryFraction: 0
  shuffle.memoryFraction: 0.5
 
  akka.timeout 500
  akka.frameSize 40
 
  // this one defines also the memory used by yarn master, but not sure
  if it needs to be important
  driver.memory 5g
  excutor.memory 4250m
 
  I have 7 executors with 2 cores.
 
  What happens:
  The job produces two stages: keyBy and save. The keyBy stage runs
 fine
  and produces a shuffle write of ~150mb. The save stage where the
 suffle read
  occurs hangs. Greater the initial dataset is more tasks hang.
 
  I did run it for much larger datasets with same config/cluster but
  without doing the union and it worked fine.
 
  Some more infos and logs:
 
  Amongst 4 nodes 1 finished all his tasks and the running ones are
 on
  the 3 other nodes. But not sure this is a good information (one node
 that
  completed all his work vs the others) as with some smaller dataset I
 manage
  to get only one hanging task.
 
  Here are the last parts of the executor logs that show some timeouts.
 
  An executor from node ip-10-182-98-220
 
  15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6
  remote fetches in 66 ms
  15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in
  connection from /10.181.48.153:56806
  java.io.IOException: Connection timed out
 
 
  An executor from node ip-10-181-103-186
 
  15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6
  remote fetches in 20 ms
  15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in
  connection from /10.182.98.220:38784
  java.io.IOException: Connection timed out
 
  An executor from node ip-10-181-48-153 (all the logs bellow belong
 this
  node)
 
  15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage
  1.0 (TID 13860). 802 bytes result sent to driver
  15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in
  connection from /10.181.103.186:46381
  java.io.IOException: Connection timed out
 
  Followed by many
 
  15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending
  result
 ChunkFetchSuccess{streamChunkId=StreamChunkId

Re: Hanging tasks in spark 1.2.1 while working with 1.1.1

2015-03-17 Thread Eugen Cepoi
Doing the reduceByKey without changing the number of partitions and then doing
a coalesce works.
But the other version still hangs, without any information (while it was working
with spark 1.1.1). The previous logs don't seem to be related to what
happens.
I don't think this is a memory issue as the GC time remains low and the
shuffle read is small. My guess is that it might be related to a high
number of initial partitions, but in that case shouldn't it fail for
coalesce too...?

Does anyone have an idea where to look at to find what the source of the
problem is?

Thanks,
Eugen

2015-03-13 19:18 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com:

 Hum increased it to 1024 but doesn't help still have the same problem :(

 2015-03-13 18:28 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com:

 The one by default 0.07 of executor memory. I'll try increasing it and
 post back the result.

 Thanks

 2015-03-13 18:09 GMT+01:00 Ted Yu yuzhih...@gmail.com:

 Might be related: what's the value
 for spark.yarn.executor.memoryOverhead ?

 See SPARK-6085

 Cheers

 On Fri, Mar 13, 2015 at 9:45 AM, Eugen Cepoi cepoi.eu...@gmail.com
 wrote:

 Hi,

 I have a job that hangs after upgrading to spark 1.2.1 from 1.1.1.
 Strange thing, the exact same code does work (after upgrade) in the
 spark-shell. But this information might be misleading as it works with
 1.1.1...


 *The job takes as input two data sets:*
  - rdd A of +170gb (with less it is hard to reproduce) and more than
 11K partitions
  - rdd B of +100mb and 32 partitions

 I run it via EMR over YARN and use 4*m3.xlarge computing nodes. I am
 not sure the executor config is relevant here. Anyway I tried with multiple
 small executors with fewer ram and the inverse.


 *The job basically does this:*
 A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save

 After the flatMap rdd A size is much smaller similar to B.

 *Configs I used to run this job:*

 storage.memoryFraction: 0
 shuffle.memoryFraction: 0.5

 akka.timeout 500
 akka.frameSize 40

 // this one defines also the memory used by yarn master, but not sure
 if it needs to be important
 driver.memory 5g
 excutor.memory 4250m

 I have 7 executors with 2 cores.

 *What happens:*
 The job produces two stages: keyBy and save. The keyBy stage runs fine
 and produces a shuffle write of ~150mb. The save stage where the suffle
 read occurs hangs. Greater the initial dataset is more tasks hang.

 I did run it for much larger datasets with same config/cluster but
 without doing the union and it worked fine.

 *Some more infos and logs:*

 Amongst 4 nodes 1 finished all his tasks and the running ones are on
 the 3 other nodes. But not sure this is a good information (one node that
 completed all his work vs the others) as with some smaller dataset I manage
 to get only one hanging task.

 Here are the last parts of the executor logs that show some timeouts.

 *An executor from node ip-10-182-98-220*

 15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6 
 remote fetches in 66 ms
 15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in 
 connection from /10.181.48.153:56806
 java.io.IOException: Connection timed out


 *An executor from node ip-10-181-103-186*

 15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6 
 remote fetches in 20 ms
 15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in 
 connection from /10.182.98.220:38784
 java.io.IOException: Connection timed out

 *An executor from node ip-10-181-48-153* (all the logs bellow belong this 
 node)

 15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage 1.0 
 (TID 13860). 802 bytes result sent to driver
 15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in 
 connection from /10.181.103.186:46381
 java.io.IOException: Connection timed out

 *Followed by many *

 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending 
 result 
 ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=2064203432016, 
 chunkIndex=405}, 
 buffer=FileSegmentManagedBuffer{file=/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1426256247374_0002/spark-1659efcd-c6b6-4a12-894d-e869486d3d00/35/shuffle_0_9885_0.data,
  offset=8631, length=571}} to /10.181.103.186:46381; closing connection
 java.nio.channels.ClosedChannelException

 *with last one being*

 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending 
 result RpcResponse{requestId=7377187355282895939, response=[B@6fcd0014} to 
 /10.181.103.186:46381; closing connection
 java.nio.channels.ClosedChannelException


 The executors from the node that finished his tasks doesn't show
 anything special.

 Note that I don't cache anything thus reduced the
 storage.memoryFraction to 0.
 I see some of those, but don't think they are related.

 15/03/13 15:43:15 INFO storage.MemoryStore: Memory use = 0.0 B (blocks) + 
 0.0 B (scratch space shared across 0 thread(s)) = 0.0 B. Storage limit = 
 0.0 B.


 Sorry

Re: Hanging tasks in spark 1.2.1 while working with 1.1.1

2015-03-13 Thread Eugen Cepoi
Hmm, I increased it to 1024 but it doesn't help, I still have the same problem :(

2015-03-13 18:28 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com:

 The one by default 0.07 of executor memory. I'll try increasing it and
 post back the result.

 Thanks

 2015-03-13 18:09 GMT+01:00 Ted Yu yuzhih...@gmail.com:

 Might be related: what's the value for spark.yarn.executor.memoryOverhead
 ?

 See SPARK-6085

 Cheers

 On Fri, Mar 13, 2015 at 9:45 AM, Eugen Cepoi cepoi.eu...@gmail.com
 wrote:

 Hi,

 I have a job that hangs after upgrading to spark 1.2.1 from 1.1.1.
 Strange thing, the exact same code does work (after upgrade) in the
 spark-shell. But this information might be misleading as it works with
 1.1.1...


 *The job takes as input two data sets:*
  - rdd A of +170gb (with less it is hard to reproduce) and more than 11K
 partitions
  - rdd B of +100mb and 32 partitions

 I run it via EMR over YARN and use 4*m3.xlarge computing nodes. I am
 not sure the executor config is relevant here. Anyway I tried with multiple
 small executors with fewer ram and the inverse.


 *The job basically does this:*
 A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save

 After the flatMap rdd A size is much smaller similar to B.

 *Configs I used to run this job:*

 storage.memoryFraction: 0
 shuffle.memoryFraction: 0.5

 akka.timeout 500
 akka.frameSize 40

 // this one defines also the memory used by yarn master, but not sure if
 it needs to be important
 driver.memory 5g
 excutor.memory 4250m

 I have 7 executors with 2 cores.

 *What happens:*
 The job produces two stages: keyBy and save. The keyBy stage runs fine
 and produces a shuffle write of ~150mb. The save stage where the suffle
 read occurs hangs. Greater the initial dataset is more tasks hang.

 I did run it for much larger datasets with same config/cluster but
 without doing the union and it worked fine.

 *Some more infos and logs:*

 Amongst 4 nodes 1 finished all his tasks and the running ones are on
 the 3 other nodes. But not sure this is a good information (one node that
 completed all his work vs the others) as with some smaller dataset I manage
 to get only one hanging task.

 Here are the last parts of the executor logs that show some timeouts.

 *An executor from node ip-10-182-98-220*

 15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6 
 remote fetches in 66 ms
 15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in 
 connection from /10.181.48.153:56806
 java.io.IOException: Connection timed out


 *An executor from node ip-10-181-103-186*

 15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6 
 remote fetches in 20 ms
 15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in 
 connection from /10.182.98.220:38784
 java.io.IOException: Connection timed out

 *An executor from node ip-10-181-48-153* (all the logs bellow belong this 
 node)

 15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage 1.0 
 (TID 13860). 802 bytes result sent to driver
 15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in 
 connection from /10.181.103.186:46381
 java.io.IOException: Connection timed out

 *Followed by many *

 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending 
 result 
 ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=2064203432016, 
 chunkIndex=405}, 
 buffer=FileSegmentManagedBuffer{file=/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1426256247374_0002/spark-1659efcd-c6b6-4a12-894d-e869486d3d00/35/shuffle_0_9885_0.data,
  offset=8631, length=571}} to /10.181.103.186:46381; closing connection
 java.nio.channels.ClosedChannelException

 *with last one being*

 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending 
 result RpcResponse{requestId=7377187355282895939, response=[B@6fcd0014} to 
 /10.181.103.186:46381; closing connection
 java.nio.channels.ClosedChannelException


 The executors from the node that finished his tasks doesn't show
 anything special.

 Note that I don't cache anything thus reduced the storage.memoryFraction
 to 0.
 I see some of those, but don't think they are related.

 15/03/13 15:43:15 INFO storage.MemoryStore: Memory use = 0.0 B (blocks) + 
 0.0 B (scratch space shared across 0 thread(s)) = 0.0 B. Storage limit = 
 0.0 B.


 Sorry for the long email with maybe misleading infos, but I hope it
 might help to track down what happens and why it was working with spark
 1.1.1.

 Thanks,
 Eugen






Re: Hanging tasks in spark 1.2.1 while working with 1.1.1

2015-03-13 Thread Eugen Cepoi
 The default one, 0.07 of the executor memory. I'll try increasing it and
 post back the result.

Thanks

2015-03-13 18:09 GMT+01:00 Ted Yu yuzhih...@gmail.com:

 Might be related: what's the value for spark.yarn.executor.memoryOverhead ?

 See SPARK-6085

 Cheers

 On Fri, Mar 13, 2015 at 9:45 AM, Eugen Cepoi cepoi.eu...@gmail.com
 wrote:

 Hi,

 I have a job that hangs after upgrading to spark 1.2.1 from 1.1.1.
 Strange thing, the exact same code does work (after upgrade) in the
 spark-shell. But this information might be misleading as it works with
 1.1.1...


 *The job takes as input two data sets:*
  - rdd A of +170gb (with less it is hard to reproduce) and more than 11K
 partitions
  - rdd B of +100mb and 32 partitions

 I run it via EMR over YARN and use 4*m3.xlarge computing nodes. I am not
 sure the executor config is relevant here. Anyway I tried with multiple
 small executors with fewer ram and the inverse.


 *The job basically does this:*
 A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save

 After the flatMap rdd A size is much smaller similar to B.

 *Configs I used to run this job:*

 storage.memoryFraction: 0
 shuffle.memoryFraction: 0.5

 akka.timeout 500
 akka.frameSize 40

 // this one defines also the memory used by yarn master, but not sure if
 it needs to be important
 driver.memory 5g
 excutor.memory 4250m

 I have 7 executors with 2 cores.

 *What happens:*
 The job produces two stages: keyBy and save. The keyBy stage runs fine
 and produces a shuffle write of ~150mb. The save stage where the suffle
 read occurs hangs. Greater the initial dataset is more tasks hang.

 I did run it for much larger datasets with same config/cluster but
 without doing the union and it worked fine.

 *Some more infos and logs:*

 Amongst 4 nodes 1 finished all his tasks and the running ones are on
 the 3 other nodes. But not sure this is a good information (one node that
 completed all his work vs the others) as with some smaller dataset I manage
 to get only one hanging task.

 Here are the last parts of the executor logs that show some timeouts.

 *An executor from node ip-10-182-98-220*

 15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote 
 fetches in 66 ms
 15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in 
 connection from /10.181.48.153:56806
 java.io.IOException: Connection timed out


 *An executor from node ip-10-181-103-186*

 15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote 
 fetches in 20 ms
 15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in 
 connection from /10.182.98.220:38784
 java.io.IOException: Connection timed out

 *An executor from node ip-10-181-48-153* (all the logs bellow belong this 
 node)

 15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage 1.0 
 (TID 13860). 802 bytes result sent to driver
 15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in 
 connection from /10.181.103.186:46381
 java.io.IOException: Connection timed out

 *Followed by many *

 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending result 
 ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=2064203432016, 
 chunkIndex=405}, 
 buffer=FileSegmentManagedBuffer{file=/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1426256247374_0002/spark-1659efcd-c6b6-4a12-894d-e869486d3d00/35/shuffle_0_9885_0.data,
  offset=8631, length=571}} to /10.181.103.186:46381; closing connection
 java.nio.channels.ClosedChannelException

 *with last one being*

 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending result 
 RpcResponse{requestId=7377187355282895939, response=[B@6fcd0014} to 
 /10.181.103.186:46381; closing connection
 java.nio.channels.ClosedChannelException


 The executors from the node that finished his tasks doesn't show anything
 special.

 Note that I don't cache anything thus reduced the storage.memoryFraction
 to 0.
 I see some of those, but don't think they are related.

 15/03/13 15:43:15 INFO storage.MemoryStore: Memory use = 0.0 B (blocks) + 
 0.0 B (scratch space shared across 0 thread(s)) = 0.0 B. Storage limit = 0.0 
 B.


 Sorry for the long email with maybe misleading infos, but I hope it might
 help to track down what happens and why it was working with spark 1.1.1.

 Thanks,
 Eugen





Hanging tasks in spark 1.2.1 while working with 1.1.1

2015-03-13 Thread Eugen Cepoi
Hi,

I have a job that hangs after upgrading to spark 1.2.1 from 1.1.1. Strange
thing, the exact same code does work (after upgrade) in the spark-shell.
But this information might be misleading as it works with 1.1.1...


*The job takes as input two data sets:*
 - rdd A of +170gb (with less it is hard to reproduce) and more than 11K
partitions
 - rdd B of +100mb and 32 partitions

I run it via EMR over YARN and use 4*m3.xlarge computing nodes. I am not
sure the executor config is relevant here. Anyway I tried with multiple
small executors with fewer ram and the inverse.


*The job basically does this:*
A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save

After the flatMap, rdd A's size is much smaller, similar to B.

*Configs I used to run this job:*

storage.memoryFraction: 0
shuffle.memoryFraction: 0.5

akka.timeout 500
akka.frameSize 40

// this one also defines the memory used by the yarn master, but not sure if
it is important
driver.memory 5g
executor.memory 4250m

I have 7 executors with 2 cores.

*What happens:*
The job produces two stages: keyBy and save. The keyBy stage runs fine and
produces a shuffle write of ~150mb. The save stage, where the shuffle read
occurs, hangs. The greater the initial dataset is, the more tasks hang.

I did run it for much larger datasets with same config/cluster but without
doing the union and it worked fine.

*Some more infos and logs:*

Amongst 4 nodes, 1 finished all its tasks and the running ones are on the
3 other nodes. But not sure this is useful information (one node that
completed all its work vs the others), as with some smaller dataset I manage
to get only one hanging task.

Here are the last parts of the executor logs that show some timeouts.

*An executor from node ip-10-182-98-220*

15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6
remote fetches in 66 ms
15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in
connection from /10.181.48.153:56806
java.io.IOException: Connection timed out


*An executor from node ip-10-181-103-186*

15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6
remote fetches in 20 ms
15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in
connection from /10.182.98.220:38784
java.io.IOException: Connection timed out

*An executor from node ip-10-181-48-153* (all the logs below belong to this node)

15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage
1.0 (TID 13860). 802 bytes result sent to driver
15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in
connection from /10.181.103.186:46381
java.io.IOException: Connection timed out

*Followed by many *

15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending
result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=2064203432016,
chunkIndex=405},
buffer=FileSegmentManagedBuffer{file=/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1426256247374_0002/spark-1659efcd-c6b6-4a12-894d-e869486d3d00/35/shuffle_0_9885_0.data,
offset=8631, length=571}} to /10.181.103.186:46381; closing connection
java.nio.channels.ClosedChannelException

*with last one being*

15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending
result RpcResponse{requestId=7377187355282895939,
response=[B@6fcd0014} to /10.181.103.186:46381; closing connection
java.nio.channels.ClosedChannelException


The executors from the node that finished its tasks don't show anything
special.

Note that I don't cache anything, thus I reduced storage.memoryFraction to
0.
I see some of those, but don't think they are related.

15/03/13 15:43:15 INFO storage.MemoryStore: Memory use = 0.0 B
(blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 0.0 B.
Storage limit = 0.0 B.


Sorry for the long email with possibly misleading info, but I hope it might
help to track down what happens and why it was working with spark 1.1.1.

Thanks,
Eugen


Re: How to design a long-lived spark application

2015-02-05 Thread Eugen Cepoi
Yes you can submit multiple actions from different threads to the same
SparkContext. It is safe.
Indeed what you want to achieve is quite common. Expose some operations
over a SparkContext through HTTP.
I have used spray for this and it just worked fine.

At bootstrap of your web app, start a SparkContext, maybe preprocess some
data and cache it, then start accepting requests against this sc. Depending on
where you place the initialization code, you can block the server from
initializing until your context is ready. This is nice if you don't want to
accept requests while the context is being prepared.
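
As a small illustration (just a sketch, assuming an already built SparkConf named
conf and some inputPath) of serving several requests from different threads
against the same context:

 import org.apache.spark.SparkContext
 import scala.concurrent.{Await, Future}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._

 val sc = new SparkContext(conf)              // one context for the whole app
 val cached = sc.textFile(inputPath).cache()  // shared, cached dataset

 // two requests handled concurrently, both submitting actions to the same sc
 val f1 = Future { cached.count() }
 val f2 = Future { cached.filter(_.nonEmpty).count() }
 println(Await.result(f1, 10.minutes) + " / " + Await.result(f2, 10.minutes))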


Eugen


2015-02-05 23:22 GMT+01:00 Shuai Zheng szheng.c...@gmail.com:

 This example helps a lot :)



 But I am thinking a below case:



 Assume I have a SparkContext as a global variable.

 Then if I use multiple threads to access/use it. Will it mess up?



 For example:



 My code:



 public static List<Tuple2<Integer, Double>> run(JavaSparkContext sparkContext,
     Map<Integer, List<ExposureInfo>> cache, Properties prop, List<EghInfo> el)
     throws IOException, InterruptedException {

   JavaRDD<EghInfo> lines = sparkContext.parallelize(el, 100);

   lines.map(…)

   …

   lines.count()

 }



 If I have two threads call this method at the same time and pass in the
 same SparkContext.



 Will SparkContext be thread-safe? I am a bit worry here, in traditional
 java, it should be, but in Spark context, I am not 100% sure.



 Basically the sparkContext needs to be smart enough to differentiate the
 different method contexts (RDDs added to it from different methods), so it
 creates two different DAGs for the different methods.



 Anyone can confirm this? This is not something I can easily test with
 code. Thanks!



 Regards,



 Shuai



 *From:* Corey Nolet [mailto:cjno...@gmail.com]
 *Sent:* Thursday, February 05, 2015 11:55 AM
 *To:* Charles Feduke
 *Cc:* Shuai Zheng; user@spark.apache.org
 *Subject:* Re: How to design a long-lived spark application



 Here's another lightweight example of running a SparkContext in a common
 java servlet container: https://github.com/calrissian/spark-jetty-server



 On Thu, Feb 5, 2015 at 11:46 AM, Charles Feduke charles.fed...@gmail.com
 wrote:

 If you want to design something like Spark shell have a look at:



 http://zeppelin-project.org/



 Its open source and may already do what you need. If not, its source code
 will be helpful in answering the questions about how to integrate with long
 running jobs that you have.



 On Thu Feb 05 2015 at 11:42:56 AM Boromir Widas vcsub...@gmail.com
 wrote:

 You can check out https://github.com/spark-jobserver/spark-jobserver -
 this allows several users to upload their jars and run jobs with a REST
 interface.



 However, if all users are using the same functionality, you can write a
 simple spray server which will act as the driver and hosts the spark
 context+RDDs, launched in client mode.



 On Thu, Feb 5, 2015 at 10:25 AM, Shuai Zheng szheng.c...@gmail.com
 wrote:

 Hi All,



 I want to develop a server side application:



 User submits request -> Server runs spark application and returns (this might
 take a few seconds).



 So I want to host the server to keep the long-live context, I don’t know
 whether this is reasonable or not.



 Basically I try to have a global JavaSparkContext instance and keep it
 there, and initialize some RDD. Then my java application will use it to
 submit the job.



 So now I have some questions:



 1, if I don't close it, will there be any timeout I need to configure on the
 spark server?

 2, In theory I want to design something similar to the Spark shell (which also
 hosts a default sc there), just it is not shell based.



 Any suggestion? I think my request is very common for application
 development, here must someone has done it before?



 Regards,



 Shawn







Re: application as a service

2014-08-17 Thread Eugen Cepoi
Hi,

You can achieve it by running a spray service for example that has access
to the RDD in question. When starting the app you first build your RDD and
cache it. In your spray endpoints you will translate the HTTP requests to
operations on that RDD.
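
A rough sketch of the shape this takes (the HTTP layer is left out, spray or
anything else works; the path and names are placeholders):

 import org.apache.spark.{SparkConf, SparkContext}

 // built once at application startup
 val sc = new SparkContext(new SparkConf().setAppName("daily-cache"))
 val daily = sc.textFile("hdfs:///business/data/today").cache()
 daily.count()  // force materialization so the first user query is already fast

 // each HTTP request gets translated into an operation on the cached RDD
 def handleQuery(keyword: String): Long =
   daily.filter(_.contains(keyword)).count()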


2014-08-17 17:27 GMT+02:00 Zhanfeng Huo huozhanf...@gmail.com:

 Hi, All:

 I have a requirement to use spark to load business data daily and cache
 it as an rdd or spark sql rdd. Other users can then query based on it (in
 memory). In summary, it requires that the app runs as a daemon
 service that lasts for at least one day, and that users' apps can access its
 rdd.

How can I achieve this requirement?

Thanks.
 --
 Zhanfeng Huo



Re: collect() on small group of Avro files causes plain NullPointerException

2014-07-22 Thread Eugen Cepoi
Do you have a list/array in your avro record? If yes, this could cause the
problem. I experienced this kind of problem and solved it by providing a
custom kryo ser/de for avro lists. Also be careful, spark reuses records,
so if you just read them and don't copy/transform them you would end up
with the records all having the same values.
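
A sketch of the copy step I mean, assuming avroRdd is an
RDD[(AvroKey[GenericRecord], NullWritable)] read through an Avro input format:

 import org.apache.avro.generic.{GenericData, GenericRecord}

 // the reader reuses record instances, so copy them (or map to your own type)
 // before caching/collecting
 val safe = avroRdd.map { case (key, _) =>
   val rec: GenericRecord = key.datum()
   GenericData.get().deepCopy(rec.getSchema, rec)
 }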


2014-07-22 15:01 GMT+02:00 Sparky gullo_tho...@bah.com:

 Running a simple collect method on a group of Avro objects causes a plain
 NullPointerException.  Does anyone know what may be wrong?

 files.collect()

 Press ENTER or type command to continue
 Exception in thread Executor task launch worker-0
 java.lang.NullPointerException
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$2.apply(Executor.scala:254)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$2.apply(Executor.scala:254)
 at scala.Option.flatMap(Option.scala:170)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:254)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)







Re: Using Spark as web app backend

2014-06-25 Thread Eugen Cepoi
Yeah I agree with Koert, it would be the lightest solution. I have
used it quite successfully and it just works.

There are not many spark specifics here, you can follow this example
https://github.com/jacobus/s4 on how to build your spray service.
Then the easy solution would be to have a SparkContext in your
HttpService; this context is initialized at bootstrap,
computes the RDDs you want to run the queries on and caches them. In
your routes, you will query the cached RDDs.

In my case I used spark+spray a bit differently for an always-running
service, as I didn't want to block the resources forever.
The app at bootstrap was starting a spark job that fetches data and
preprocesses/precomputes an optimized structure
(domain specific indexes) that is collected locally and then reused
across requests directly from RAM;
the spark context is stopped when the job is done. Only the service
continues to run.


Eugen


2014-06-25 9:07 GMT+02:00 Jaonary Rabarisoa jaon...@gmail.com:

 Hi all,

 Thank you for the reply. Is there any example of spark running in client
 mode with spray ? I think, I will choose this approach.


 On Tue, Jun 24, 2014 at 4:55 PM, Koert Kuipers ko...@tresata.com wrote:

 run your spark app in client mode together with a spray rest service,
 that the front end can talk to


 On Tue, Jun 24, 2014 at 3:12 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Hi all,

 So far, I run my spark jobs with the spark-shell or spark-submit command.
 I'd like to go further and I wonder how to use spark as the backend of a web
 application. Specifically, I want a frontend application (built with nodejs)
 to communicate with spark on the backend, so that every query from the
 frontend is routed to spark. And the results from Spark are sent back to the
 frontend.
 Does some of you already experiment this kind of architecture ?


 Cheers,


 Jaonary






Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space

2014-06-20 Thread Eugen Cepoi
On Jun 20, 2014 01:46, Shivani Rao raoshiv...@gmail.com wrote:

 Hello Andrew,

 I wish I could share the code, but for proprietary reasons I can't. But I
can give some idea of what I am trying to do. The job reads a file
and processes each line of that file. I am not doing
anything intense in the processLogs function

 import argonaut._
 import argonaut.Argonaut._


 /* all of these case classes are created from json strings extracted from
 the line in the processLogs() function
 */
 case class struct1…
 case class struct2…
 case class value1(struct1, struct2)

 def processLogs(line: String): Option[(key1, value1)] = {…
 }

 def run(sparkMaster, appName, executorMemory, jarsPath) {
   val sparkConf = new SparkConf()
   sparkConf.setMaster(sparkMaster)
   sparkConf.setAppName(appName)
   sparkConf.set("spark.executor.memory", executorMemory)
   sparkConf.setJars(jarsPath) // This includes all the relevant jars..
   val sc = new SparkContext(sparkConf)
   val rawLogs = sc.textFile("hdfs://my-hadoop-namenode:8020:myfile.txt")

   rawLogs.saveAsTextFile("hdfs://my-hadoop-namenode:8020:writebackForTesting")

   rawLogs.flatMap(processLogs).saveAsTextFile("hdfs://my-hadoop-namenode:8020:outfile.txt")
 }

 If I switch to local mode, the code runs just fine; in cluster mode it fails
with the error I pasted above. In cluster mode, even writing back the file we
just read fails
(rawLogs.saveAsTextFile("hdfs://my-hadoop-namenode:8020:writebackForTesting"))

 I still believe this is a classNotFound error in disguise


Indeed you are right, this can be the reason. I had similar errors when
defining case classes in the shell and trying to use them in the RDDs. Are
you shading argonaut in the fat jar ?

 Thanks
 Shivani



 On Wed, Jun 18, 2014 at 2:49 PM, Andrew Ash and...@andrewash.com wrote:

 Wait, so the file only has four lines and the job is running out of heap
space?  Can you share the code you're running that does the processing?
 I'd guess that you're doing some intense processing on every line, but just
writing parsed case classes back to disk sounds very lightweight.

 I


 On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao raoshiv...@gmail.com
wrote:

 I am trying to process a file that contains 4 log lines (not very long)
and then write my parsed out case classes to a destination folder, and I
get the following error:


 java.lang.OutOfMemoryError: Java heap space

 at
org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)

 at
org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)

 at
org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)

 at
org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)

 at
org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

 at java.lang.reflect.Method.invoke(Method.java:597)

 at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)

 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)

 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)

 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)

 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)

 at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)

 at
org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)

 at
org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

 at java.lang.reflect.Method.invoke(Method.java:597)

 at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)

 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)

 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)

 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)

 at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)

 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)

 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)

 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)

 at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)

 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)

 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)


 Sadly, there are several folks that have faced this error while trying
to execute Spark jobs and there are various solutions, none 

Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space

2014-06-20 Thread Eugen Cepoi
In my case it was due to a case class I was defining in the spark-shell and
not being available on the workers. So packaging it in a jar and adding it
with ADD_JARS solved the problem. Note that I don't exactly remember if it
was an out of heap space exception or permgen space. Make sure your
jarsPath is correct.

Usually to debug this kind of problem I use the spark-shell (you can
do the same in your job but it's more time consuming to repackage, deploy,
run, iterate). Try for example (a quick sketch follows below):
1) read the lines (without any processing) and count them
2) apply processing and count
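
Something like this in the shell (the path and the processLogs function come
from your snippet, so adapt them):

 val raw = sc.textFile("hdfs://my-hadoop-namenode:8020:myfile.txt")
 println(raw.count())                       // 1) just read and count
 println(raw.flatMap(processLogs).count())  // 2) apply the processing and count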



2014-06-20 17:15 GMT+02:00 Shivani Rao raoshiv...@gmail.com:

 Hello Abhi, I did try that and it did not work

 And Eugene, Yes I am assembling the argonaut libraries in the fat jar. So
 how did you overcome this problem?

 Shivani


 On Fri, Jun 20, 2014 at 1:59 AM, Eugen Cepoi cepoi.eu...@gmail.com
 wrote:


 On Jun 20, 2014 01:46, Shivani Rao raoshiv...@gmail.com wrote:

 
  Hello Andrew,
 
  i wish I could share the code, but for proprietary reasons I can't. But
 I can give some idea though of what i am trying to do. The job reads a file
 and for each line of that file and processors these lines. I am not doing
 anything intense in the processLogs function
 
  import argonaut._
  import argonaut.Argonaut._
 
 
  /* all of these case classes are created from json strings extracted
 from the line in the processLogs() function
  *
  */
  case class struct1…
  case class struct2…
  case class value1(struct1, struct2)
 
  def processLogs(line:String): Option[(key1, value1)] {…
  }
 
  def run(sparkMaster, appName, executorMemory, jarsPath) {
val sparkConf = new SparkConf()
 sparkConf.setMaster(sparkMaster)
 sparkConf.setAppName(appName)
 sparkConf.set(spark.executor.memory, executorMemory)
  sparkConf.setJars(jarsPath) // This includes all the jars relevant
 jars..
 val sc = new SparkContext(sparkConf)
val rawLogs =
 sc.textFile(hdfs://my-hadoop-namenode:8020:myfile.txt)
 
 rawLogs.saveAsTextFile(hdfs://my-hadoop-namenode:8020:writebackForTesting)
 
 rawLogs.flatMap(processLogs).saveAsTextFile(hdfs://my-hadoop-namenode:8020:outfile.txt)
  }
 
  If I switch to local mode, the code runs just fine, it fails with the
 error I pasted above. In the cluster mode, even writing back the file we
 just read fails
 (rawLogs.saveAsTextFile(hdfs://my-hadoop-namenode:8020:writebackForTesting)
 
  I still believe this is a classNotFound error in disguise
 

 Indeed you are right, this can be the reason. I had similar errors when
 defining case classes in the shell and trying to use them in the RDDs. Are
 you shading argonaut in the fat jar ?

  Thanks
  Shivani
 
 
 
  On Wed, Jun 18, 2014 at 2:49 PM, Andrew Ash and...@andrewash.com
 wrote:
 
  Wait, so the file only has four lines and the job running out of heap
 space?  Can you share the code you're running that does the processing?
  I'd guess that you're doing some intense processing on every line but just
 writing parsed case classes back to disk sounds very lightweight.
 
  I
 
 
  On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao raoshiv...@gmail.com
 wrote:
 
  I am trying to process a file that contains 4 log lines (not very
 long) and then write my parsed out case classes to a destination folder,
 and I get the following error:
 
 
  java.lang.OutOfMemoryError: Java heap space
 
  at
 org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
 
  at
 org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)
 
  at
 org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
 
  at
 org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
 
  at
 org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
 
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 
  at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 
  at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 
  at java.lang.reflect.Method.invoke(Method.java:597)
 
  at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
 
  at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
 
  at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
 
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
 
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
 
  at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
 
  at
 org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
 
  at
 org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
 
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 
  at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 
  at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java

Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space

2014-06-20 Thread Eugen Cepoi
In short, ADD_JARS will add the jar to your driver classpath and also send
it to the workers (similar to what you are doing when you do sc.addJars).

ex: MASTER=master/url ADD_JARS=/path/to/myJob.jar ./bin/spark-shell


You also have SPARK_CLASSPATH var but it does not distribute the code, it
is only used to compute the driver classpath.


BTW, you are not supposed to change the compute_classpath.sh script


2014-06-20 19:45 GMT+02:00 Shivani Rao raoshiv...@gmail.com:

 Hello Eugene,

 You are right about this. I did encounter the permgen space issue in the spark
 shell. Can you tell me a little more about ADD_JARS? In order to ensure
 my spark-shell has all required jars, I added the jars to the $CLASSPATH
 in the compute_classpath.sh script. Is there another way of doing it?

 Shivani


 On Fri, Jun 20, 2014 at 9:47 AM, Eugen Cepoi cepoi.eu...@gmail.com
 wrote:

 In my case it was due to a case class I was defining in the spark-shell
 and not being available on the workers. So packaging it in a jar and adding
 it with ADD_JARS solved the problem. Note that I don't exactly remember if
 it was an out of heap space exception or pergmen space. Make sure your
 jarsPath is correct.

 Usually to debug this kind of problems I am using the spark-shell (you
 can do the same in your job but its more time constuming to repackage,
 deploy, run, iterate). Try for example
 1) read the lines (without any processing) and count them
 2) apply processing and count



 2014-06-20 17:15 GMT+02:00 Shivani Rao raoshiv...@gmail.com:

 Hello Abhi, I did try that and it did not work

 And Eugene, Yes I am assembling the argonaut libraries in the fat jar.
 So how did you overcome this problem?

 Shivani


 On Fri, Jun 20, 2014 at 1:59 AM, Eugen Cepoi cepoi.eu...@gmail.com
 wrote:


 On Jun 20, 2014 01:46, Shivani Rao raoshiv...@gmail.com wrote:

 
  Hello Andrew,
 
  i wish I could share the code, but for proprietary reasons I can't.
 But I can give some idea though of what i am trying to do. The job reads a
 file and for each line of that file and processors these lines. I am not
 doing anything intense in the processLogs function
 
  import argonaut._
  import argonaut.Argonaut._
 
 
  /* all of these case classes are created from json strings extracted
 from the line in the processLogs() function
  *
  */
  case class struct1…
  case class struct2…
  case class value1(struct1, struct2)
 
  def processLogs(line:String): Option[(key1, value1)] {…
  }
 
  def run(sparkMaster, appName, executorMemory, jarsPath) {
val sparkConf = new SparkConf()
 sparkConf.setMaster(sparkMaster)
 sparkConf.setAppName(appName)
 sparkConf.set(spark.executor.memory, executorMemory)
  sparkConf.setJars(jarsPath) // This includes all the jars
 relevant jars..
 val sc = new SparkContext(sparkConf)
val rawLogs =
 sc.textFile(hdfs://my-hadoop-namenode:8020:myfile.txt)
 
 rawLogs.saveAsTextFile(hdfs://my-hadoop-namenode:8020:writebackForTesting)
 
 rawLogs.flatMap(processLogs).saveAsTextFile(hdfs://my-hadoop-namenode:8020:outfile.txt)
  }
 
   If I switch to local mode, the code runs just fine; in cluster mode it fails with
  the error I pasted above. In cluster mode, even writing back the file
  we just read fails
  (rawLogs.saveAsTextFile("hdfs://my-hadoop-namenode:8020/writebackForTesting")).
  
   I still believe this is a classNotFound error in disguise.
 

 Indeed you are right, this can be the reason. I had similar errors when
 defining case classes in the shell and trying to use them in the RDDs. Are
 you shading argonaut in the fat jar ?

  Thanks
  Shivani
 
 
 
  On Wed, Jun 18, 2014 at 2:49 PM, Andrew Ash and...@andrewash.com
 wrote:
 
   Wait, so the file only has four lines and the job is running out of
  heap space? Can you share the code you're running that does the
  processing? I'd guess that you're doing some intense processing on every
  line, but just writing parsed case classes back to disk sounds very
  lightweight.
 
 
 
  On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao raoshiv...@gmail.com
 wrote:
 
  I am trying to process a file that contains 4 log lines (not very
 long) and then write my parsed out case classes to a destination folder,
 and I get the following error:
 
 
   java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
    at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
    at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
    at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597

Re: spark 1.0 not using properties file from SPARK_CONF_DIR

2014-06-06 Thread Eugen Cepoi
Here is the PR https://github.com/apache/spark/pull/997


2014-06-03 19:24 GMT+02:00 Patrick Wendell pwend...@gmail.com:

 You can set an arbitrary properties file by adding --properties-file
 argument to spark-submit. It would be nice to have spark-submit also
 look in SPARK_CONF_DIR as well by default. If you opened a JIRA for
 that I'm sure someone would pick it up.

 On Tue, Jun 3, 2014 at 7:47 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote:
  Is it on purpose that when setting SPARK_CONF_DIR spark-submit still loads
  the properties file from SPARK_HOME/conf/spark-defaults.conf?
 
  IMO it would be more natural to override what is defined in SPARK_HOME/conf
  by SPARK_CONF_DIR when defined (and SPARK_CONF_DIR being overridden by
  command line args).
 
  Eugen



spark 1.0 not using properties file from SPARK_CONF_DIR

2014-06-03 Thread Eugen Cepoi
Is it on purpose that when setting SPARK_CONF_DIR spark-submit still loads
the properties file from SPARK_HOME/conf/spark-defaults.conf?

IMO it would be more natural to override what is defined in SPARK_HOME/conf
by SPARK_CONF_DIR when defined (and SPARK_CONF_DIR being overridden by
command line args).

Eugen


Re: Packaging a spark job using maven

2014-05-19 Thread Eugen Cepoi
2014-05-19 10:35 GMT+02:00 Laurent T laurent.thou...@ldmobile.net:

 Hi Eugen,

  Thanks for your help. I'm not familiar with the shade plugin and I was
  wondering: does it replace the assembly plugin?


Nope, it doesn't replace it. It allows you to make fat jars and do other nice
things such as relocating classes to some other package.

I am using it in combination with assembly and jdeb to build deployable
archives (zip and debian). I find that building fat jars with the shade plugin
is more powerful and easier than with assembly.


  Also, do I have to specify
  all the artifacts and sub-artifacts in the artifactSet? Or can I just use a
  *:* wildcard and let the maven scopes do their work? I have a lot of
  overlap warnings when I do so.


Indeed, you don't have to tell it exactly what must be included; I do so in
order to end up with a small archive that we can quickly deploy. Have a
look at the doc, it has some examples:
http://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html

In short, you remove the includes and instead write the excludes (spark,
hadoop, etc). The overlap is due to the same classes being present in different
jars. You can exclude those jars to remove the warnings.

http://stackoverflow.com/questions/19987080/maven-shade-plugin-uber-jar-and-overlapping-classes
http://stackoverflow.com/questions/11824633/maven-shade-plugin-warning-we-have-a-duplicate-how-to-fix

Eugen




 Thanks for your help.
 Regards,
 Laurent



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Packaging-a-spark-job-using-maven-tp5615p6024.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Packaging a spark job using maven

2014-05-16 Thread Eugen Cepoi
Laurent, the problem is that the reference.conf that is embedded in the akka
jars is being overridden by some other conf. This happens when multiple
files have the same name.
I am using Spark with maven. In order to build the fat jar I use the shade
plugin and it works pretty well. The trick here is to use an
AppendingTransformer that will merge all the reference.conf files into a single
one.

Try something like that:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <minimizeJar>false</minimizeJar>
                <createDependencyReducedPom>false</createDependencyReducedPom>
                <artifactSet>
                    <includes>
                        <!-- Include here the dependencies you want to be packed in your fat jar -->
                        <include>my.package.etc:*</include>
                    </includes>
                </artifactSet>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>reference.conf</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>


2014-05-14 15:37 GMT+02:00 Laurent T laurent.thou...@ldmobile.net:

 Hi,

  Thanks François, but this didn't change much. I'm not even sure what this
  reference.conf is. It isn't mentioned anywhere in the Spark documentation. Should
  I have one in my resources?

 Thanks
 Laurent



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Packaging-a-spark-job-using-maven-tp5615p5707.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



spark 0.9.1 textFile hdfs unknown host exception

2014-05-16 Thread Eugen Cepoi
Hi,

I have some strange behaviour when using textFile to read some data from
HDFS in spark 0.9.1.
I get UnknownHost exceptions, where the hadoop client tries to resolve
dfs.nameservices and fails.

So far:
 - this has been tested inside the shell
 - the exact same code works with spark-0.8.1
 - the shell is launched with HADOOP_CONF_DIR pointing to our HA conf
 - if before that some other rdd is created from HDFS and succeeds, then
this works also (might be related to the way the default hadoop
configuration is being shared?)
 - if using the new MR API it works
   sc.newAPIHadoopFile(path, classOf[TextInputFormat],
classOf[LongWritable], classOf[Text],
sc.hadoopConfiguration).map(_._2.toString)

Hadoop distribution: 2.0.0-cdh4.1.2
Spark 0.9.1 - packaged with correct version of hadoop

Eugen


Re: spark 0.9.1 textFile hdfs unknown host exception

2014-05-15 Thread Eugen Cepoi
Solved: Putting HADOOP_CONF_DIR in spark-env of the workers solved the
problem.


The difference between HadoopRDD and NewHadoopRDD is that the old one
creates the JobConf on worker side, where the new one creates an instance
of JobConf on driver side and then broadcasts it.

I tried creating the HadoopRDD myself and tweaked things a bit in order to
log the properties in the conf when loaded on the worker side and on the driver
side. On the worker side I see a dummy conf that looks like the default conf to
me, whereas on the driver side I get the right conf with the namenodes etc.

My guess is that HADOOP_CONF_DIR is not shared with the workers when set
only on the driver (it was not defined in spark-env)?

Also wouldn't it be more natural to create the conf on driver side and then
share it with the workers?





2014-05-09 10:51 GMT+02:00 Eugen Cepoi cepoi.eu...@gmail.com:

 Hi,

 I have some strange behaviour when using textFile to read some data from
 HDFS in spark 0.9.1.
  I get UnknownHost exceptions, where the hadoop client tries to resolve
  dfs.nameservices and fails.

 So far:
  - this has been tested inside the shell
  - the exact same code works with spark-0.8.1
  - the shell is launched with HADOOP_CONF_DIR pointing to our HA conf
  - if before that some other rdd is created from HDFS and succeeds, then
 this works also (might be related to the way the default hadoop
 configuration is being shared?)
  - if using the new MR API it works
sc.newAPIHadoopFile(path, classOf[TextInputFormat],
 classOf[LongWritable], classOf[Text],
 sc.hadoopConfiguration).map(_._2.toString)

 Hadoop distribution: 2.0.0-cdh4.1.2
 Spark 0.9.1 - packaged with correct version of hadoop

 Eugen



Re: Pig on Spark

2014-04-25 Thread Eugen Cepoi
It depends, personally I have the opposite opinion.

IMO expressing pipelines in a functional language feels natural, you just
have to get used to the language (scala).

Testing spark jobs is easy, whereas testing a Pig script is much harder and
less natural.

If you want a more high level language that deals with RDDs for you, you
can use spark sql
http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html

Of course you can express fewer things this way, but if you have some
complex logic I think it would make sense to write a classic spark job, which
would be more robust in the long term.


2014-04-25 15:30 GMT+02:00 Mark Baker dist...@acm.org:

 I've only had a quick look at Pig, but it seems that a declarative
 layer on top of Spark couldn't be anything other than a big win, as it
 allows developers to declare *what* they want, permitting the compiler
 to determine how best to poke at the RDD API to implement it.

 In my brief time with Spark, I've often thought that it feels very
 unnatural to use imperative code to declare a pipeline.



Re: what is the best way to do cartesian

2014-04-25 Thread Eugen Cepoi
Depending on the size of the rdd, you could also do a collect + broadcast and
then compute the product in a map function over the other rdd. If this is
the same rdd you might also want to cache it. This pattern has worked quite
well for me; a rough sketch follows below.
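
A rough sketch of that collect + broadcast pattern (smallRdd and bigRdd are
hypothetical names; it assumes the small side fits in memory on each executor):

val smallSide = smallRdd.collect()          // bring the small RDD to the driver
val bcSmall = sc.broadcast(smallSide)       // ship it once to every executor

// cartesian product without a shuffle: pair every element of the big RDD
// with every broadcast element
val product = bigRdd.flatMap { x =>
  bcSmall.value.map(y => (x, y))
}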
On 25 Apr 2014 at 18:33, Alex Boisvert alex.boisv...@gmail.com wrote:

 You might want to try the built-in RDD.cartesian() method.


 On Thu, Apr 24, 2014 at 9:05 PM, Qin Wei wei@dewmobile.net wrote:

 Hi All,

 I have a problem with the Item-Based Collaborative Filtering
 Recommendation
 Algorithms in spark.
 The basic flow is as below:
  RDD1 ==>            (Item1, (User1, Score1))
                      (Item2, (User2, Score2))
                      (Item1, (User2, Score3))
                      (Item2, (User1, Score4))

  RDD1.groupByKey ==> RDD2
                      (Item1, ((User1, Score1), (User2, Score3)))
                      (Item2, ((User1, Score4), (User2, Score2)))

 The similarity of Vector  ((User1,   Score1),   (User2,   Score3)) and
 ((User1,   Score4),   (User2,   Score2)) is the similarity of Item1 and
 Item2.

  In my situation, RDD2 contains 20 million records and my spark program is
  extremely slow. The source code is as below:
  val conf = new SparkConf().setMaster("spark://211.151.121.184:7077")
    .setAppName("Score Calcu Total")
    .set("spark.executor.memory", "20g")
    .setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
  val sc = new SparkContext(conf)

  val mongoRDD = sc.textFile(args(0).toString, 400)
  val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))

  val newRDD = jsonRDD.map(arg => {
    var score = haha(arg.get("a").asInstanceOf[JSONObject])
    // set score to 0.5 for testing
    arg.put("score", 0.5)
    arg
  })

  val resourceScoresRDD = newRDD.map(arg =>
    (arg.get("rid").toString.toLong,
     (arg.get("zid").toString,
      arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey().cache()
  val resourceScores = resourceScoresRDD.collect()
  val bcResourceScores = sc.broadcast(resourceScores)

  val simRDD = resourceScoresRDD.mapPartitions({ iter =>
    val m = bcResourceScores.value
    for {
      (r1, v1) <- iter
      (r2, v2) <- m
      if r1 < r2
    } yield (r1, r2, cosSimilarity(v1, v2))
  }, true).filter(arg => arg._3 > 0.1)

  println(simRDD.count)

  And I saw this in the Spark Web UI (screenshots):
  http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204018.png
  http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204001.png

  My standalone cluster has 3 worker nodes (16 cores and 32G RAM each), and the
  workload of the machines in my cluster is heavy when the spark program is
  running.

 Is there any better way to do the algorithm?

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-do-cartesian-tp4807.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: RDD collect help

2014-04-18 Thread Eugen Cepoi
Because it happens to reference something outside the closure's scope that
will reference some other objects (that you don't need) and so on,
resulting in serializing with your task a lot of things that you don't
want. But sure, it is debatable and it's more my personal opinion.


2014-04-17 23:28 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:

  Thanks again Eugen! I don't get the point... why do you prefer to avoid Kryo
  ser for closures? Is there any problem with that?
 On Apr 17, 2014 11:10 PM, Eugen Cepoi cepoi.eu...@gmail.com wrote:

  You have two kinds of ser: data and closures. They both use java ser.
  This means that in your function you reference an object outside of it and
  it is getting serialized with your task. To enable kryo ser for closures set the
  spark.closure.serializer property. But usually I don't, as keeping the default allows me to
  detect such unwanted references.
  On 17 Apr 2014 at 22:17, Flavio Pompermaier pomperma...@okkam.it wrote:

  Now I have another problem... I have to pass one of these non-serializable
  objects to a PairFunction and I received another non-serializable
  exception... it seems that Kryo doesn't work within Functions. Am I wrong or
  is this a limit of Spark?
 On Apr 15, 2014 1:36 PM, Flavio Pompermaier pomperma...@okkam.it
 wrote:

 Ok thanks for the help!

 Best,
 Flavio


 On Tue, Apr 15, 2014 at 12:43 AM, Eugen Cepoi cepoi.eu...@gmail.comwrote:

  Nope, those operations are lazy, meaning it will create the RDDs but
  won't trigger any action. The computation is launched by operations such
  as collect, count, save to HDFS etc. And even if they were not lazy, no
  serialization would happen. Serialization occurs only when data will be
  transferred (collect, shuffle, maybe persist to disk - but I am not sure for
  this one).


 2014-04-15 0:34 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:

  Ok, that's fair enough. But why do things work up to the collect? During
  map and filter, are objects not serialized?
  On Apr 15, 2014 12:31 AM, Eugen Cepoi cepoi.eu...@gmail.com
 wrote:

 Sure. As you have pointed, those classes don't implement
 Serializable and Spark uses by default java serialization (when you do
 collect the data from the workers will be serialized, collected by the
 driver and then deserialized on the driver side). Kryo (as most other
 decent serialization libs) doesn't require you to implement 
 Serializable.

 For the missing attributes it's due to the fact that java
 serialization does not ser/deser attributes from classes that don't 
 impl.
 Serializable (in your case the parent classes).


 2014-04-14 23:17 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it
 :

  Thanks Eugen for the reply. Could you explain to me why I have the
  problem? Why doesn't my serialization work?
 On Apr 14, 2014 6:40 PM, Eugen Cepoi cepoi.eu...@gmail.com
 wrote:

 Hi,

 as a easy workaround you can enable Kryo serialization
 http://spark.apache.org/docs/latest/configuration.html

 Eugen


 2014-04-14 18:21 GMT+02:00 Flavio Pompermaier 
 pomperma...@okkam.it:

 Hi to all,

 in my application I read objects that are not serializable
 because I cannot modify the sources.
 So I tried to do a workaround creating a dummy class that extends
 the unmodifiable one but implements serializable.
  All attributes of the parent class are Lists of objects (some of
  them are still not serializable and some of them are, i.e.
  List<String>).

  Until I do map and filter on the RDD, the objects are filled
  correctly (I checked that via Eclipse debug), but when I do collect
  all the attributes of my objects are empty. Could you help me please?
 I'm using spark-core-2.10 e version 0.9.0-incubating.

 Best,
 Flavio







Re: RDD collect help

2014-04-18 Thread Eugen Cepoi
Indeed, serialization is always tricky when you want to work on objects
that are more sophisticated than simple POJOs.
And you can sometimes have unexpected behaviour when using the deserialized
objects. In my case I had trouble when serializing/deserializing Avro specific
records with lists. The implementation of java.util.List used by avro does
not have a default no-arg constructor and has initialization logic inside
its constructors.


The best way to go (IMO) when you need some:
 - var: do a copy of it inside the function having the closure (a small
sketch of this follows below)
 - function to use in your closure: define it in some stateless dummy
class and implement serializable
 - also, a trick with vars could be to define them as lazy, thus they will
be created inside the closure, so the closure won't have a reference on the
outer class (but you might get other surprises...)
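
For illustration, a minimal sketch of the first trick (the MyJob class and
threshold field are hypothetical): copying the field into a local val means
the closure captures only that value instead of the whole enclosing object.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class MyJob(sc: SparkContext) {            // MyJob itself is not Serializable
  val threshold = 0.5                      // referencing this directly would drag MyJob into the closure

  def run(data: RDD[Double]): RDD[Double] = {
    val localThreshold = threshold         // local copy, serialized on its own
    data.filter(x => x > localThreshold)   // the closure now only captures a Double
  }
}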


2014-04-18 10:37 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:

 Ok thanks. However it turns out that there's a problem with that and it's
 not so safe to use kryo serialization with Spark:

  Exception in thread "Executor task launch worker-0" java.lang.NullPointerException
   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1$$anonfun$6.apply(Executor.scala:267)
   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1$$anonfun$6.apply(Executor.scala:267)

 This error is reported also at
 http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCAPud8Tq7fK5j2Up9dDdRQ=y1efwidjnmqc55o9jm5dh7rpd...@mail.gmail.com%3E
 .


 On Fri, Apr 18, 2014 at 10:31 AM, Eugen Cepoi cepoi.eu...@gmail.comwrote:

 Because it happens to reference something outside the closures scope that
 will reference some other objects (that you don't need) and so one,
 resulting in serializing with your task a lot of things that you don't
 want. But sure it is discutable and it's more my personal opinion.


 2014-04-17 23:28 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:

 Thanks again Eugen! I don't get the point..why you prefer to avoid kyro
 ser for closures?is there any problem with that?
  On Apr 17, 2014 11:10 PM, Eugen Cepoi cepoi.eu...@gmail.com wrote:

 You have two kind of ser : data and closures. They both use java ser.
 This means that in your function you reference an object outside of it and
 it is getting ser with your task. To enable kryo ser for closures set
 spark.closure.serializer property. But usualy I dont as it allows me to
 detect such unwanted references.
  On 17 Apr 2014 at 22:17, Flavio Pompermaier pomperma...@okkam.it wrote:

 Now I have another problem..I have to pass one o this non serializable
 object to a PairFunction and I received another non serializable
 exception..it seems that Kyro doesn't work within Functions. Am I wrong or
 this is a limit of Spark?
 On Apr 15, 2014 1:36 PM, Flavio Pompermaier pomperma...@okkam.it
 wrote:

 Ok thanks for the help!

 Best,
 Flavio


 On Tue, Apr 15, 2014 at 12:43 AM, Eugen Cepoi 
 cepoi.eu...@gmail.comwrote:

 Nope, those operations are lazy, meaning it will create the RDDs but
 won't trigger any action. The computation is launched by operations 
 such
 as collect, count, save to HDFS etc. And even if they were not lazy, no
 serialization would happen. Serialization occurs only when data will be
 transfered (collect, shuffle, maybe perist to disk - but I am not sure 
 for
 this one).


 2014-04-15 0:34 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:

 Ok, that's fair enough. But why things work up to the collect?during
 map and filter objects are not serialized?
  On Apr 15, 2014 12:31 AM, Eugen Cepoi cepoi.eu...@gmail.com
 wrote:

 Sure. As you have pointed, those classes don't implement
 Serializable and Spark uses by default java serialization (when you do
 collect the data from the workers will be serialized, collected by 
 the
 driver and then deserialized on the driver side). Kryo (as most other
 decent serialization libs) doesn't require you to implement 
 Serializable.

 For the missing attributes it's due to the fact that java
 serialization does not ser/deser attributes from classes that don't 
 impl.
 Serializable (in your case the parent classes).


 2014-04-14 23:17 GMT+02:00 Flavio Pompermaier 
 pomperma...@okkam.it:

 Thanks Eugen for tgee reply. Could you explain me why I have the
 problem?Why my serialization doesn't work?
 On Apr 14, 2014 6:40 PM, Eugen Cepoi cepoi.eu...@gmail.com
 wrote:

 Hi,

 as a easy workaround you can enable Kryo serialization
 http://spark.apache.org/docs/latest/configuration.html

 Eugen


 2014-04-14 18:21 GMT+02:00 Flavio Pompermaier 
 pomperma...@okkam.it:

 Hi to all,

 in my application I read objects that are not serializable
 because I cannot modify the sources.
 So I tried to do a workaround creating a dummy class that
 extends the unmodifiable one but implements serializable.
 All attributes of the parent class are Lists of objects (some
 of them are still not serializable and some of them are, i.e. 
  List<String>).

 Until I do map

Re: RDD collect help

2014-04-17 Thread Eugen Cepoi
You have two kinds of ser: data and closures. They both use java ser. This
means that in your function you reference an object outside of it and it is
getting serialized with your task. To enable kryo ser for closures, set the
spark.closure.serializer property. But usually I don't, as keeping the default allows me to
detect such unwanted references.
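
For reference, a rough sketch of both settings (property and class names as of
the Spark versions discussed in this thread; verify against your version):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Kryo for data serialization (the usual recommendation):
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Kryo for closures too, only if you really want it; leaving the default
  // Java closure serializer makes accidental outer references fail fast:
  .set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")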
On 17 Apr 2014 at 22:17, Flavio Pompermaier pomperma...@okkam.it wrote:

  Now I have another problem... I have to pass one of these non-serializable
  objects to a PairFunction and I received another non-serializable
  exception... it seems that Kryo doesn't work within Functions. Am I wrong or
  is this a limit of Spark?
 On Apr 15, 2014 1:36 PM, Flavio Pompermaier pomperma...@okkam.it
 wrote:

 Ok thanks for the help!

 Best,
 Flavio


 On Tue, Apr 15, 2014 at 12:43 AM, Eugen Cepoi cepoi.eu...@gmail.comwrote:

 Nope, those operations are lazy, meaning it will create the RDDs but
 won't trigger any action. The computation is launched by operations such
 as collect, count, save to HDFS etc. And even if they were not lazy, no
 serialization would happen. Serialization occurs only when data will be
  transferred (collect, shuffle, maybe persist to disk - but I am not sure for
 this one).


 2014-04-15 0:34 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:

  Ok, that's fair enough. But why do things work up to the collect? During map
  and filter, are objects not serialized?
  On Apr 15, 2014 12:31 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote:

 Sure. As you have pointed, those classes don't implement Serializable
 and Spark uses by default java serialization (when you do collect the data
 from the workers will be serialized, collected by the driver and then
 deserialized on the driver side). Kryo (as most other decent serialization
 libs) doesn't require you to implement Serializable.

 For the missing attributes it's due to the fact that java
 serialization does not ser/deser attributes from classes that don't impl.
 Serializable (in your case the parent classes).


 2014-04-14 23:17 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:

 Thanks Eugen for tgee reply. Could you explain me why I have the
 problem?Why my serialization doesn't work?
 On Apr 14, 2014 6:40 PM, Eugen Cepoi cepoi.eu...@gmail.com wrote:

 Hi,

 as a easy workaround you can enable Kryo serialization
 http://spark.apache.org/docs/latest/configuration.html

 Eugen


 2014-04-14 18:21 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it
 :

 Hi to all,

 in my application I read objects that are not serializable because
 I cannot modify the sources.
 So I tried to do a workaround creating a dummy class that extends
 the unmodifiable one but implements serializable.
 All attributes of the parent class are Lists of objects (some of
 them are still not serializable and some of them are, i.e. 
  List<String>).

  Until I do map and filter on the RDD, the objects are filled
  correctly (I checked that via Eclipse debug), but when I do collect
  all the
 attributes of my objects are empty. Could you help me please?
 I'm using spark-core-2.10 e version 0.9.0-incubating.

 Best,
 Flavio







Re: Why these operations are slower than the equivalent on Hadoop?

2014-04-16 Thread Eugen Cepoi
Yes, the second example does that. It transforms all the points of a
partition into a single element, the skyline; thus reduce will run on the
skylines of two partitions and not on single points.
On 16 Apr 2014 at 06:47, Yanzhe Chen yanzhe...@gmail.com wrote:

 Eugen,

 Thanks for your tip and I do want to merge the result of a partition with
 another one but I am still not quite clear how to do it.

 Say the original data rdd has 32 partitions and since mapPartitions won't
 change the number of partitions, it will remain 32 partitions, each of which
 contains the partial skyline of the points in its partition. Now I want to
 merge those 32 partitions to generate a new skyline. It would be better if I
 could use reduce to merge them two at a time (rather than just collecting them into
 one), but I think simply calling the reduce method on the rdd won't work
 because it reduces the data at the granularity of points rather than the
 partition results (which are collections of points). So is there a way to
 reduce the data at the granularity of partitions?

 Thanks,

 Yanzhe

 On Wednesday, April 16, 2014 at 2:24 AM, Eugen Cepoi wrote:

 It depends on your algorithm but I guess that you probably should use
 reduce (the code probably doesn't compile but it shows you the idea).

 val result = data.reduce { case (left, right) =>
   skyline(left ++ right)
 }

 Or in the case you want to merge the result of a partition with another
 one you could do:

 val result = data.mapPartitions { points =>
   // transforms the whole partition into a single element (the skyline),
   // but this may incur some other problems, especially if you use Kryo
   // serialization...
   Seq(skyline(points.toArray)).iterator
 }.reduce { case (left, right) =>
   skyline(left ++ right)
 }




 2014-04-15 19:37 GMT+02:00 Cheng Lian lian.cs@gmail.com:

 Your Spark solution first reduces partial results into a single partition,
 computes the final result, and then collects to the driver side. This
 involves a shuffle and two waves of network traffic. Instead, you can
 directly collect partial results to the driver and then computes the final
 results on driver side:

 val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
 val partialResults = data.mapPartitions(points => skyline(points.toArray).iterator).collect()
 val results = skyline(partialResults)

 On Wed, Apr 16, 2014 at 1:03 AM, Yanzhe Chen yanzhe...@gmail.com wrote:

  Hi all,

 As a previous thread, I am asking how to implement a divide-and-conquer
 algorithm (skyline) in Spark.
 Here is my current solution:

 val data = sc.textFile(…).map(line => line.split(",").map(_.toDouble))

 val result = data.mapPartitions(points => skyline(points.toArray).iterator)
   .coalesce(1, true)
   .mapPartitions(points => skyline(points.toArray).iterator)
   .collect()

 where skyline is a local algorithm to compute the results:

 def skyline(points: Array[Point]): Array[Point]

 Basically, I find this implementation runs slower than the corresponding Hadoop
 version (the identity map phase plus local skyline for both combine and
 reduce phases).

 Below are my questions:

 1. Why this implementation is much slower than the Hadoop one?

 I can find two possible reasons: one is the shuffle overhead in coalesce,
 another is calling the toArray and iterator repeatedly when invoking
 local skyline algorithm. But I am not sure which one is true.

 I haven’t seen your Hadoop version. But if this assumption is right, the
 above version should help.


 2. One observation is that while Hadoop version almost used up all the CPU
 resources during execution, the CPU seems not that hot on Spark. Is that a
 clue to prove that the shuffling might be the real bottleneck?

 How many parallel tasks are there when running your Spark code? I doubt
 tasks are queued and run sequentially.


 3. Is there any difference between coalesce(1, true) and repartition? It
 seems that both operations need to shuffle data. What are the proper
 situations for using the coalesce method?

 repartition(n) is just an alias of coalesce(n, true), so yes, they both
 involve data shuffling. coalesce can be used to shrink partition number
 when dataset size shrinks dramatically after operations like filter. Say
 you have an RDD containing 1TB of data with 100 partitions, after a
 .filter(...) call, only 20GB data left, then you may want to coalesce to
 2 partitions rather than 100.
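
 A rough sketch of that shrink-after-filter pattern (the path and the
 isInterestingLine predicate are hypothetical):

 val big = sc.textFile("hdfs:///data/huge", 100)    // ~1 TB spread over 100 partitions
 val small = big.filter(isInterestingLine)          // suppose only ~20 GB survives
 val compact = small.coalesce(2)                    // merge down to 2 partitions, no shuffle
 // small.repartition(50) would be the same as coalesce(50, shuffle = true), i.e. a full shuffle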


 4. More generally, I am trying to implement some core geometry
 computation operators on Spark (like skyline, convex hull etc). In my
 understanding, since Spark is more capable of handling iterative
 computations on dataset, the above solution apparently doesn’t exploit what
 Spark is good at. Any comments on how to do geometry computations on Spark
 (if it is possible) ?

 Although Spark is good at iterative algorithms, it also performs better in
 batch computing due to much lower scheduling overhead

Re: Why these operations are slower than the equivalent on Hadoop?

2014-04-15 Thread Eugen Cepoi
It depends on your algorithm but I guess that you probably should use
reduce (the code probably doesn't compile but it shows you the idea).

val result = data.reduce { case (left, right) =>
  skyline(left ++ right)
}

Or in the case you want to merge the result of a partition with another one
you could do:

val result = data.mapPartitions { points =>
  // transforms the whole partition into a single element (the skyline),
  // but this may incur some other problems, especially if you use Kryo
  // serialization...
  Seq(skyline(points.toArray)).iterator
}.reduce { case (left, right) =>
  skyline(left ++ right)
}




2014-04-15 19:37 GMT+02:00 Cheng Lian lian.cs@gmail.com:

 Your Spark solution first reduces partial results into a single partition,
 computes the final result, and then collects to the driver side. This
 involves a shuffle and two waves of network traffic. Instead, you can
 directly collect partial results to the driver and then computes the final
 results on driver side:

 val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
 val partialResults = data.mapPartitions(points => skyline(points.toArray).iterator).collect()
 val results = skyline(partialResults)

 On Wed, Apr 16, 2014 at 1:03 AM, Yanzhe Chen yanzhe...@gmail.com wrote:

  Hi all,

 As a previous thread, I am asking how to implement a divide-and-conquer
 algorithm (skyline) in Spark.
 Here is my current solution:

 val data = sc.textFile(…).map(line => line.split(",").map(_.toDouble))

 val result = data.mapPartitions(points => skyline(points.toArray).iterator)
   .coalesce(1, true)
   .mapPartitions(points => skyline(points.toArray).iterator)
   .collect()

 where skyline is a local algorithm to compute the results:

 def skyline(points: Array[Point]): Array[Point]

 Basically, I find this implementation runs slower than the corresponding
 Hadoop version (the identity map phase plus local skyline for both combine
 and reduce phases).

 Below are my questions:

 1. Why this implementation is much slower than the Hadoop one?

 I can find two possible reasons: one is the shuffle overhead in coalesce,
 another is calling the toArray and iterator repeatedly when invoking
 local skyline algorithm. But I am not sure which one is true.

 I haven’t seen your Hadoop version. But if this assumption is right, the
 above version should help.


 2. One observation is that while Hadoop version almost used up all the
 CPU resources during execution, the CPU seems not that hot on Spark. Is
 that a clue to prove that the shuffling might be the real bottleneck?

 How many parallel tasks are there when running your Spark code? I doubt
 tasks are queued and run sequentially.


 3. Is there any difference between coalesce(1, true) and repartition? It
 seems that both operations need to shuffle data. What are the proper
 situations for using the coalesce method?

 repartition(n) is just an alias of coalesce(n, true), so yes, they both
 involve data shuffling. coalesce can be used to shrink partition number
 when dataset size shrinks dramatically after operations like filter. Say
 you have an RDD containing 1TB of data with 100 partitions, after a
 .filter(...) call, only 20GB data left, then you may want to coalesce to
 2 partitions rather than 100.


 4. More generally, I am trying to implement some core geometry
 computation operators on Spark (like skyline, convex hull etc). In my
 understanding, since Spark is more capable of handling iterative
 computations on dataset, the above solution apparently doesn’t exploit what
 Spark is good at. Any comments on how to do geometry computations on Spark
 (if it is possible) ?

 Although Spark is good at iterative algorithms, it also performs better in
 batch computing due to much lower scheduling overhead and thread level
 parallelism. Theoretically, you can always accelerate your MapReduce job by
 rewriting it in Spark.


 Thanks for any insight.

 Yanzhe




Re: RDD collect help

2014-04-14 Thread Eugen Cepoi
Sure. As you have pointed out, those classes don't implement Serializable and
Spark uses java serialization by default (when you do collect, the data from
the workers will be serialized, collected by the driver and then
deserialized on the driver side). Kryo (as most other decent serialization
libs) doesn't require you to implement Serializable.

For the missing attributes it's due to the fact that java serialization
does not ser/deser attributes from classes that don't impl. Serializable
(in your case the parent classes).
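
A minimal sketch of why that happens (the classes here are hypothetical
stand-ins): the non-Serializable parent's fields are skipped by Java
serialization and re-initialized by its no-arg constructor on
deserialization, which is why they come back empty.

import java.io._

class Parent {                                     // e.g. the third-party class you cannot modify
  var items: java.util.List[String] = new java.util.ArrayList[String]()
}
class Child extends Parent with Serializable       // the workaround subclass

val c = new Child
c.items.add("something")

val buf = new ByteArrayOutputStream()
new ObjectOutputStream(buf).writeObject(c)         // roughly what happens on the worker side
val copy = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
  .readObject().asInstanceOf[Child]                // roughly what the driver does on collect

println(copy.items)                                // prints []: the parent's state was lost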


2014-04-14 23:17 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:

 Thanks Eugen for the reply. Could you explain to me why I have the
 problem? Why doesn't my serialization work?
 On Apr 14, 2014 6:40 PM, Eugen Cepoi cepoi.eu...@gmail.com wrote:

 Hi,

 as an easy workaround you can enable Kryo serialization
 http://spark.apache.org/docs/latest/configuration.html

 Eugen


 2014-04-14 18:21 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:

 Hi to all,

 in my application I read objects that are not serializable because I
 cannot modify the sources.
 So I tried to do a workaround creating a dummy class that extends the
 unmodifiable one but implements serializable.
 All attributes of the parent class are Lists of objects (some of them
 are still not serializable and some of them are, i.e. List<String>).

 Until I do map and filter on the RDD, the objects are filled correctly
 (I checked that via Eclipse debug), but when I do collect all the
 attributes of my objects are empty. Could you help me please?
 I'm using spark-core-2.10 e version 0.9.0-incubating.

 Best,
 Flavio





Re: RDD collect help

2014-04-14 Thread Eugen Cepoi
Nope, those operations are lazy, meaning it will create the RDDs but won't
trigger any action. The computation is launched by operations such as
collect, count, save to HDFS etc. And even if they were not lazy, no
serialization would happen. Serialization occurs only when data will be
transferred (collect, shuffle, maybe persist to disk - but I am not sure for
this one).
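
A tiny illustration of that laziness (hypothetical data): nothing runs, and
nothing gets serialized, until an action is called.

val rdd = sc.parallelize(1 to 1000000)
val mapped = rdd.map(_ * 2)               // lazy: only records the transformation
val filtered = mapped.filter(_ % 3 == 0)  // still lazy, still nothing serialized

val n = filtered.count()                  // action: the job actually runs now
val firstTen = filtered.take(10)          // another action; results come back to the driver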


2014-04-15 0:34 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:

 Ok, that's fair enough. But why do things work up to the collect? During map
 and filter, are objects not serialized?
 On Apr 15, 2014 12:31 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote:

 Sure. As you have pointed, those classes don't implement Serializable and
 Spark uses by default java serialization (when you do collect the data from
 the workers will be serialized, collected by the driver and then
 deserialized on the driver side). Kryo (as most other decent serialization
 libs) doesn't require you to implement Serializable.

 For the missing attributes it's due to the fact that java serialization
 does not ser/deser attributes from classes that don't impl. Serializable
 (in your case the parent classes).


 2014-04-14 23:17 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:

 Thanks Eugen for the reply. Could you explain to me why I have the
 problem? Why doesn't my serialization work?
 On Apr 14, 2014 6:40 PM, Eugen Cepoi cepoi.eu...@gmail.com wrote:

 Hi,

 as an easy workaround you can enable Kryo serialization
 http://spark.apache.org/docs/latest/configuration.html

 Eugen


 2014-04-14 18:21 GMT+02:00 Flavio Pompermaier pomperma...@okkam.it:

 Hi to all,

 in my application I read objects that are not serializable because I
 cannot modify the sources.
 So I tried to do a workaround creating a dummy class that extends the
 unmodifiable one but implements serializable.
 All attributes of the parent class are Lists of objects (some of them
 are still not serializable and some of them are, i.e. List<String>).

 Until I do map and filter on the RDD, the objects are filled correctly
 (I checked that via Eclipse debug), but when I do collect all the
 attributes of my objects are empty. Could you help me please?
 I'm using spark-core-2.10 e version 0.9.0-incubating.

 Best,
 Flavio