Save org.apache.spark.mllib.linalg.Matrix to a file

2015-04-15 Thread Spico Florin
Hello!

The result of a correlation in Spark MLlib is of type
org.apache.spark.mllib.linalg.Matrix (see
http://spark.apache.org/docs/1.2.1/mllib-statistics.html#correlations):

val data: RDD[Vector] = ...

val correlMatrix: Matrix = Statistics.corr(data, "pearson")

I would like to save the result into a file. How can I do this?
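One possible approach (a sketch, not from the thread): Matrix.numRows, Matrix.numCols, and Matrix.toArray (which returns the entries in column-major order) give you everything needed to write the matrix as CSV from the driver with plain file I/O. The class and file names below are illustrative:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class SaveMatrix {
    // Format a column-major array (as returned by Matrix.toArray())
    // into CSV rows, one line per matrix row.
    static List<String> toCsvLines(double[] values, int numRows, int numCols) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < numRows; i++) {
            StringBuilder sb = new StringBuilder();
            for (int j = 0; j < numCols; j++) {
                if (j > 0) sb.append(',');
                sb.append(values[j * numRows + i]); // column-major indexing
            }
            lines.add(sb.toString());
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // A 2x2 correlation matrix in column-major order.
        double[] values = {1.0, 0.5, 0.5, 1.0};
        List<String> lines = toCsvLines(values, 2, 2);
        Files.write(Paths.get("correlations.csv"), lines);
    }
}
```

From Scala the same idea applies: iterate over the rows, build comma-separated lines from correlMatrix.toArray, and write them with java.nio.file.Files.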

 Thanks,

 Florin


Exception while using Kryo with broadcast

2015-04-15 Thread Jeetendra Gangele
Hi All, I am getting the below exception while using Kryo serialization with a
broadcast variable. I am broadcasting a hashmap with the lines below.

Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
    jsc.broadcast(matchData);

15/04/15 12:58:51 ERROR executor.Executor: Exception in task 0.3 in stage 4.0 (TID 7)
java.io.IOException: java.lang.UnsupportedOperationException
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1003)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
    at com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:103)
    at com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:1)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:204)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:58)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
    at java.util.AbstractMap.put(AbstractMap.java:203)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1000)
    ... 18 more
15/04/15 12:58:51 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
15/04/15 12:58:51 INFO storage.MemoryStore: MemoryStore cleared
15/04/15 12:58:51 INFO storage.BlockManager: BlockManager stopped
15/04/15 12:58:51 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.


Re: Exception while using Kryo with broadcast

2015-04-15 Thread Jeetendra Gangele
Yes, without Kryo it did work. When I removed the Kryo registration, it
worked.

On 15 April 2015 at 19:24, Jeetendra Gangele gangele...@gmail.com wrote:

 It's not working in combination with the broadcast.
 Without Kryo it's also not working.


 On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com wrote:

 Is it working without kryo?

 Thanks
 Best Regards

 On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Hi All, I am getting the below exception while using Kryo serialization with
 a broadcast variable. I am broadcasting a hashmap with the lines below.

 Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
 final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
     jsc.broadcast(matchData);


Re: Exception while using Kryo with broadcast

2015-04-15 Thread Akhil Das
Is it working without kryo?

Thanks
Best Regards

On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Hi All, I am getting the below exception while using Kryo serialization with
 a broadcast variable. I am broadcasting a hashmap with the lines below.

 Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
 final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
     jsc.broadcast(matchData);




Re: Exception while using Kryo with broadcast

2015-04-15 Thread Jeetendra Gangele
This looks like a known issue; check this out:
http://apache-spark-user-list.1001560.n3.nabble.com/java-io-InvalidClassException-org-apache-spark-api-java-JavaUtils-SerializableMapWrapper-no-valid-cor-td20034.html

Can you please suggest any workaround? I am broadcasting the HashMap returned
from RDD.collectAsMap().

On 15 April 2015 at 19:33, Imran Rashid iras...@cloudera.com wrote:

 this is a really strange exception ... I'm especially surprised that it
 doesn't work w/ java serialization.  Do you think you could try to boil it
 down to a minimal example?

 On Wed, Apr 15, 2015 at 8:58 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Yes, without Kryo it did work. When I removed the Kryo registration, it
 worked.

 On 15 April 2015 at 19:24, Jeetendra Gangele gangele...@gmail.com
 wrote:

 It's not working in combination with the broadcast.
 Without Kryo it's also not working.


 On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com wrote:

 Is it working without kryo?

 Thanks
 Best Regards

 On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele 
 gangele...@gmail.com wrote:

 Hi All, I am getting the below exception while using Kryo serialization with
 a broadcast variable. I am broadcasting a hashmap with the lines below.

 Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
 final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
     jsc.broadcast(matchData);



Re: Exception while using Kryo with broadcast

2015-04-15 Thread Imran Rashid
this is a really strange exception ... I'm especially surprised that it
doesn't work w/ java serialization.  Do you think you could try to boil it
down to a minimal example?

On Wed, Apr 15, 2015 at 8:58 AM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Yes, without Kryo it did work. When I removed the Kryo registration, it
 worked.

 On 15 April 2015 at 19:24, Jeetendra Gangele gangele...@gmail.com wrote:

 It's not working in combination with the broadcast.
 Without Kryo it's also not working.


 On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com wrote:

 Is it working without kryo?

 Thanks
 Best Regards

 On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele gangele...@gmail.com
  wrote:

 Hi All, I am getting the below exception while using Kryo serialization with
 a broadcast variable. I am broadcasting a hashmap with the lines below.

 Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
 final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
     jsc.broadcast(matchData);



Re: Exception while using Kryo with broadcast

2015-04-15 Thread Imran Rashid
oh interesting.  The suggested workaround is to wrap the result from
collectAsMap into another HashMap; you should try that:

Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
Map<Long, MatcherReleventData> tmp =
    new HashMap<Long, MatcherReleventData>(matchData);
final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
    jsc.broadcast(tmp);

Can you please clarify:
* Does it work with Java serialization in the end, or is this Kryo only?
* Which Spark version are you using? (One of the relevant bugs was fixed in
1.2.1 and 1.3.0.)
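The failure can be reproduced without Spark: collectAsMap may hand back a read-only map wrapper whose put throws UnsupportedOperationException (exactly the java.util.AbstractMap.put frame in the stack trace), and put is what Kryo's MapSerializer calls while deserializing. Copying into a plain HashMap sidesteps that. A minimal sketch of the principle, using Collections.unmodifiableMap as a stand-in for the wrapper:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class WrapDemo {
    public static void main(String[] args) {
        // Stand-in for the wrapper returned by collectAsMap():
        // a read-only view whose put() throws UnsupportedOperationException,
        // just like java.util.AbstractMap.put in the stack trace.
        Map<Long, String> readOnly =
            Collections.unmodifiableMap(new HashMap<>(Map.of(1L, "a")));
        try {
            readOnly.put(2L, "b"); // what Kryo's MapSerializer effectively does
        } catch (UnsupportedOperationException e) {
            System.out.println("read-only wrapper: put failed");
        }

        // Copying into a plain HashMap before broadcasting gives the
        // deserializer a map it can rebuild by calling put().
        Map<Long, String> copy = new HashMap<>(readOnly);
        copy.put(2L, "b");
        System.out.println("copy size: " + copy.size());
    }
}
```

The copy is what should be handed to jsc.broadcast, as in the workaround above.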



On Wed, Apr 15, 2015 at 9:06 AM, Jeetendra Gangele gangele...@gmail.com
wrote:

 This looks like a known issue; check this out:

 http://apache-spark-user-list.1001560.n3.nabble.com/java-io-InvalidClassException-org-apache-spark-api-java-JavaUtils-SerializableMapWrapper-no-valid-cor-td20034.html

 Can you please suggest any workaround? I am broadcasting the HashMap returned
 from RDD.collectAsMap().

 On 15 April 2015 at 19:33, Imran Rashid iras...@cloudera.com wrote:

 this is a really strange exception ... I'm especially surprised that it
 doesn't work w/ java serialization.  Do you think you could try to boil it
 down to a minimal example?

 On Wed, Apr 15, 2015 at 8:58 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Yes, without Kryo it did work. When I removed the Kryo registration, it
 worked.

 On 15 April 2015 at 19:24, Jeetendra Gangele gangele...@gmail.com
 wrote:

 It's not working in combination with the broadcast.
 Without Kryo it's also not working.


 On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Is it working without kryo?

 Thanks
 Best Regards

 On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele 
 gangele...@gmail.com wrote:

 Hi All, I am getting the below exception while using Kryo serialization
 with a broadcast variable. I am broadcasting a hashmap with the lines below.

 Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
 final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
     jsc.broadcast(matchData);



Re: Microsoft SQL jdbc support from spark sql

2015-04-15 Thread ARose
I have found that it works if you place the sqljdbc41.jar directly in the
following folder:

YOUR_SPARK_HOME/core/target/jars/

This way, Spark will have the SQL Server JDBC driver available when it
computes its classpath.
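An alternative to copying the jar into the build tree is to put it on the classpath at submit time; --jars and --driver-class-path are standard spark-shell/spark-submit options, and the path below is illustrative:

```shell
# Make the JDBC driver visible to the executors (--jars) and to the
# driver JVM's classpath (--driver-class-path).
./bin/spark-shell \
  --jars /path/to/sqljdbc41.jar \
  --driver-class-path /path/to/sqljdbc41.jar
```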



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Microsoft-SQL-jdbc-support-from-spark-sql-tp22399p22502.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Exception while using Kryo with broadcast

2015-04-15 Thread Jeetendra Gangele
It's not working in combination with the broadcast.
Without Kryo it's also not working.

On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com wrote:

 Is it working without kryo?

 Thanks
 Best Regards

 On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Hi All, I am getting the below exception while using Kryo serialization with
 a broadcast variable. I am broadcasting a hashmap with the lines below.

 Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
 final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
     jsc.broadcast(matchData);



How to get a clean DataFrame schema merge

2015-04-15 Thread Jaonary Rabarisoa
Hi all,

If you follow the schema merging example in the Spark documentation
(http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging),
you obtain the following result when you load the merged data:

single triple double
1  3  null
2  6  null
4  12 null
3  9  null
5  15 null
1  null   2
2  null   4
4  null   8
3  null   6
5  null   10

How can I remove these null values and get something more logical, like:

single triple double
1  3  2
2  6  4
4  12 8
3  9  6
5  15 10
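The compact form is essentially an inner join of the two halves on the shared single column, rather than a union with mismatched schemas. The principle can be sketched without Spark by joining two keyed maps (names and values below are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MergeColumns {
    // Join two single-keyed "columns" (single -> triple, single -> double)
    // into one row per key, dropping the nulls a plain union would produce.
    static Map<Integer, int[]> join(Map<Integer, Integer> triples,
                                    Map<Integer, Integer> doubles) {
        Map<Integer, int[]> out = new LinkedHashMap<>();
        for (Map.Entry<Integer, Integer> e : triples.entrySet()) {
            Integer d = doubles.get(e.getKey());
            if (d != null) {                      // keep only matched keys
                out.put(e.getKey(), new int[]{e.getValue(), d});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> triples = Map.of(1, 3, 2, 6, 4, 12, 3, 9, 5, 15);
        Map<Integer, Integer> doubles = Map.of(1, 2, 2, 4, 4, 8, 3, 6, 5, 10);
        join(triples, doubles).forEach((k, v) ->
            System.out.println(k + " " + v[0] + " " + v[1]));
    }
}
```

In Spark SQL itself the analogous fix would be to load the two Parquet halves separately and inner-join the DataFrames on the single column, rather than relying on the merged-schema load; treat that as a direction to try, not a recipe tested against the poster's data.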

Bests,

Jao


RE: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

2015-04-15 Thread Nathan McCarthy
Tried with the 1.3.0 release (built myself) and the most recent 1.3.1
snapshot off the 1.3 branch.

Haven't tried with 1.4/master.


From: Wang, Daoyuan [daoyuan.w...@intel.com]
Sent: Wednesday, April 15, 2015 5:22 PM
To: Nathan McCarthy; user@spark.apache.org
Subject: RE: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Can you provide your spark version?

Thanks,
Daoyuan

From: Nathan McCarthy [mailto:nathan.mccar...@quantium.com.au]
Sent: Wednesday, April 15, 2015 1:57 PM
To: Nathan McCarthy; user@spark.apache.org
Subject: Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Just an update, tried with the old JdbcRDD and that worked fine.

From: Nathan nathan.mccar...@quantium.com.au
Date: Wednesday, 15 April 2015 1:57 pm
To: user@spark.apache.org
Subject: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Hi guys,

Trying to use a Spark SQL context's .load("jdbc", ...) method to create a DF
from a JDBC data source. All seems to work well locally (master = local[*]),
however as soon as we try and run on YARN we have problems.

We seem to be running into problems with the class path and loading up the JDBC 
driver. I’m using the jTDS 1.3.1 driver, net.sourceforge.jtds.jdbc.Driver.

./bin/spark-shell --jars /tmp/jtds-1.3.1.jar --master yarn-client

When trying to run I get an exception;

scala> sqlContext.load("jdbc", Map("url" ->
"jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "dbtable" ->
"CUBE.DIM_SUPER_STORE_TBL"))

java.sql.SQLException: No suitable driver found for 
jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd

Thinking maybe we need to force load the driver, if I supply "driver" ->
"net.sourceforge.jtds.jdbc.Driver" to .load we get;

scala> sqlContext.load("jdbc", Map("url" ->
"jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "driver" ->
"net.sourceforge.jtds.jdbc.Driver", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL"))

java.lang.ClassNotFoundException: net.sourceforge.jtds.jdbc.Driver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at 
org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:97)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:21)

Yet if I run a Class.forName() just from the shell;

scala> Class.forName("net.sourceforge.jtds.jdbc.Driver")
res1: Class[_] = class net.sourceforge.jtds.jdbc.Driver

No problem finding the JAR. I’ve tried in both the shell, and running with 
spark-submit (packing the driver in with my application as a fat JAR). Nothing 
seems to work.

I can also get a connection in the driver/shell no problem;

scala> import java.sql.DriverManager
import java.sql.DriverManager

scala> DriverManager.getConnection("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")
res3: java.sql.Connection = net.sourceforge.jtds.jdbc.JtdsConnection@2a67ecd0

I'm probably missing some class path setting here. In
jdbc.DefaultSource.createRelation it looks like the call to Class.forName
doesn't specify a class loader, so it just uses the default Java behaviour to
reflectively get the class loader. It almost feels like it's using a
different class loader.
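That hunch can be made concrete: the single-argument Class.forName uses the caller's defining class loader, while jars added at submit time typically end up on a child loader reachable via the thread context class loader. A Spark-free sketch of the two lookup paths (a stdlib class stands in for the jTDS driver, so both succeed here; on an executor the first path is the one that can fail):

```java
public class LoaderDemo {
    public static void main(String[] args) throws Exception {
        // Single-argument Class.forName resolves through the caller's
        // defining class loader; on an executor that loader may not see
        // jars added via --jars.
        Class<?> viaCaller = Class.forName("java.util.HashMap");

        // Passing the thread's context class loader explicitly is the
        // usual fix when user jars are attached to a child loader.
        ClassLoader ctx = Thread.currentThread().getContextClassLoader();
        Class<?> viaContext = Class.forName("java.util.HashMap", true, ctx);

        // For a stdlib class both paths delegate to the same loader,
        // so the resolved Class objects are identical.
        System.out.println(viaCaller == viaContext);
    }
}
```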

I also tried seeing if the class path was there on all my executors by running;

import scala.collection.JavaConverters._
sc.parallelize(Seq(1, 2, 3, 4)).flatMap(_ =>
  java.sql.DriverManager.getDrivers().asScala.map(d =>
    s"$d | ${d.acceptsURL("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")}")
).collect().foreach(println)

This successfully returns;

15/04/15 01:07:37 INFO scheduler.DAGScheduler: Job 0 finished: collect at 
Main.scala:46, took 1.495597 s
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true

As a final test we tried with postgres driver 

Re: Execption while using kryo with broadcast

2015-04-15 Thread Jeetendra Gangele
This worked with Java serialization. I am using 1.2.0; you are right, if I use
1.2.1 or 1.3.0 this issue will not occur.
I will test this and let you know

On 15 April 2015 at 19:48, Imran Rashid iras...@cloudera.com wrote:

 oh interesting.  The suggested workaround is to wrap the result from
 collectAsMap into another hashmap, you should try that:

 Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
 Map<Long, MatcherReleventData> tmp = new HashMap<Long, MatcherReleventData>(matchData);
 final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
 jsc.broadcast(tmp);
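The reason the extra copy helps can be reproduced outside Spark: the stack trace shows Kryo's MapSerializer deserializing a map by instantiating the same class and calling put() on it, which fails for read-only wrappers like the one collectAsMap can return. A minimal plain-Java sketch (toy types, not the actual MatcherReleventData classes):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class BroadcastMapCopy {
    public static void main(String[] args) {
        Map<Long, String> collected = new HashMap<>();
        collected.put(1L, "acme");

        // Stand-in for the read-only wrapper a collectAsMap-style call can return:
        Map<Long, String> wrapper = Collections.unmodifiableMap(collected);
        try {
            wrapper.put(2L, "x");
        } catch (UnsupportedOperationException e) {
            // This is the same failure Kryo's MapSerializer hits when it
            // tries to rebuild the wrapper class via put().
            System.out.println("put() unsupported on the wrapper");
        }

        // Copying into a plain, mutable HashMap gives Kryo a class it can rebuild:
        Map<Long, String> safe = new HashMap<>(wrapper);
        safe.put(2L, "globex"); // fine
        System.out.println(safe.size()); // 2
    }
}
```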

 Can you please clarify:
 * Does it work w/ java serialization in the end?  Or is this kryo only?
 * which Spark version you are using? (one of the relevant bugs was fixed
 in 1.2.1 and 1.3.0)



 On Wed, Apr 15, 2015 at 9:06 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 This looks like a known issue? Check this out:

 http://apache-spark-user-list.1001560.n3.nabble.com/java-io-InvalidClassException-org-apache-spark-api-java-JavaUtils-SerializableMapWrapper-no-valid-cor-td20034.html

 Can you please suggest a workaround? I am broadcasting the HashMap returned
 from RDD.collectAsMap().

 On 15 April 2015 at 19:33, Imran Rashid iras...@cloudera.com wrote:

 this is a really strange exception ... I'm especially surprised that it
 doesn't work w/ java serialization.  Do you think you could try to boil it
 down to a minimal example?

 On Wed, Apr 15, 2015 at 8:58 AM, Jeetendra Gangele gangele...@gmail.com
  wrote:

 Yes, without Kryo it did work: when I remove the Kryo registration it
 works.

 On 15 April 2015 at 19:24, Jeetendra Gangele gangele...@gmail.com
 wrote:

 It's not working in combination with broadcast.
 Without Kryo it's also not working.


 On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Is it working without kryo?

 Thanks
 Best Regards

 On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele 
 gangele...@gmail.com wrote:

 Hi All, I am getting the below exception while using Kryo serialization
 with a broadcast variable. I am broadcasting a HashMap with the lines below.

 Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
 final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
 jsc.broadcast(matchData);






 15/04/15 12:58:51 ERROR executor.Executor: Exception in task 0.3 in
 stage 4.0 (TID 7)
 java.io.IOException: java.lang.UnsupportedOperationException
 at
 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1003)
 at
 org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
 at
 org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
 at
 org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
 at
 org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
 at
 com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:103)
 at
 com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:1)
 at
 org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
 at
 org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
 at
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:204)
 at
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:58)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.UnsupportedOperationException
 at java.util.AbstractMap.put(AbstractMap.java:203)
 at
 com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
 at
 com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
 at
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at
 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
 at
 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
 at
 org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
 at
 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1000)
 ... 18 more
 15/04/15 12:58:51 INFO executor.CoarseGrainedExecutorBackend: Driver
 commanded a shutdown
 15/04/15 12:58:51 INFO 

Re: How to get a clean DataFrame schema merge

2015-04-15 Thread Michael Armbrust
Schema merging is not the feature you are looking for.  It is designed for
when you are adding new records (that are not associated with old records),
which may or may not have new or missing columns.

In your case it looks like you have two datasets that you want to load
separately and join on a key.
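That suggestion can be sketched with plain Java maps standing in for the two Parquet-backed DataFrames (a toy illustration of joining on the shared key, not the Spark API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyJoin {
    public static void main(String[] args) {
        // "single -> triple" loaded from one file, "single -> double" from the other
        Map<Integer, Integer> triples = new LinkedHashMap<>();
        Map<Integer, Integer> doubles = new LinkedHashMap<>();
        for (int single : new int[] {1, 2, 3, 4, 5}) {
            triples.put(single, single * 3);
            doubles.put(single, single * 2);
        }

        // Inner join on the shared key: every output row carries all three
        // columns, so no nulls appear.
        System.out.println("single triple double");
        for (Map.Entry<Integer, Integer> row : triples.entrySet()) {
            Integer dbl = doubles.get(row.getKey());
            if (dbl != null) {
                System.out.println(row.getKey() + "      " + row.getValue() + "      " + dbl);
            }
        }
    }
}
```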

On Wed, Apr 15, 2015 at 5:59 AM, Jaonary Rabarisoa jaon...@gmail.com
wrote:

 Hi all,

 If you follow the example of schema merging in the spark documentation
 http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
 you obtain the following results when you want to load the result data :

 single triple double
 1  3  null
 2  6  null
 4  12 null
 3  9  null
 5  15 null
 1  null   2
 2  null   4
 4  null   8
 3  null   6
 5  null   10

 How to remove these null value and get something more logical like :

 single triple double
 1  3  2
 2  6  4
 4  12 8
 3  9  6
 5  15 10

 Bests,

 Jao



Re: Running beyond physical memory limits

2015-04-15 Thread Sandy Ryza
The setting to increase is spark.yarn.executor.memoryOverhead

On Wed, Apr 15, 2015 at 6:35 AM, Brahma Reddy Battula 
brahmareddy.batt...@huawei.com wrote:

 Hello Sean Owen,

 Thanks for your reply. I'll increase the overhead memory and check it.


 By the way, does any difference between 1.1 and 1.2 cause this issue? It
 was passing on Spark 1.1 and throwing the following error on 1.2... (this makes me
 doubtful)



 Thanks  Regards
 Brahma Reddy Battula




 
 From: Sean Owen [so...@cloudera.com]
 Sent: Wednesday, April 15, 2015 2:49 PM
 To: Brahma Reddy Battula
 Cc: Akhil Das; user@spark.apache.org
 Subject: Re: Running beyond physical memory limits

 This is not related to executor memory, but the extra overhead
 subtracted from the executor's size in order to avoid using more than
 the physical memory that YARN allows. That is, if you declare a 32G
 executor YARN lets you use 32G physical memory but your JVM heap must
 be significantly less than 32G max. This is the overhead factor that
 is subtracted for you, and it seems to need to be bigger in your case.

 On Wed, Apr 15, 2015 at 10:16 AM, Brahma Reddy Battula
 brahmareddy.batt...@huawei.com wrote:
  Thanks a lot for your reply..
 
    There is no issue with Spark 1.1. The following issue came when I upgraded to
  Spark 1.2. Hence I did not decrease spark.executor.memory...
  I mean to say, I used the same config for Spark 1.1 and Spark 1.2..
 
  Is there any issue with Spark 1.2..?
  Or will Yarn lead to this..?
  And why will the executor not release memory, if there are tasks running..?
 
 
  Thanks  Regards
 
  Brahma Reddy Battula
 
 
  
  From: Akhil Das [ak...@sigmoidanalytics.com]
  Sent: Wednesday, April 15, 2015 2:35 PM
  To: Brahma Reddy Battula
  Cc: user@spark.apache.org
  Subject: Re: Running beyond physical memory limits
 
  Did you try reducing your spark.executor.memory?
 
  Thanks
  Best Regards
 
  On Wed, Apr 15, 2015 at 2:29 PM, Brahma Reddy Battula
  brahmareddy.batt...@huawei.com wrote:
 
  Hello Sparkers
 
 
  I am a newbie to Spark and need help. We are using Spark 1.2; we are
  getting the following error and the executor is getting killed. I have seen
  SPARK-1930 and it should be in 1.2.
 
  Any pointer on the following error, like what might lead to this error..
 
 
  2015-04-15 11:55:39,697 | WARN  | Container Monitor | Container
  [pid=126843,containerID=container_1429065217137_0012_01_-411041790] is
  running beyond physical memory limits. Current usage: 26.0 GB of 26 GB
  physical memory used; 26.7 GB of 260 GB virtual memory used. Killing
  container.
  Dump of the process-tree for container_1429065217137_0012_01_-411041790
 :
  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
  SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
  |- 126872 126843 126843 126843 (java) 2049457 22816 28673892352
  6824864 /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server
  -XX:OnOutOfMemoryError=kill %p -Xms24576m -Xmx24576m
 
 -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties
 
 -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native
 
 -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp
  -Dspark.driver.port=23204 -Dspark.random.port.max=23999
  -Dspark.akka.threads=32 -Dspark.akka.frameSize=10
 -Dspark.akka.timeout=100
  -Dspark.ui.port=23000 -Dspark.random.port.min=23000
 
 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790
  org.apache.spark.executor.CoarseGrainedExecutorBackend
  akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3
  hadoopc1h11 10 application_1429065217137_0012 |- 126843 76960
 126843
  126843 (bash) 0 0 11603968 331 /bin/bash -c
  /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server
  -XX:OnOutOfMemoryError='kill %p' -Xms24576m -Xmx24576m
 
 -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties
 
 -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native
 
 -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp
  '-Dspark.driver.port=23204' '-Dspark.random.port.max=23999'
  '-Dspark.akka.threads=32' '-Dspark.akka.frameSize=10'
  '-Dspark.akka.timeout=100' '-Dspark.ui.port=23000'
  '-Dspark.random.port.min=23000'
 
 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790
  org.apache.spark.executor.CoarseGrainedExecutorBackend
  

Re: multinomial and Bernoulli model in NaiveBayes

2015-04-15 Thread Xiangrui Meng
CC Leah, who added Bernoulli option to MLlib's NaiveBayes. -Xiangrui

On Wed, Apr 15, 2015 at 4:49 AM, 姜林和 linhe_ji...@163.com wrote:


 Dear Meng:
 Thanks for the great work on Spark machine learning. I saw the
 changes to the NaiveBayes algorithm,
 separating the algorithm into a multinomial model and a Bernoulli model, but
 something there confuses me:

 the calculation of
 P(Ci) -- pi(i)
 P(j|Ci) -- theta(i,j)

 should differ between the multinomial and Bernoulli models, but I can only
 see theta(i,j) being calculated in a different way, not pi(i)


 Bernoulli:
 each entry j of the feature vector of document i must be 0 or 1, where 1
 represents that word j exists in Document i,

 pi(i) = (number of Documents of class C(i) + lambda) / (number of Documents
 of all classes + 2*lambda)
 theta(i)(j) = (number of Documents in which j exists in class C(i) +
 lambda) / (number of Documents of class C(i) + 2*lambda)

 Multinomial:

 pi(i) = (number of words of class C(i) + lambda) / (number of words of all
 classes + numFeatures*lambda)
 theta(i)(j) = (number of words j in class C(i) + lambda) / (number of words
 in class C(i) + numFeatures*lambda)

 the comparison of the two algorithms:

                | definition in Multinomial        | Multinomial formula                                 | definition in Bernoulli                              | Bernoulli formula
  pi(i)         | number of words of class C(i)    | math.log(numAllWordsOfC + lambda) - piLogDenom      | number of Documents of class C(i)                    | math.log(n + lambda) - piLogDenom
  piLogDenom    | number of words of all classes   | math.log(numAllWords + numFeatures * lambda)        | number of Documents of all classes                   | math.log(numDocuments + 2 * lambda)
  theta(i)(j)   | number of words j in class C(i)  | math.log(sumTermFreqs(j) + lambda) - thetaLogDenom  | number of Documents in which j exists in class C(i)  | math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
  thetaLogDenom | number of words in class C(i)    | math.log(numAllWordsOfC + numFeatures * lambda)     | number of Documents of class C(i)                    | math.log(n + 2 * lambda)
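To make the smoothing constants concrete, here is a toy numeric sketch of the two pi(i) definitions tabulated above (made-up counts, plain Java rather than MLlib code):

```java
public class NaiveBayesPriors {
    public static void main(String[] args) {
        double lambda = 1.0;

        // Multinomial prior: smoothed word counts (toy values)
        double numAllWordsOfC = 10;  // words in class C(i)
        double numAllWords = 40;     // words across all classes
        int numFeatures = 5;
        double piMultinomial =
            Math.log(numAllWordsOfC + lambda) - Math.log(numAllWords + numFeatures * lambda);

        // Bernoulli prior: smoothed document counts (toy values)
        double n = 3;                // documents in class C(i)
        double numDocuments = 12;    // documents across all classes
        double piBernoulli =
            Math.log(n + lambda) - Math.log(numDocuments + 2 * lambda);

        System.out.println(piMultinomial + " " + piBernoulli);
    }
}
```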

 best   regard !

 Linhe Jiang






 Linhe  Jiang





Spark 1.3 saveAsTextFile with codec gives error - works with Spark 1.2

2015-04-15 Thread Manoj Samel
Env - Spark 1.3, Hadoop 2.3, Kerberos

 xx.saveAsTextFile(path, codec) gives the following trace. The same works with
Spark 1.2 in the same environment

val codec = classOf[some codec class]

val a = sc.textFile("/some_hdfs_file")

a.saveAsTextFile("/some_other_hdfs_file", codec) fails with the following trace
in Spark 1.3; it works in Spark 1.2 in the same env

15/04/14 18:06:15 INFO scheduler.TaskSetManager: Lost task 1.3 in stage 2.0
(TID 17) on executor XYZ: java.lang.SecurityException (JCE cannot
authenticate the provider BC) [duplicate 7]
15/04/14 18:06:15 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose
tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage
2.0 (TID 16, nodeXYZ): java.lang.SecurityException: JCE cannot authenticate
the provider BC
at javax.crypto.Cipher.getInstance(Cipher.java:642)
at javax.crypto.Cipher.getInstance(Cipher.java:580)
 some codec calls 
at
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.jar.JarException:
file:/abc/filecache/11/spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned
entries - org/apache/spark/SparkHadoopWriter$.class
at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:462)
at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:322)
at javax.crypto.JarVerifier.verify(JarVerifier.java:250)
at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:161)
at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:187)
at javax.crypto.Cipher.getInstance(Cipher.java:638)
... 16 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


Re: adding new elements to batch RDD from DStream RDD

2015-04-15 Thread Sean Owen
What do you mean by batch RDD? They're just RDDs, though they store their
data in different ways and come from different sources. You can union
an RDD from an HDFS file with one from a DStream.

It sounds like you want streaming data to live longer than its batch
interval, but that's not something you can expect the streaming
framework to provide. It's perfectly possible to save the RDD's data
to persistent store and use it later.

You can't update RDDs; they're immutable. You can re-read data from
persistent store by making a new RDD at any time.

On Wed, Apr 15, 2015 at 7:37 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 The only way to join / union /cogroup a DStream RDD with Batch RDD is via the
 transform method, which returns another DStream RDD and hence it gets
 discarded at the end of the micro-batch.

 Is there any way to e.g. union Dstream RDD with Batch RDD which produces a
 new Batch RDD containing the elements of both the DStream RDD and the Batch
 RDD.

 And once such Batch RDD is created in the above way, can it be used by other
 DStream RDDs to e.g. join with as this time the result can be another
 DStream RDD

 Effectively the functionality described above will result in periodical
 updates (additions) of elements to a Batch RDD - the additional elements
 will keep coming from DStream RDDs which keep streaming in with every
 micro-batch.
  Also newly arriving DStream RDDs will be able to join with the thus
  previously updated Batch RDD and produce a result DStream RDD

 Something almost like that can be achieved with updateStateByKey, but is
 there a way to do it as described here



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/adding-new-elements-to-batch-RDD-from-DStream-RDD-tp22504.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





[SparkSQL; Thriftserver] Help tracking missing 5 minutes

2015-04-15 Thread Yana Kadiyska
Hi Spark users,

Trying to upgrade to Spark 1.2 and running into the following:

I'm seeing some very slow queries and wondering if someone can point me in the
right direction for debugging. My Spark UI shows a job with a duration of 15s
(see attached screenshot), which would be great, but client-side measurement
shows the query takes just over 4 min.

15/04/15 16:34:59 INFO ParseDriver: Parsing command: ***my query here***
15/04/15 16:38:28 INFO ParquetTypesConverter: Falling back to schema
conversion from Parquet types;
15/04/15 16:38:29 INFO DAGScheduler: Got job 6 (collect at
SparkPlan.scala:84) with 401 output partitions (allowLocal=false)


So there is a gap of almost 4 min between the parse and the next line I can
identify as relating to this job. Can someone shed some light on what
happens between the parse and the DAG scheduler? The spark UI also shows
the submitted time as 16:38 which leads me to believe it counts time from
when the scheduler gets the job...but what happens before then?


adding new elements to batch RDD from DStream RDD

2015-04-15 Thread Evo Eftimov
The only way to join / union /cogroup a DStream RDD with Batch RDD is via the
transform method, which returns another DStream RDD and hence it gets
discarded at the end of the micro-batch. 

Is there any way to e.g. union Dstream RDD with Batch RDD which produces a
new Batch RDD containing the elements of both the DStream RDD and the Batch
RDD. 

And once such Batch RDD is created in the above way, can it be used by other
DStream RDDs to e.g. join with as this time the result can be another
DStream RDD

Effectively the functionality described above will result in periodical
updates (additions) of elements to a Batch RDD - the additional elements
will keep coming from DStream RDDs which keep streaming in with every
micro-batch. 
Also newly arriving DStream RDDs will be able to join with the thus
previously updated Batch RDD and produce a result DStream RDD

Something almost like that can be achieved with updateStateByKey, but is
there a way to do it as described here   



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/adding-new-elements-to-batch-RDD-from-DStream-RDD-tp22504.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




exception during foreach run

2015-04-15 Thread Jeetendra Gangele
Hi All

I am getting the below exception while running foreach after zipWithIndex
and flatMapValues.
Inside the foreach I'm doing a lookup in a broadcast variable.


java.util.concurrent.RejectedExecutionException: Worker has already been
shutdown
at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72)
at
org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56)
at
org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
at
org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34)
at
org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496)
at
org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46)
at
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
at org.jboss.netty.channel.Channels.disconnect(Channels.java:781)
at
org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:211)
at
akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:223)
at
akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:222)
at scala.util.Success.foreach(Try.scala:205)
at
scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
at
scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at
akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
:


Re: RAM management during cogroup and join

2015-04-15 Thread Tathagata Das
Significant optimizations can be made by doing the joining/cogroup in a
smart way. If you have to join streaming RDDs with the same batch RDD, then
you can first partition the batch RDD using a partitioner and cache it, and
then use the same partitioner on the streaming RDDs. That would make sure
that the large batch RDD is not partitioned repeatedly for the cogroup;
only the small streaming RDDs are partitioned.

HTH

TD
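The cogroup-versus-join distinction under discussion (cogroup keeps every key present in either input; an inner join keeps only keys present in both) can be illustrated with plain Java collections. This sketches the semantics only, not Spark's implementation:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class CogroupVsJoin {
    public static void main(String[] args) {
        Map<String, Integer> streamSide = new TreeMap<>();  // small streaming side
        streamSide.put("a", 1);
        streamSide.put("b", 2);
        Map<String, Integer> batchSide = new TreeMap<>();   // large batch side
        batchSide.put("b", 20);
        batchSide.put("c", 30);

        // cogroup-like semantics: the union of the key sets -- every key
        // present in either input contributes to the result.
        Set<String> cogroupKeys = new TreeSet<>(streamSide.keySet());
        cogroupKeys.addAll(batchSide.keySet());
        System.out.println(cogroupKeys); // [a, b, c]

        // inner-join-like semantics: the intersection -- only keys present
        // in both inputs survive.
        Set<String> joinKeys = new TreeSet<>(streamSide.keySet());
        joinKeys.retainAll(batchSide.keySet());
        System.out.println(joinKeys); // [b]
    }
}
```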

On Wed, Apr 15, 2015 at 1:11 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 There are indications that joins in Spark are implemented with / based on
 the
 cogroup function/primitive/transform. So let me focus first on cogroup - it
 returns a result which is RDD consisting of essentially ALL elements of the
 cogrouped RDDs. Said in another way - for every key in each of the
 cogrouped
 RDDs there is at least one element from at least one of the cogrouped RDDs.

 That would mean that when smaller, moreover streaming e.g.
 JavaPairDstreamRDDs keep getting joined with much larger, batch RDD that
 would result in RAM allocated for multiple instances of the result
 (cogrouped) RDD a.k.a essentially the large batch RDD and some more ...
 Obviously the RAM will get returned when the DStream RDDs get discard and
 they do on a regular basis, but still that seems as unnecessary spike in
 the
 RAM consumption

 I have two questions:

 1.Is there anyway to control the cogroup process more precisely e.g. tell
 it to include I the cogrouped RDD only elements where there are at least
 one
 element from EACH of the cogrouped RDDs per given key. Based on the current
 cogroup API this is not possible


 2.If the cogroup is really such a sledgehammer and secondly the joins are
 based on cogroup then even though they can present a prettier picture in
 terms of the end result visible to the end user does that mean that under
 the hood there is still the same atrocious RAM consumption going on




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




RE: RAM management during cogroup and join

2015-04-15 Thread Evo Eftimov
That has been done Sir and represents further optimizations – the objective 
here was to confirm whether cogroup always results in the previously described 
“greedy” explosion of the number of elements included and RAM allocated for the 
result RDD 

 

The optimizations mentioned still don’t change the total number of elements 
included in the result RDD and RAM allocated – right? 

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Wednesday, April 15, 2015 9:25 PM
To: Evo Eftimov
Cc: user
Subject: Re: RAM management during cogroup and join

 

Significant optimizations can be made by doing the joining/cogroup in a smart 
way. If you have to join streaming RDDs with the same batch RDD, then you can 
first partition the batch RDDs using a partitions and cache it, and then use 
the same partitioner on the streaming RDDs. That would make sure that the large 
batch RDDs is not partitioned repeatedly for the cogroup, only the small 
streaming RDDs are partitioned.

 

HTH

 

TD

 

On Wed, Apr 15, 2015 at 1:11 PM, Evo Eftimov evo.efti...@isecc.com wrote:

There are indications that joins in Spark are implemented with / based on the
cogroup function/primitive/transform. So let me focus first on cogroup - it
returns a result which is RDD consisting of essentially ALL elements of the
cogrouped RDDs. Said in another way - for every key in each of the cogrouped
RDDs there is at least one element from at least one of the cogrouped RDDs.

That would mean that when smaller, moreover streaming e.g.
JavaPairDstreamRDDs keep getting joined with much larger, batch RDD that
would result in RAM allocated for multiple instances of the result
(cogrouped) RDD a.k.a essentially the large batch RDD and some more ...
Obviously the RAM will get returned when the DStream RDDs get discard and
they do on a regular basis, but still that seems as unnecessary spike in the
RAM consumption

I have two questions:

1.Is there anyway to control the cogroup process more precisely e.g. tell
it to include I the cogrouped RDD only elements where there are at least one
element from EACH of the cogrouped RDDs per given key. Based on the current
cogroup API this is not possible


2.If the cogroup is really such a sledgehammer and secondly the joins are
based on cogroup then even though they can present a prettier picture in
terms of the end result visible to the end user does that mean that under
the hood there is still the same atrocious RAM consumption going on




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



Re: RAM management during cogroup and join

2015-04-15 Thread Tathagata Das
Agreed.

On Wed, Apr 15, 2015 at 1:29 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 That has been done Sir and represents further optimizations – the
 objective here was to confirm whether cogroup always results in the
 previously described “greedy” explosion of the number of elements included
 and RAM allocated for the result RDD



 The optimizations mentioned still don’t change the total number of
 elements included in the result RDD and RAM allocated – right?



 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Wednesday, April 15, 2015 9:25 PM
 *To:* Evo Eftimov
 *Cc:* user
 *Subject:* Re: RAM management during cogroup and join



 Significant optimizations can be made by doing the joining/cogroup in a
 smart way. If you have to join streaming RDDs with the same batch RDD, then
 you can first partition the batch RDDs using a partitions and cache it, and
 then use the same partitioner on the streaming RDDs. That would make sure
 that the large batch RDDs is not partitioned repeatedly for the cogroup,
 only the small streaming RDDs are partitioned.



 HTH



 TD



 On Wed, Apr 15, 2015 at 1:11 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 There are indications that joins in Spark are implemented with / based on
 the cogroup function/primitive/transform. So let me focus first on cogroup -
 it returns a result which is an RDD consisting of essentially ALL elements
 of the cogrouped RDDs. Said another way - for every key in each of the
 cogrouped RDDs there is at least one element from at least one of the
 cogrouped RDDs.

 That would mean that when smaller, moreover streaming, e.g.
 JavaPairDStream RDDs keep getting joined with a much larger batch RDD, that
 would result in RAM allocated for multiple instances of the result
 (cogrouped) RDD, a.k.a. essentially the large batch RDD and some more ...
 Obviously the RAM will get returned when the DStream RDDs get discarded, and
 they do on a regular basis, but still that seems an unnecessary spike in
 the RAM consumption

 I have two questions:

 1. Is there any way to control the cogroup process more precisely, e.g. tell
 it to include in the cogrouped RDD only elements where there is at least
 one element from EACH of the cogrouped RDDs per given key. Based on the
 current cogroup API this is not possible


 2. If cogroup is really such a sledgehammer, and secondly the joins are
 based on cogroup, then even though they can present a prettier picture in
 terms of the end result visible to the end user, does that mean that under
 the hood there is still the same atrocious RAM consumption going on?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





RE: RAM management during cogroup and join

2015-04-15 Thread Evo Eftimov
Thank you Sir, and one final confirmation/clarification - are all forms of 
joins in the Spark API for DStream RDDs based on cogroup in terms of their 
internal implementation?

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Wednesday, April 15, 2015 9:48 PM
To: Evo Eftimov
Cc: user
Subject: Re: RAM management during cogroup and join

 

Agreed. 

 

On Wed, Apr 15, 2015 at 1:29 PM, Evo Eftimov evo.efti...@isecc.com wrote:

That has been done Sir and represents further optimizations – the objective 
here was to confirm whether cogroup always results in the previously described 
“greedy” explosion of the number of elements included and RAM allocated for the 
result RDD 

 

The optimizations mentioned still don’t change the total number of elements 
included in the result RDD and RAM allocated – right? 

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Wednesday, April 15, 2015 9:25 PM
To: Evo Eftimov
Cc: user
Subject: Re: RAM management during cogroup and join

 

Significant optimizations can be made by doing the joining/cogroup in a smart 
way. If you have to join streaming RDDs with the same batch RDD, then you can 
first partition the batch RDD using a partitioner and cache it, and then use 
the same partitioner on the streaming RDDs. That would make sure that the large 
batch RDD is not partitioned repeatedly for the cogroup, only the small 
streaming RDDs are partitioned.
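The advice above can be sketched as follows. This is a minimal sketch, not code from the thread: it assumes an existing `SparkContext` `sc`, a pair DStream `streamingPairDStream`, and a hypothetical HDFS path - all names are illustrative.

```scala
import org.apache.spark.HashPartitioner

// Choose one partitioner and share it between the batch and streaming sides.
val partitioner = new HashPartitioner(32)

// Partition the large batch RDD ONCE and cache it, so its shuffle is not
// repeated on every micro-batch.
val batchRdd = sc
  .textFile("hdfs:///data/reference.txt")          // hypothetical path
  .map(line => (line.split(",")(0), line))         // key by first field
  .partitionBy(partitioner)
  .cache()

// Use the SAME partitioner on each small streaming RDD; only the streaming
// side is shuffled before the join/cogroup.
val joined = streamingPairDStream.transform { rdd =>
  rdd.partitionBy(partitioner).join(batchRdd)
}
```

Because both sides share the partitioner and the batch side is cached, the join can be computed without re-shuffling the batch RDD each interval.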

 

HTH

 

TD

 

On Wed, Apr 15, 2015 at 1:11 PM, Evo Eftimov evo.efti...@isecc.com wrote:

There are indications that joins in Spark are implemented with / based on the
cogroup function/primitive/transform. So let me focus first on cogroup - it
returns a result which is an RDD consisting of essentially ALL elements of the
cogrouped RDDs. Said another way - for every key in each of the cogrouped
RDDs there is at least one element from at least one of the cogrouped RDDs.

That would mean that when smaller, moreover streaming, e.g.
JavaPairDStream RDDs keep getting joined with a much larger batch RDD, that
would result in RAM allocated for multiple instances of the result
(cogrouped) RDD, a.k.a. essentially the large batch RDD and some more ...
Obviously the RAM will get returned when the DStream RDDs get discarded, and
they do on a regular basis, but still that seems an unnecessary spike in the
RAM consumption

I have two questions:

1. Is there any way to control the cogroup process more precisely, e.g. tell
it to include in the cogrouped RDD only elements where there is at least one
element from EACH of the cogrouped RDDs per given key. Based on the current
cogroup API this is not possible


2. If cogroup is really such a sledgehammer, and secondly the joins are
based on cogroup, then even though they can present a prettier picture in
terms of the end result visible to the end user, does that mean that under
the hood there is still the same atrocious RAM consumption going on?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 

 



RE: adding new elements to batch RDD from DStream RDD

2015-04-15 Thread Evo Eftimov
I keep seeing only common statements

Re DStream RDDs and Batch RDDs - there is certainly something to keep me from 
using them together, and it is the OO API differences I have described 
previously, several times ...

Re the batch RDD reloading from file and that there is no need for threads - 
the driver of a spark streaming app instantiates and submits a DAG pipeline to 
the spark streaming cluster and keeps it alive while it is running - this is 
not exactly a linear execution where the main thread of the driver can invoke 
the spark context method for loading batch RDDs from file e.g. a second 
time, moreover after a specific period of time

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Wednesday, April 15, 2015 8:14 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: adding new elements to batch RDD from DStream RDD

Yes, I mean there's nothing to keep you from using them together other than 
their very different lifetime. That's probably the key here: if you need the 
streaming data to live a long time it has to live in persistent storage first.

I do exactly this and what you describe for the same purpose.
I don't believe there's any need for threads; an RDD is just bookkeeping about 
partitions, and that has to be re-assessed when the underlying data grows. But 
making a new RDD on the fly is easy. It's a reference to the data only.

(Well, that changes if you cache the results, in which case you very much care 
about unpersisting the RDD before getting a different reference to all of the 
same data and more.)




On Wed, Apr 15, 2015 at 8:06 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 Hi Sean well there is certainly a difference between batch RDD and 
 streaming RDD and in the previous reply you have already outlined some. 
 Other differences are in the Object Oriented Model / API of Spark, which also 
 matters besides the RDD / Spark Cluster Platform architecture.

 Secondly, in the previous email I have clearly described what I mean by 
 update and that it is a result of RDD transformation and hence a new 
 RDD derived from the previously joined/union/cogrouped one - ie not 
 mutating an existing RDD

 Lets also leave aside the architectural goal why I want to keep updating a 
 batch RDD with new data coming from DStream RDDs - fyi it is NOT to make 
 streaming RDDs long living

 Let me now go back to the overall objective - the app context is Spark 
 Streaming job. I want to update / add the content of incoming 
 streaming RDDs (e.g. JavaDStreamRDDs) to an already loaded (e.g. from 
 HDFS file) batch RDD e.g. JavaRDD - the only way to union / join / 
 cogroup from DSTreamRDD to batch RDD is via the transform method 
 which always returns DStream RDD NOT batch RDD - check the API

 On a separate note - your suggestion to keep reloading a Batch RDD 
 from a file - it may have some applications in other scenarios so lets 
 drill down into it - in the context of Spark Streaming app where the 
 driver launches a DAG pipeline and then just essentially hangs, I 
 guess the only way to keep reloading a batch RDD from file is from a 
 separate thread still using the same spark context. The thread will 
 reload the batch RDD with the same reference ie reassign the reference 
 to the newly instantiated/loaded batch RDD - is that what you mean by 
 reloading batch RDD from file

 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Wednesday, April 15, 2015 7:43 PM
 To: Evo Eftimov
 Cc: user@spark.apache.org
 Subject: Re: adding new elements to batch RDD from DStream RDD

 What do you mean by batch RDD? They're just RDDs, though they store their data 
 in different ways and come from different sources. You can union an RDD from 
 an HDFS file with one from a DStream.

 It sounds like you want streaming data to live longer than its batch 
 interval, but that's not something you can expect the streaming framework to 
 provide. It's perfectly possible to save the RDD's data to persistent store 
 and use it later.

 You can't update RDDs; they're immutable. You can re-read data from 
 persistent store by making a new RDD at any time.

 On Wed, Apr 15, 2015 at 7:37 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 The only way to join / union / cogroup a DStream RDD with a Batch RDD is 
 via the transform method, which returns another DStream RDD and 
 hence it gets discarded at the end of the micro-batch.

 Is there any way to e.g. union a DStream RDD with a Batch RDD which 
 produces a new Batch RDD containing the elements of both the DStream 
 RDD and the Batch RDD?

 And once such a Batch RDD is created in the above way, can it be used 
 by other DStream RDDs to e.g. join with, as this time the result can 
 be another DStream RDD?

 Effectively the functionality described above will result in 
 periodical updates (additions) of elements to a Batch RDD - the 
 additional elements will keep coming from DStream RDDs which keep 
 streaming in with every micro-batch.

Re: adding new elements to batch RDD from DStream RDD

2015-04-15 Thread Sean Owen
Yep, you are looking at operations on DStream, which is not what I'm
talking about. You should look at DStream.foreachRDD (or Java
equivalent), which hands you an RDD. Makes more sense?

The rest may make more sense when you try it. There is actually a lot
less complexity than you think.
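A minimal sketch of the foreachRDD approach described above, assuming a DStream `events` and a starting batch RDD `initialBatchRdd` (illustrative names, not from the thread):

```scala
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.rdd.RDD

// Inside foreachRDD you hold a plain RDD, so it can be unioned with a batch
// RDD and the combined reference republished for later micro-batches.
val accumulated = new AtomicReference[RDD[String]](initialBatchRdd)

events.foreachRDD { streamRdd: RDD[String] =>
  // Union the current micro-batch into the running batch RDD and swap the
  // reference; the old RDD is simply no longer referenced.
  val updated = accumulated.get().union(streamRdd)
  accumulated.set(updated)
}
```

Note that in this sketch the lineage of the accumulated RDD grows with every micro-batch, so in practice one would periodically checkpoint it or persist it to storage and re-read, along the lines Sean suggests.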

On Wed, Apr 15, 2015 at 8:37 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 The OO API in question was mentioned several times - the transform method 
 of DStream, which is the ONLY way to join/cogroup/union a DStream RDD with a 
 batch RDD aka JavaRDD

 Here is a paste from the spark javadoc

 <K2,V2> JavaPairDStream<K2,V2> transformToPair(Function<R,JavaPairRDD<K2,V2>> 
 transformFunc)
 Return a new DStream in which each RDD is generated by applying a function on 
 each RDD of 'this' DStream.

 As you can see it ALWAYS returns a DStream NOT a JavaRDD aka batch RDD

 Re the rest of the discussion (re-loading a batch RDD from file within the 
 spark streaming context) - let's leave that since we are not getting anywhere

 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Wednesday, April 15, 2015 8:30 PM
 To: Evo Eftimov
 Cc: user@spark.apache.org
 Subject: Re: adding new elements to batch RDD from DStream RDD

 What API differences are you talking about? A DStream gives a sequence of 
 RDDs. I'm not referring to DStream or its API.

 Spark in general can execute many pipelines at once, ones that even refer to 
 the same RDD. What I mean is you seem to be looking for a way to change one 
 shared RDD, but in fact, you simply create an RDD on top of the current state 
 of the data whenever and wherever you wish. Unless you're caching the RDD's 
 blocks, you don't have much need to share a reference to one RDD anyway, 
 which is what I thought you were getting at.

 On Wed, Apr 15, 2015 at 8:25 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 I keep seeing only common statements

 Re DStream RDDs and Batch RDDs - There is certainly something to keep me 
 from using them together and it is the OO API differences I have described 
 previously, several times ...

 Re the batch RDD reloading from file and that there is no need for
 threads - the driver of spark streaming app instantiates and submits
 a DAG pipeline to the spark streaming cluster and keeps it alive while
 it is running - this is not exactly a linear execution where the main
 thread of the driver can invoke the spark context method for loading
 batch RDDs from file for e.g. a second time moreover after specific
 period of time

 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Wednesday, April 15, 2015 8:14 PM
 To: Evo Eftimov
 Cc: user@spark.apache.org
 Subject: Re: adding new elements to batch RDD from DStream RDD

 Yes, I mean there's nothing to keep you from using them together other than 
 their very different lifetime. That's probably the key here: if you need the 
 streaming data to live a long time it has to live in persistent storage 
 first.

 I do exactly this and what you describe for the same purpose.
 I don't believe there's any need for threads; an RDD is just bookkeeping 
 about partitions, and that has to be re-assessed when the underlying data 
 grows. But making a new RDD on the fly is easy. It's a reference to the 
 data only.

 (Well, that changes if you cache the results, in which case you very
 much care about unpersisting the RDD before getting a different
 reference to all of the same data and more.)




 On Wed, Apr 15, 2015 at 8:06 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 Hi Sean well there is certainly a difference between batch RDD and 
 streaming RDD and in the previous reply you have already outlined some. 
 Other differences are in the Object Oriented Model / API of Spark, which 
 also matters besides the RDD / Spark Cluster Platform architecture.

 Secondly, in the previous email I have clearly described what I mean by
 update and that it is a result of RDD transformation and hence a
 new RDD derived from the previously joined/union/cogrouped one - ie
 not mutating an existing RDD

 Lets also leave aside the architectural goal why I want to keep updating a 
 batch RDD with new data coming from DStream RDDs - fyi it is NOT to make 
 streaming RDDs long living

 Let me now go back to the overall objective - the app context is
 Spark Streaming job. I want to update / add the content of
 incoming streaming RDDs (e.g. JavaDStreamRDDs) to an already loaded
 (e.g. from HDFS file) batch RDD e.g. JavaRDD - the only way to union
 / join / cogroup from DSTreamRDD to batch RDD is via the transform
 method which always returns DStream RDD NOT batch RDD - check the API

 On a separate note - your suggestion to keep reloading a Batch RDD
 from a file - it may have some applications in other scenarios so
 lets drill down into it - in the context of Spark Streaming app where
 the driver launches a DAG pipeline and then just essentially hangs, I
 guess the only way to keep reloading a batch RDD from file is from 

RAM management during cogroup and join

2015-04-15 Thread Evo Eftimov
There are indications that joins in Spark are implemented with / based on the
cogroup function/primitive/transform. So let me focus first on cogroup - it
returns a result which is an RDD consisting of essentially ALL elements of the
cogrouped RDDs. Said another way - for every key in each of the cogrouped
RDDs there is at least one element from at least one of the cogrouped RDDs. 

That would mean that when smaller, moreover streaming, e.g.
JavaPairDStream RDDs keep getting joined with a much larger batch RDD, that
would result in RAM allocated for multiple instances of the result
(cogrouped) RDD, a.k.a. essentially the large batch RDD and some more ...
Obviously the RAM will get returned when the DStream RDDs get discarded, and
they do on a regular basis, but still that seems an unnecessary spike in the
RAM consumption 

I have two questions: 

1. Is there any way to control the cogroup process more precisely, e.g. tell
it to include in the cogrouped RDD only elements where there is at least one
element from EACH of the cogrouped RDDs per given key. Based on the current
cogroup API this is not possible 


2. If cogroup is really such a sledgehammer, and secondly the joins are
based on cogroup, then even though they can present a prettier picture in
terms of the end result visible to the end user, does that mean that under
the hood there is still the same atrocious RAM consumption going on? 
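Regarding question 1, the "at least one element from EACH side per key" semantics can in fact be approximated by filtering the cogroup result. A sketch, assuming two pair RDDs `left` and `right` that are not part of this thread (a plain `join` keeps the same keys, already flattened to pairs):

```scala
// cogroup yields RDD[(K, (Iterable[V], Iterable[W]))]; keep only keys that
// appear in BOTH inputs, mimicking inner-join semantics.
val innerLikeCogroup = left.cogroup(right).filter {
  case (_, (leftVals, rightVals)) => leftVals.nonEmpty && rightVals.nonEmpty
}
```

This does not avoid materializing the full cogroup first, so it addresses the API question rather than the RAM concern.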




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: adding new elements to batch RDD from DStream RDD

2015-04-15 Thread Evo Eftimov
Hi Sean, well there is certainly a difference between batch RDD and 
streaming RDD, and in the previous reply you have already outlined some. Other 
differences are in the Object Oriented Model / API of Spark, which also matters 
besides the RDD / Spark Cluster Platform architecture.

Secondly, in the previous email I have clearly described what I mean by update 
and that it is the result of an RDD transformation and hence a new RDD derived 
from the previously joined/unioned/cogrouped one - ie not mutating an existing RDD

Let's also leave aside the architectural goal of why I want to keep updating a 
batch RDD with new data coming from DStream RDDs - fyi it is NOT to make 
streaming RDDs long living  

Let me now go back to the overall objective - the app context is a Spark 
Streaming job. I want to update / add the content of incoming streaming 
RDDs (e.g. JavaDStream RDDs) to an already loaded (e.g. from an HDFS file) batch 
RDD e.g. JavaRDD - the only way to union / join / cogroup from a DStream RDD to 
a batch RDD is via the transform method, which always returns a DStream RDD NOT 
a batch RDD - check the API

On a separate note - your suggestion to keep reloading a Batch RDD from a file 
- it may have some applications in other scenarios, so let's drill down into it - 
in the context of a Spark Streaming app where the driver launches a DAG pipeline 
and then just essentially hangs, I guess the only way to keep reloading a batch 
RDD from file is from a separate thread still using the same spark context. The 
thread will reload the batch RDD with the same reference, ie reassign the 
reference to the newly instantiated/loaded batch RDD - is that what you mean by 
reloading a batch RDD from file?   
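The separate-thread reload idea described in the last paragraph could be sketched roughly as below, assuming an existing `SparkContext` `sc`; the path, interval, and all names are illustrative only, and whether this fits a given pipeline depends on how the new reference is consumed:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

// Publish the batch RDD through a reference the streaming pipeline reads.
val batchRef = new AtomicReference(sc.textFile("hdfs:///data/reference.txt"))

// A background thread re-reads the file on a schedule and swaps the
// published reference; transform/foreachRDD callbacks see the latest value
// on their next micro-batch.
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    batchRef.set(sc.textFile("hdfs:///data/reference.txt"))
  }
}, 10, 10, TimeUnit.MINUTES)
```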

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Wednesday, April 15, 2015 7:43 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: adding new elements to batch RDD from DStream RDD

What do you mean by batch RDD? They're just RDDs, though they store their data 
in different ways and come from different sources. You can union an RDD from an 
HDFS file with one from a DStream.

It sounds like you want streaming data to live longer than its batch interval, 
but that's not something you can expect the streaming framework to provide. 
It's perfectly possible to save the RDD's data to persistent store and use it 
later.

You can't update RDDs; they're immutable. You can re-read data from persistent 
store by making a new RDD at any time.

On Wed, Apr 15, 2015 at 7:37 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 The only way to join / union / cogroup a DStream RDD with a Batch RDD is 
 via the transform method, which returns another DStream RDD and 
 hence it gets discarded at the end of the micro-batch.

 Is there any way to e.g. union a DStream RDD with a Batch RDD which 
 produces a new Batch RDD containing the elements of both the DStream 
 RDD and the Batch RDD?

 And once such a Batch RDD is created in the above way, can it be used by 
 other DStream RDDs to e.g. join with, as this time the result can be 
 another DStream RDD?

 Effectively the functionality described above will result in 
 periodical updates (additions) of elements to a Batch RDD - the 
 additional elements will keep coming from DStream RDDs which keep 
 streaming in with every micro-batch.
 Also newly arriving DStream RDDs will be able to join with the thus 
 previously updated Batch RDD and produce a result DStream RDD

 Something almost like that can be achieved with updateStateByKey, but 
 is there a way to do it as described here?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/adding-new-element
 s-to-batch-RDD-from-DStream-RDD-tp22504.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
 additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: adding new elements to batch RDD from DStream RDD

2015-04-15 Thread Sean Owen
Yes, I mean there's nothing to keep you from using them together other
than their very different lifetime. That's probably the key here: if
you need the streaming data to live a long time it has to live in
persistent storage first.

I do exactly this and what you describe for the same purpose.
I don't believe there's any need for threads; an RDD is just
bookkeeping about partitions, and that has to be re-assessed when the
underlying data grows. But making a new RDD on the fly is easy. It's a
reference to the data only.

(Well, that changes if you cache the results, in which case you very
much care about unpersisting the RDD before getting a different
reference to all of the same data and more.)
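That caching caveat can be sketched as follows, where `current` is the previously cached combined RDD and `incoming` the new data - illustrative names, not from the thread:

```scala
// Build and cache the new combined RDD, materialize it, then release the
// old cached copy so its blocks don't linger in executor memory.
val updated = current.union(incoming).cache()
updated.count()      // force materialization of the new cached copy
current.unpersist()  // only now drop the old reference's cached blocks
```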




On Wed, Apr 15, 2015 at 8:06 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 Hi Sean well there is certainly a difference between batch RDD and 
 streaming RDD and in the previous reply you have already outlined some. 
 Other differences are in the Object Oriented Model / API of Spark, which also 
 matters besides the RDD / Spark Cluster Platform architecture.

 Secondly, in the previous email I have clearly described what I mean by update 
 and that it is a result of RDD transformation and hence a new RDD derived 
 from the previously joined/union/cogrouped one - ie not mutating an 
 existing RDD

 Lets also leave aside the architectural goal why I want to keep updating a 
 batch RDD with new data coming from DStream RDDs - fyi it is NOT to make 
 streaming RDDs long living

 Let me now go back to the overall objective - the app context is Spark 
 Streaming job. I want to update / add the content of incoming streaming 
 RDDs (e.g. JavaDStreamRDDs) to an already loaded (e.g. from HDFS file) batch 
 RDD e.g. JavaRDD - the only way to union / join / cogroup from DSTreamRDD to 
 batch RDD is via the transform method which always returns DStream RDD NOT 
 batch RDD - check the API

 On a separate note - your suggestion to keep reloading a Batch RDD from a 
 file - it may have some applications in other scenarios so lets drill down 
 into it - in the context of Spark Streaming app where the driver launches a 
 DAG pipeline and then just essentially hangs, I guess the only way to keep 
 reloading a batch RDD from file is from a separate thread still using the 
 same spark context. The thread will reload the batch RDD with the same 
 reference ie reassign the reference to the newly instantiated/loaded batch 
 RDD - is that what you mean by reloading batch RDD from file

 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Wednesday, April 15, 2015 7:43 PM
 To: Evo Eftimov
 Cc: user@spark.apache.org
 Subject: Re: adding new elements to batch RDD from DStream RDD

 What do you mean by batch RDD? They're just RDDs, though they store their data 
 in different ways and come from different sources. You can union an RDD from 
 an HDFS file with one from a DStream.

 It sounds like you want streaming data to live longer than its batch 
 interval, but that's not something you can expect the streaming framework to 
 provide. It's perfectly possible to save the RDD's data to persistent store 
 and use it later.

 You can't update RDDs; they're immutable. You can re-read data from 
 persistent store by making a new RDD at any time.

 On Wed, Apr 15, 2015 at 7:37 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 The only way to join / union / cogroup a DStream RDD with a Batch RDD is
 via the transform method, which returns another DStream RDD and
 hence it gets discarded at the end of the micro-batch.

 Is there any way to e.g. union a DStream RDD with a Batch RDD which
 produces a new Batch RDD containing the elements of both the DStream
 RDD and the Batch RDD?

 And once such a Batch RDD is created in the above way, can it be used by
 other DStream RDDs to e.g. join with, as this time the result can be
 another DStream RDD?

 Effectively the functionality described above will result in
 periodical updates (additions) of elements to a Batch RDD - the
 additional elements will keep coming from DStream RDDs which keep
 streaming in with every micro-batch.
 Also newly arriving DStream RDDs will be able to join with the thus
 previously updated Batch RDD and produce a result DStream RDD

 Something almost like that can be achieved with updateStateByKey, but
 is there a way to do it as described here?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/adding-new-element
 s-to-batch-RDD-from-DStream-RDD-tp22504.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
 additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: adding new elements to batch RDD from DStream RDD

2015-04-15 Thread Sean Owen
What API differences are you talking about? a DStream gives a sequence
of RDDs. I'm not referring to DStream or its API.

Spark in general can execute many pipelines at once, ones that even
refer to the same RDD. What I mean is you seem to be looking for a way to
change one shared RDD, but in fact, you simply create an RDD on top of
the current state of the data whenever and wherever you wish. Unless
you're caching the RDD's blocks, you don't have much need to share a
reference to one RDD anyway, which is what I thought you were getting
at.

On Wed, Apr 15, 2015 at 8:25 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 I keep seeing only common statements

 Re DStream RDDs and Batch RDDs - There is certainly something to keep me 
 from using them together and it is the OO API differences I have described 
 previously, several times ...

 Re the batch RDD reloading from file and that there is no need for threads 
 - the driver of spark streaming app instantiates and submits a DAG pipeline 
 to the spark streaming cluster and keeps it alive while it is running - this 
 is not exactly a linear execution where the main thread of the driver can 
 invoke the spark context method for loading batch RDDs from file for e.g. a 
 second time moreover after specific period of time

 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Wednesday, April 15, 2015 8:14 PM
 To: Evo Eftimov
 Cc: user@spark.apache.org
 Subject: Re: adding new elements to batch RDD from DStream RDD

 Yes, I mean there's nothing to keep you from using them together other than 
 their very different lifetime. That's probably the key here: if you need the 
 streaming data to live a long time it has to live in persistent storage first.

 I do exactly this and what you describe for the same purpose.
 I don't believe there's any need for threads; an RDD is just bookkeeping 
 about partitions, and that has to be re-assessed when the underlying data 
 grows. But making a new RDD on the fly is easy. It's a reference to the 
 data only.

 (Well, that changes if you cache the results, in which case you very much 
 care about unpersisting the RDD before getting a different reference to all 
 of the same data and more.)




 On Wed, Apr 15, 2015 at 8:06 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 Hi Sean well there is certainly a difference between batch RDD and 
 streaming RDD and in the previous reply you have already outlined some. 
 Other differences are in the Object Oriented Model / API of Spark, which 
 also matters besides the RDD / Spark Cluster Platform architecture.

 Secondly, in the previous email I have clearly described what I mean by
 update and that it is a result of RDD transformation and hence a new
 RDD derived from the previously joined/union/cogrouped one - ie not
 mutating an existing RDD

 Lets also leave aside the architectural goal why I want to keep updating a 
 batch RDD with new data coming from DStream RDDs - fyi it is NOT to make 
 streaming RDDs long living

 Let me now go back to the overall objective - the app context is Spark
 Streaming job. I want to update / add the content of incoming
 streaming RDDs (e.g. JavaDStreamRDDs) to an already loaded (e.g. from
 HDFS file) batch RDD e.g. JavaRDD - the only way to union / join /
 cogroup from DSTreamRDD to batch RDD is via the transform method
 which always returns DStream RDD NOT batch RDD - check the API

 On a separate note - your suggestion to keep reloading a Batch RDD
 from a file - it may have some applications in other scenarios so lets
 drill down into it - in the context of Spark Streaming app where the
 driver launches a DAG pipeline and then just essentially hangs, I
 guess the only way to keep reloading a batch RDD from file is from a
 separate thread still using the same spark context. The thread will
 reload the batch RDD with the same reference ie reassign the reference
 to the newly instantiated/loaded batch RDD - is that what you mean by
 reloading batch RDD from file

 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Wednesday, April 15, 2015 7:43 PM
 To: Evo Eftimov
 Cc: user@spark.apache.org
 Subject: Re: adding new elements to batch RDD from DStream RDD

 What do you mean by batch RDD? They're just RDDs, though they store their data 
 in different ways and come from different sources. You can union an RDD from 
 an HDFS file with one from a DStream.

 It sounds like you want streaming data to live longer than its batch 
 interval, but that's not something you can expect the streaming framework to 
 provide. It's perfectly possible to save the RDD's data to persistent store 
 and use it later.

 You can't update RDDs; they're immutable. You can re-read data from 
 persistent store by making a new RDD at any time.

 On Wed, Apr 15, 2015 at 7:37 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 The only way to join / union / cogroup a DStream RDD with a Batch RDD is
 via the transform method, which returns another DStream 

RE: adding new elements to batch RDD from DStream RDD

2015-04-15 Thread Evo Eftimov
The OO API in question was mentioned several times - the transform method of 
DStream, which is the ONLY way to join/cogroup/union a DStream RDD with a batch 
RDD aka JavaRDD 

Here is a paste from the spark javadoc 

<K2,V2> JavaPairDStream<K2,V2> transformToPair(Function<R,JavaPairRDD<K2,V2>> 
transformFunc)
Return a new DStream in which each RDD is generated by applying a function on 
each RDD of 'this' DStream.

As you can see it ALWAYS returns a DStream NOT a JavaRDD aka batch RDD 

Re the rest of the discussion (re-loading a batch RDD from file within the spark 
streaming context) - let's leave that since we are not getting anywhere 

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Wednesday, April 15, 2015 8:30 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: adding new elements to batch RDD from DStream RDD

What API differences are you talking about? a DStream gives a sequence of RDDs. 
I'm not referring to DStream or its API.

Spark in general can execute many pipelines at once, ones that even refer to 
the same RDD. What I mean is you seem to be looking for a way to change one shared 
RDD, but in fact, you simply create an RDD on top of the current state of the 
data whenever and wherever you wish. Unless you're caching the RDD's blocks, 
you don't have much need to share a reference to one RDD anyway, which is what 
I thought you were getting at.

On Wed, Apr 15, 2015 at 8:25 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 I keep seeing only common statements

 Re DStream RDDs and Batch RDDs - There is certainly something to keep me 
 from using them together and it is the OO API differences I have described 
 previously, several times ...

 Re the batch RDD reloading from file and that there is no need for 
 threads - the driver of a spark streaming app instantiates and submits 
 a DAG pipeline to the spark streaming cluster and keeps it alive while 
 it is running - this is not exactly a linear execution where the main 
 thread of the driver can invoke the spark context method for loading 
 batch RDDs from file e.g. a second time, moreover after a specific 
 period of time

 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Wednesday, April 15, 2015 8:14 PM
 To: Evo Eftimov
 Cc: user@spark.apache.org
 Subject: Re: adding new elements to batch RDD from DStream RDD

 Yes, I mean there's nothing to keep you from using them together other than 
 their very different lifetime. That's probably the key here: if you need the 
 streaming data to live a long time it has to live in persistent storage first.

 I do exactly this and what you describe for the same purpose.
 I don't believe there's any need for threads; an RDD is just bookkeeping 
 about partitions, and that has to be re-assessed when the underlying data 
 grows. But making a new RDD on the fly is easy. It's a reference to the 
 data only.

 (Well, that changes if you cache the results, in which case you very 
 much care about unpersisting the RDD before getting a different 
 reference to all of the same data and more.)




 On Wed, Apr 15, 2015 at 8:06 PM, Evo Eftimov evo.efti...@isecc.com wrote:
 Hi Sean well there is certainly a difference between batch RDD and 
 streaming RDD and in the previous reply you have already outlined some. 
 Other differences are in the Object Oriented Model / API of Spark, which 
 also matters besides the RDD / Spark Cluster Platform architecture.

 Secondly, in the previous email I have clearly described what I mean by 
 update and that it is a result of RDD transformation and hence a 
 new RDD derived from the previously joined/union/cogrouped one - ie 
 not mutating an existing RDD

 Let's also leave aside the architectural goal of why I want to keep updating a 
 batch RDD with new data coming from DStream RDDs - fyi it is NOT to make 
 streaming RDDs long living

 Let me now go back to the overall objective - the app context is 
 Spark Streaming job. I want to update / add the content of 
 incoming streaming RDDs (e.g. JavaDStreamRDDs) to an already loaded 
 (e.g. from HDFS file) batch RDD e.g. JavaRDD - the only way to union 
 / join / cogroup from a DStream RDD to a batch RDD is via the transform 
 method which always returns DStream RDD NOT batch RDD - check the API

 On a separate note - your suggestion to keep reloading a Batch RDD 
 from a file - it may have some applications in other scenarios so 
 let's drill down into it - in the context of a Spark Streaming app where 
 the driver launches a DAG pipeline and then just essentially hangs, I 
 guess the only way to keep reloading a batch RDD from file is from a 
 separate thread still using the same spark context. The thread will 
 reload the batch RDD with the same reference, i.e. reassign the 
 reference to the newly instantiated/loaded batch RDD - is that what 
 you mean by reloading a batch RDD from file?

 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Wednesday, April 15, 2015 7:43 PM
 

aliasing aggregate columns?

2015-04-15 Thread elliott cordo
Hi Guys -

Having trouble figuring out the semantics for using the alias function on
the final sum and count aggregations?

 cool_summary = reviews.select(reviews.user_id,
cool_cnt(votes.cool).alias("cool_cnt")).groupBy("user_id").agg({"cool_cnt": "sum", "*": "count"})

 cool_summary

DataFrame[user_id: string, SUM(cool_cnt#725): double, COUNT(1): bigint]
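The dict form of agg fixes the generated column names (SUM(cool_cnt#725), COUNT(1)); custom names generally require expression-style aggregates with .alias(...) inside agg. The underlying semantics (per-user sum and count under explicit output names) can be checked with a small pure-Python sketch, no Spark required; data and names are invented:

```python
from collections import defaultdict

# Rows of (user_id, cool_cnt), mimicking the grouped aggregation
# sum(cool_cnt) and count(*) per user, with explicit output names
# instead of the auto-generated SUM(...)/COUNT(1) labels.
rows = [("u1", 2.0), ("u1", 3.0), ("u2", 1.0)]

def agg_by_user(rows):
    sums, counts = defaultdict(float), defaultdict(int)
    for user, cnt in rows:
        sums[user] += cnt
        counts[user] += 1
    # Each result row carries the aliased column names explicitly.
    return {u: {"cool_sum": sums[u], "row_count": counts[u]} for u in sums}

print(agg_by_user(rows))
```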


Re: spark job progress-style report on console ?

2015-04-15 Thread syepes
Just add the following line to your conf/spark-defaults.conf file:
spark.ui.showConsoleProgress  true



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-job-progress-style-report-on-console-tp22440p22506.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RAM management during cogroup and join

2015-04-15 Thread Tathagata Das
Well, DStream joins are nothing but RDD joins at their core. However, there
are more optimizations available when you use DataFrames and Spark SQL joins. With
a schema, there is greater scope for optimizing the joins. So
converting the RDDs from streaming and the batch RDDs to data frames, and then
applying joins, may improve performance.

TD

On Wed, Apr 15, 2015 at 1:50 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 Thank you Sir, and one final confirmation/clarification -  are all forms
 of joins in the Spark API for DStream RDDs based on cogroup in terms of
 their internal implementation



 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Wednesday, April 15, 2015 9:48 PM

 *To:* Evo Eftimov
 *Cc:* user
 *Subject:* Re: RAM management during cogroup and join



 Agreed.



 On Wed, Apr 15, 2015 at 1:29 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 That has been done Sir and represents further optimizations – the
 objective here was to confirm whether cogroup always results in the
 previously described “greedy” explosion of the number of elements included
 and RAM allocated for the result RDD



 The optimizations mentioned still don’t change the total number of
 elements included in the result RDD and RAM allocated – right?



 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Wednesday, April 15, 2015 9:25 PM
 *To:* Evo Eftimov
 *Cc:* user
 *Subject:* Re: RAM management during cogroup and join



 Significant optimizations can be made by doing the joining/cogroup in a
 smart way. If you have to join streaming RDDs with the same batch RDD, then
 you can first partition the batch RDDs using a partitioner and cache them, and
 then use the same partitioner on the streaming RDDs. That would make sure
 that the large batch RDDs is not partitioned repeatedly for the cogroup,
 only the small streaming RDDs are partitioned.
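A toy sketch of the co-partitioning idea above (pure Python, invented names; in Spark this corresponds to partitionBy on the batch RDD plus caching, then using the same partitioner on each streaming RDD):

```python
NUM_PARTITIONS = 4

def part(key):
    # The shared "partitioner": both sides must use the same function.
    return hash(key) % NUM_PARTITIONS

# Partition the large batch dataset ONCE and keep it around (the analogue
# of partitionBy(...).cache() on the batch RDD).
big = {f"user{i}": i for i in range(100)}
big_parts = [dict() for _ in range(NUM_PARTITIONS)]
for k, v in big.items():
    big_parts[part(k)][k] = v

def join_batch(micro_batch):
    """Route each small streaming record to the matching partition of the
    pre-partitioned big side; the big side is never re-shuffled."""
    return [(k, (v, big_parts[part(k)][k]))
            for k, v in micro_batch if k in big_parts[part(k)]]

print(join_batch([("user7", "click"), ("nope", "view")]))  # -> [('user7', ('click', 7))]
```

Only the small, per-batch side moves through part() on every interval; the expensive partitioning of the big side happened once up front.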



 HTH



 TD



 On Wed, Apr 15, 2015 at 1:11 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 There are indications that joins in Spark are implemented with / based on
 the
 cogroup function/primitive/transform. So let me focus first on cogroup - it
 returns a result which is RDD consisting of essentially ALL elements of the
 cogrouped RDDs. Said in another way - for every key in each of the
 cogrouped
 RDDs there is at least one element from at least one of the cogrouped RDDs.

 That would mean that when smaller, moreover streaming e.g.
 JavaPairDstreamRDDs keep getting joined with much larger, batch RDD that
 would result in RAM allocated for multiple instances of the result
 (cogrouped) RDD a.k.a essentially the large batch RDD and some more ...
 Obviously the RAM will get returned when the DStream RDDs get discarded, and
 they do on a regular basis, but still that seems an unnecessary spike in
 the
 RAM consumption

 I have two questions:

 1. Is there any way to control the cogroup process more precisely, e.g. tell
 it to include in the cogrouped RDD only elements where there is at least one
 element from EACH of the cogrouped RDDs per given key? Based on the current
 cogroup API this is not possible


 2.If the cogroup is really such a sledgehammer and secondly the joins are
 based on cogroup then even though they can present a prettier picture in
 terms of the end result visible to the end user does that mean that under
 the hood there is still the same atrocious RAM consumption going on
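To make question 1 concrete: cogroup spans the union of the keys of its inputs, while the behaviour asked for (at least one element from EACH input per key) is an inner join on keys. A toy pure-Python model of both semantics, with invented data:

```python
def cogroup(a, b):
    """Toy cogroup: for every key in EITHER dict-of-lists, emit (list_a, list_b)."""
    keys = set(a) | set(b)
    return {k: (a.get(k, []), b.get(k, [])) for k in keys}

def inner_cogroup(a, b):
    """The stricter variant asked about: keep only keys present in BOTH inputs."""
    keys = set(a) & set(b)
    return {k: (a[k], b[k]) for k in keys}

big   = {"k1": [1, 2], "k2": [3]}
small = {"k2": ["x"], "k3": ["y"]}

print(cogroup(big, small))        # spans k1, k2 AND k3 -- the RAM "explosion"
print(inner_cogroup(big, small))  # only k2 survives
```

The union-of-keys behaviour of the first function is what makes the cogrouped result roughly as large as the big input plus the small one.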




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







RE: RAM management during cogroup and join

2015-04-15 Thread Evo Eftimov
Data Frames are available from the latest 1.3 release I believe – in 1.2 (our 
case at the moment) I guess the options are more limited 

 

PS: agree that DStreams are just an abstraction for a sequence / stream of 
(ordinary) RDDs – when I use “DStreams” I mean the DStream OO API in Spark, not 
that DStreams are some sort of different type of RDDs

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Wednesday, April 15, 2015 11:11 PM
To: Evo Eftimov
Cc: user
Subject: Re: RAM management during cogroup and join

 

Well, DStream joins are nothing but RDD joins at their core. However, there are 
more optimizations available when you use DataFrames and Spark SQL joins. With a 
schema, there is greater scope for optimizing the joins. So converting the RDDs 
from streaming and the batch RDDs to data frames, and then applying joins, may 
improve performance.

 

TD

 

On Wed, Apr 15, 2015 at 1:50 PM, Evo Eftimov evo.efti...@isecc.com wrote:

Thank you Sir, and one final confirmation/clarification -  are all forms of 
joins in the Spark API for DStream RDDs based on cogroup in terms of their 
internal implementation 

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Wednesday, April 15, 2015 9:48 PM


To: Evo Eftimov
Cc: user
Subject: Re: RAM management during cogroup and join

 

Agreed. 

 

On Wed, Apr 15, 2015 at 1:29 PM, Evo Eftimov evo.efti...@isecc.com wrote:

That has been done Sir and represents further optimizations – the objective 
here was to confirm whether cogroup always results in the previously described 
“greedy” explosion of the number of elements included and RAM allocated for the 
result RDD 

 

The optimizations mentioned still don’t change the total number of elements 
included in the result RDD and RAM allocated – right? 

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Wednesday, April 15, 2015 9:25 PM
To: Evo Eftimov
Cc: user
Subject: Re: RAM management during cogroup and join

 

Significant optimizations can be made by doing the joining/cogroup in a smart 
way. If you have to join streaming RDDs with the same batch RDD, then you can 
first partition the batch RDDs using a partitioner and cache them, and then use 
the same partitioner on the streaming RDDs. That would make sure that the large 
batch RDDs is not partitioned repeatedly for the cogroup, only the small 
streaming RDDs are partitioned.

 

HTH

 

TD

 

On Wed, Apr 15, 2015 at 1:11 PM, Evo Eftimov evo.efti...@isecc.com wrote:

There are indications that joins in Spark are implemented with / based on the
cogroup function/primitive/transform. So let me focus first on cogroup - it
returns a result which is RDD consisting of essentially ALL elements of the
cogrouped RDDs. Said in another way - for every key in each of the cogrouped
RDDs there is at least one element from at least one of the cogrouped RDDs.

That would mean that when smaller, moreover streaming e.g.
JavaPairDstreamRDDs keep getting joined with much larger, batch RDD that
would result in RAM allocated for multiple instances of the result
(cogrouped) RDD a.k.a essentially the large batch RDD and some more ...
Obviously the RAM will get returned when the DStream RDDs get discarded, and
they do on a regular basis, but still that seems an unnecessary spike in the
RAM consumption

I have two questions:

1. Is there any way to control the cogroup process more precisely, e.g. tell
it to include in the cogrouped RDD only elements where there is at least one
element from EACH of the cogrouped RDDs per given key? Based on the current
cogroup API this is not possible


2.If the cogroup is really such a sledgehammer and secondly the joins are
based on cogroup then even though they can present a prettier picture in
terms of the end result visible to the end user does that mean that under
the hood there is still the same atrocious RAM consumption going on




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 

 

 



Re: Is it feasible to keep millions of keys in state of Spark Streaming job for two months?

2015-04-15 Thread Tathagata Das
Can you clarify more on what you want to do after querying? Is the batch
not completed until the querying and subsequent processing has completed?


On Tue, Apr 14, 2015 at 10:36 PM, Krzysztof Zarzycki k.zarzy...@gmail.com
wrote:

 Thank you Tathagata, very helpful answer.

 Though, I would like to highlight that recent stream processing systems
 are trying to help users implement the use case of holding such large
 (like 2 months of data) state. I would mention here Samza state
 management
 http://samza.apache.org/learn/documentation/0.9/container/state-management.html
  and
 Trident state management
 https://storm.apache.org/documentation/Trident-state. I'm waiting for
 Spark to help with that too, because generally I definitely prefer this
 technology:)

 But considering holding state in Cassandra with Spark Streaming, I
 understand we're not talking here about using Cassandra as input nor output
 (nor make use of spark-cassandra-connector
 https://github.com/datastax/spark-cassandra-connector). We're talking
 here about querying Cassandra from map/mapPartition functions.
 I have one question about it: Is it possible to query Cassandra
 asynchronously within Spark Streaming? And while doing it, is it possible
 to take next batch of rows, while the previous is waiting on Cassandra I/O?
 I think (but I'm not sure) this generally asks, whether several consecutive
 windows can interleave (because they are long to process)? Let's draw it:

 --|query Cassandra asynchronously---  window1
 --- window2

 While writing it, I start to believe they can, because windows are
 time-triggered, not triggered when previous window has finished... But it's
 better to ask:)
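Whether consecutive windows can overlap on external I/O can at least be modelled: with asynchronous lookups, batch N+1's query can start while batch N's is still in flight. A pure-Python sketch using a thread pool (slow_lookup is an invented stand-in for a Cassandra query):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_lookup(batch):
    """Stand-in for an asynchronous external query issued for one window's batch."""
    time.sleep(0.2)
    return [x * 10 for x in batch]

batches = [[1, 2], [3, 4], [5, 6]]

start = time.time()
with ThreadPoolExecutor(max_workers=3) as pool:
    # Submitting every batch up front lets window N+1's I/O start while
    # window N is still waiting -- the batches interleave on the pool.
    futures = [pool.submit(slow_lookup, b) for b in batches]
    results = [f.result() for f in futures]
elapsed = time.time() - start

print(results)        # [[10, 20], [30, 40], [50, 60]]
print(elapsed < 0.5)  # ~0.2s total, not 0.6s, because the lookups overlapped
```

This only shows the I/O side; whether Spark Streaming actually schedules the next batch before the previous one completes depends on its scheduler (by default batches are processed one at a time), so the question to the list stands.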




 2015-04-15 2:08 GMT+02:00 Tathagata Das t...@databricks.com:

 Fundamentally, stream processing systems are designed for processing
 streams of data, not for storing large volumes of data for a long period of
 time. So if you have to maintain that much state for months, then its best
 to use another system that is designed for long term storage (like
 Cassandra) which has proper support for making all that state
 fault-tolerant, high-performant, etc. So yes, the best option is to use
 Cassandra for the state and Spark Streaming jobs accessing the state from
 Cassandra. There are a number of optimizations that can be done. Its not
 too hard to build a simple on-demand populated cache (singleton hash map
 for example), that speeds up access from Cassandra, and all updates are
 written through the cache. This is a common use of Spark Streaming +
 Cassandra/HBase.

 Regarding the performance of updateStateByKey, we are aware of the
 limitations, and we will improve it soon :)

 TD


 On Tue, Apr 14, 2015 at 12:34 PM, Krzysztof Zarzycki 
 k.zarzy...@gmail.com wrote:

 Hey guys, could you please help me with a question I asked on
 Stackoverflow:
 https://stackoverflow.com/questions/29635681/is-it-feasible-to-keep-millions-of-keys-in-state-of-spark-streaming-job-for-two
 ?  I'll be really grateful for your help!

 I'm also pasting the question below:

 I'm trying to solve a (simplified here) problem in Spark Streaming:
 Let's say I have a log of events made by users, where each event is a tuple
 (user name, activity, time), e.g.:

 ("user1", "view", "2015-04-14T21:04Z") ("user1", "click",
 "2015-04-14T21:05Z")

 Now I would like to gather events by user to do some analysis of that.
 Let's say that output is some analysis of:

 ("user1", List(("view", "2015-04-14T21:04Z"), ("click",
 "2015-04-14T21:05Z")))

 The events should be kept for even *2 months*. During that time there
 might be around *500 million* such events, and *millions of unique* users,
 which are keys here.

 *My questions are:*

- Is it feasible to do such a thing with updateStateByKey on
DStream, when I have millions of keys stored?
- Am I right that DStream.window is no use here, when I have 2
months length window and would like to have a slide of few seconds?

 P.S. I found out, that updateStateByKey is called on all the keys on
 every slide, so that means it will be called millions of time every few
 seconds. That makes me doubt in this design and I'm rather thinking about
 alternative solutions like:

- using Cassandra for state
- using Trident state (with Cassandra probably)
- using Samza with its state management.






Passing Elastic Search Mappings in Spark Conf

2015-04-15 Thread Deepak Subhramanian
Hi,

Is there a way to pass the mapping to define a field as not analyzed
with es-spark settings.

I am just wondering if I can set the mapping type for a field as not
analyzed using the set function in spark conf as similar to the other
es settings.

val sconf = new SparkConf()
  .setMaster("local[1]")
  .setAppName("Load Data To ES")
  .set("spark.ui.port", "4141")
  .set("es.index.auto.create", "true")
  .set("es.net.http.auth.user", "es_admin")
  .set("es.index.auto.create", "true")
  .set("es.mapping.names", "CREATED_DATE:@timestamp")


Thanks,
Deepak Subhramanian

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to do dispatching in Streaming?

2015-04-15 Thread Tathagata Das
It may be worthwhile to architect the computation in a different way.

dstream.foreachRDD { rdd =>
   rdd.foreach { record =>
  // do different things for each record based on filters
   }
}

TD
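The contrast between filtering the stream once per message type and TD's single-pass dispatch can be sketched in plain Python (record types and payloads are invented):

```python
from collections import defaultdict

records = [("click", 1), ("view", 2), ("click", 3), ("purchase", 4)]

# N-filter approach: one full pass over the data PER message type.
clicks_filtered = [r for r in records if r[0] == "click"]

# Dispatch approach: a single pass routes every record to its bucket/handler.
buckets = defaultdict(list)
for kind, payload in records:
    buckets[kind].append(payload)

print(clicks_filtered)  # [('click', 1), ('click', 3)]
print(dict(buckets))    # {'click': [1, 3], 'view': [2], 'purchase': [4]}
```

With dozens of message types, the filter approach scans the Kafka batch dozens of times; the dispatch inside foreachRDD touches each record once.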

On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi,

 I have a Kafka topic that contains dozens of different types of messages.
 And for each one I'll need to create a DStream for it.

 Currently I have to filter the Kafka stream over and over, which is very
 inefficient.

 So what's the best way to do dispatching in Spark Streaming? (one DStream
  -> multiple DStreams)


 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github & Blog: http://huangjs.github.com/



Dataset announcement

2015-04-15 Thread Olivier Chapelle
Dear Spark users,

I would like to draw your attention to a dataset that we recently released,
which is as of now the largest machine learning dataset ever released; see
the following blog announcements:
 - http://labs.criteo.com/2015/03/criteo-releases-its-new-dataset/
 -
http://blogs.technet.com/b/machinelearning/archive/2015/04/01/now-available-on-azure-ml-criteo-39-s-1tb-click-prediction-dataset.aspx

The characteristics of this dataset are:
 - 1 TB of data
 - binary classification
 - 13 integer features
 - 26 categorical features, some of them taking millions of values.
 - 4B rows

Hopefully this dataset will be useful to assess and push further the
scalability of Spark and MLlib.

Cheers,
Olivier



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Dataset-announcement-tp22507.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Passing Elastic Search Mappings in Spark Conf

2015-04-15 Thread Nick Pentreath
If you want to specify mapping you must first create the mappings for your 
index types before indexing.




As far as I know there is no way to specify this via ES-hadoop. But it's best 
practice to explicitly create mappings prior to indexing, or to use index 
templates when dynamically creating indexes.
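For the not_analyzed case, "create mappings prior to indexing" means issuing a create-index request with an explicit mapping before any documents are written. A sketch of such a request body (index, type, and field names here are invented; the string/not_analyzed syntax is the pre-2.x Elasticsearch form current at the time of this thread):

```json
PUT /my_index
{
  "mappings": {
    "logs": {
      "properties": {
        "CREATED_DATE": { "type": "date" },
        "category":     { "type": "string", "index": "not_analyzed" }
      }
    }
  }
}
```

Once the index exists with this mapping, es-spark can write to my_index/logs and the category field will not be analyzed.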



—
Sent from Mailbox

On Thu, Apr 16, 2015 at 1:14 AM, Deepak Subhramanian
deepak.subhraman...@gmail.com wrote:

 Hi,
 Is there a way to pass the mapping to define a field as not analyzed
 with es-spark settings.
 I am just wondering if I can set the mapping type for a field as not
 analyzed using the set function in spark conf as similar to the other
 es settings.
 val sconf = new SparkConf()
   .setMaster("local[1]")
   .setAppName("Load Data To ES")
   .set("spark.ui.port", "4141")
   .set("es.index.auto.create", "true")
   .set("es.net.http.auth.user", "es_admin")
   .set("es.index.auto.create", "true")
   .set("es.mapping.names", "CREATED_DATE:@timestamp")
 Thanks,
 Deepak Subhramanian
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

Re: Dataset announcement

2015-04-15 Thread Simon Edelhaus
Greetings!

How about medical data sets, and specifically longitudinal vital signs.

Can people send good pointers?

Thanks in advance,


-- ttfn
Simon Edelhaus
California 2015

On Wed, Apr 15, 2015 at 6:01 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Very neat, Olivier; thanks for sharing this.

 Matei

  On Apr 15, 2015, at 5:58 PM, Olivier Chapelle oliv...@chapelle.cc
 wrote:
 
  Dear Spark users,
 
  I would like to draw your attention to a dataset that we recently
 released,
  which is as of now the largest machine learning dataset ever released;
 see
  the following blog announcements:
  - http://labs.criteo.com/2015/03/criteo-releases-its-new-dataset/
  -
 
 http://blogs.technet.com/b/machinelearning/archive/2015/04/01/now-available-on-azure-ml-criteo-39-s-1tb-click-prediction-dataset.aspx
 
  The characteristics of this dataset are:
  - 1 TB of data
  - binary classification
  - 13 integer features
  - 26 categorical features, some of them taking millions of values.
  - 4B rows
 
  Hopefully this dataset will be useful to assess and push further the
  scalability of Spark and MLlib.
 
  Cheers,
  Olivier
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Dataset-announcement-tp22507.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Actor not found

2015-04-15 Thread Canoe
13119 Exception in thread "main" akka.actor.ActorNotFound: Actor not found
for: ActorSelection[Anchor(akka.tcp://sparkdri...@dmslave13.et2.tbsite.net:5908/),
Path(/user/OutputCommitCoordinator)]
13120 at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
13121 at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
13122 at
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
13123 at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
13124 at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
13125 at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
13126 at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
13127 at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
13128 at
akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
13129 at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
13130 at
akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
13131 at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
13132 at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
13133 at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
13134 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
13135 at
akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
13136 at
akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
13137 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
13138 at
akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
13139 at
akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
13140 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
13141 at
akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
13142 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
13143 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
13144 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
13145 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
13146 at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
13147 at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
13148 at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
13149 at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
13150 at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I met the same problem when I run spark on yarn. Is this a bug or what ? 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Actor-not-found-tp22265p22508.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Dataset announcement

2015-04-15 Thread Matei Zaharia
Very neat, Olivier; thanks for sharing this.

Matei

 On Apr 15, 2015, at 5:58 PM, Olivier Chapelle oliv...@chapelle.cc wrote:
 
 Dear Spark users,
 
 I would like to draw your attention to a dataset that we recently released,
 which is as of now the largest machine learning dataset ever released; see
 the following blog announcements:
 - http://labs.criteo.com/2015/03/criteo-releases-its-new-dataset/
 -
 http://blogs.technet.com/b/machinelearning/archive/2015/04/01/now-available-on-azure-ml-criteo-39-s-1tb-click-prediction-dataset.aspx
 
 The characteristics of this dataset are:
 - 1 TB of data
 - binary classification
 - 13 integer features
 - 26 categorical features, some of them taking millions of values.
 - 4B rows
 
 Hopefully this dataset will be useful to assess and push further the
 scalability of Spark and MLlib.
 
 Cheers,
 Olivier
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Dataset-announcement-tp22507.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

2015-04-15 Thread Nathan McCarthy
The problem lies with getting the driver classes into the primordial class 
loader when running on YARN.

Basically I need to somehow set the SPARK_CLASSPATH or compute_classpath.sh 
when running on YARN. I’m not sure how to do this when YARN is handling all the 
file copy.

From: Nathan nathan.mccar...@quantium.com.au
Date: Wednesday, 15 April 2015 11:49 pm
To: Wang, Daoyuan daoyuan.w...@intel.com, user@spark.apache.org
Subject: RE: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0
Subject: RE: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Tried with the 1.3.0 release (built myself) and the most recent 1.3.1 snapshot off 
the 1.3 branch.

Haven't tried with 1.4/master.


From: Wang, Daoyuan [mailto:daoyuan.w...@intel.com]
Sent: Wednesday, April 15, 2015 5:22 PM
To: Nathan McCarthy; user@spark.apache.org
Subject: RE: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Can you provide your spark version?

Thanks,
Daoyuan

From: Nathan McCarthy [mailto:nathan.mccar...@quantium.com.au]
Sent: Wednesday, April 15, 2015 1:57 PM
To: Nathan McCarthy; user@spark.apache.org
Subject: Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Just an update, tried with the old JdbcRDD and that worked fine.

From: Nathan nathan.mccar...@quantium.com.au
Date: Wednesday, 15 April 2015 1:57 pm
To: user@spark.apache.org
Subject: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0
Subject: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Hi guys,

Trying to use a Spark SQL context’s .load("jdbc", ...) method to create a DF from 
a JDBC data source. All seems to work well locally (master = local[*]), however 
as soon as we try and run on YARN we have problems.

We seem to be running into problems with the class path and loading up the JDBC 
driver. I’m using the jTDS 1.3.1 driver, net.sourceforge.jtds.jdbc.Driver.

./bin/spark-shell --jars /tmp/jtds-1.3.1.jar --master yarn-client

When trying to run I get an exception;

scala> sqlContext.load("jdbc", Map("url" -> 
"jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "dbtable" -> 
"CUBE.DIM_SUPER_STORE_TBL"))

java.sql.SQLException: No suitable driver found for 
jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd

Thinking maybe we need to force load the driver, if I supply "driver" -> 
"net.sourceforge.jtds.jdbc.Driver" to .load we get;

scala> sqlContext.load("jdbc", Map("url" -> 
"jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "driver" -> 
"net.sourceforge.jtds.jdbc.Driver", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL"))

java.lang.ClassNotFoundException: net.sourceforge.jtds.jdbc.Driver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at 
org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:97)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:21)

Yet if I run a Class.forName() just from the shell;

scala> Class.forName("net.sourceforge.jtds.jdbc.Driver")
res1: Class[_] = class net.sourceforge.jtds.jdbc.Driver

No problem finding the JAR. I’ve tried in both the shell, and running with 
spark-submit (packing the driver in with my application as a fat JAR). Nothing 
seems to work.

I can also get a connection in the driver/shell no problem;

scala> import java.sql.DriverManager
import java.sql.DriverManager
scala> 
DriverManager.getConnection("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")
res3: java.sql.Connection = 
net.sourceforge.jtds.jdbc.JtdsConnection@2a67ecd0

I’m probably missing some class path setting here. In 
jdbc.DefaultSource.createRelation it looks like the call to Class.forName 
doesn’t specify a class loader so it just uses the default Java behaviour to 
reflectively get the class loader. It almost feels like it’s using a different 
class loader.
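One standard workaround worth trying here is to put the driver jar on the driver's startup classpath explicitly, not only in the application classloader that --jars feeds. A sketch (the flags are real spark-submit options; paths are examples, and whether this resolves the primordial-classloader issue on a given YARN cluster needs testing):

```
./bin/spark-shell --master yarn-client \
  --jars /tmp/jtds-1.3.1.jar \
  --driver-class-path /tmp/jtds-1.3.1.jar \
  --conf spark.executor.extraClassPath=jtds-1.3.1.jar
```

On YARN, files shipped with --jars are copied into each container's working directory, which is why the executor-side entry can be just the bare file name.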

I also tried seeing if the class path was there on all my executors by running;

import scala.collection.JavaConverters._
sc.parallelize(Seq(1,2,3,4)).flatMap(_ => 
java.sql.DriverManager.getDrivers().asScala.map(d => s”$d | 

Re: Can Spark 1.0.2 run on CDH-4.3.0 with yarn? And Will Spark 1.2.0 support CDH5.1.2 with yarn?

2015-04-15 Thread Canoe
now we have spark 1.3.0 on chd 5.1.0



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-Spark-1-0-2-run-on-CDH-4-3-0-with-yarn-And-Will-Spark-1-2-0-support-CDH5-1-2-with-yarn-tp20760p22509.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Dataset announcement

2015-04-15 Thread Krishna Sankar
Thanks Olivier. Good work.
Interesting in more than one ways - including training, benchmarking,
testing new releases et al.
One quick question - do you plan to make it available as an S3 bucket?

Cheers
k/

On Wed, Apr 15, 2015 at 5:58 PM, Olivier Chapelle oliv...@chapelle.cc
wrote:

 Dear Spark users,

 I would like to draw your attention to a dataset that we recently released,
 which is as of now the largest machine learning dataset ever released; see
 the following blog announcements:
  - http://labs.criteo.com/2015/03/criteo-releases-its-new-dataset/
  -

 http://blogs.technet.com/b/machinelearning/archive/2015/04/01/now-available-on-azure-ml-criteo-39-s-1tb-click-prediction-dataset.aspx

 The characteristics of this dataset are:
  - 1 TB of data
  - binary classification
  - 13 integer features
  - 26 categorical features, some of them taking millions of values.
  - 4B rows

 Hopefully this dataset will be useful to assess and push further the
 scalability of Spark and MLlib.

 Cheers,
 Olivier



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Dataset-announcement-tp22507.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

2015-04-15 Thread ๏̯͡๏
Can you provide the JDBC connector jar version? Possibly the full JAR name
and the full command you ran Spark with?

On Wed, Apr 15, 2015 at 11:27 AM, Nathan McCarthy 
nathan.mccar...@quantium.com.au wrote:

  Just an update, tried with the old JdbcRDD and that worked fine.

   From: Nathan nathan.mccar...@quantium.com.au
 Date: Wednesday, 15 April 2015 1:57 pm
 To: user@spark.apache.org user@spark.apache.org
 Subject: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

   Hi guys,

  Trying to use a Spark SQL context’s .load(“jdbc”, …) method to create a
 DF from a JDBC data source. All seems to work well locally (master =
 local[*]), however as soon as we try and run on YARN we have problems.

  We seem to be running into problems with the class path and loading up
 the JDBC driver. I’m using the jTDS 1.3.1 driver,
 net.sourceforge.jtds.jdbc.Driver.

  ./bin/spark-shell --jars /tmp/jtds-1.3.1.jar --master yarn-client

  When trying to run I get an exception;

  scala> sqlContext.load("jdbc", Map("url" ->
 "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "dbtable" ->
 "CUBE.DIM_SUPER_STORE_TBL"))

  java.sql.SQLException: No suitable driver found for
 jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd

  Thinking maybe we need to force load the driver, if I supply “driver”
 -> “net.sourceforge.jtds.jdbc.Driver” to .load we get;

  scala> sqlContext.load("jdbc", Map("url" ->
 "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "driver" ->
 "net.sourceforge.jtds.jdbc.Driver", "dbtable" ->
 "CUBE.DIM_SUPER_STORE_TBL"))

   java.lang.ClassNotFoundException: net.sourceforge.jtds.jdbc.Driver
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:191)
 at
 org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:97)
 at
 org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290)
 at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:21)

  Yet if I run a Class.forName() just from the shell;

  scala> Class.forName("net.sourceforge.jtds.jdbc.Driver")
 res1: Class[_] = class net.sourceforge.jtds.jdbc.Driver

  No problem finding the JAR. I’ve tried in both the shell, and running
 with spark-submit (packing the driver in with my application as a fat JAR).
 Nothing seems to work.

  I can also get a connection in the driver/shell no problem;

  scala> import java.sql.DriverManager
 import java.sql.DriverManager
  scala>
 DriverManager.getConnection("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")
 res3: java.sql.Connection =
 net.sourceforge.jtds.jdbc.JtdsConnection@2a67ecd0

  I’m probably missing some class path setting here. In
 jdbc.DefaultSource.createRelation it looks like the call to
 Class.forName doesn’t specify a class loader, so it just uses the
 default Java behaviour to reflectively get the class loader. It almost
 feels like it’s using a different class loader.

  I also tried seeing if the class path was there on all my executors by
 running;

  import scala.collection.JavaConverters._

 sc.parallelize(Seq(1,2,3,4)).flatMap(_ => java.sql.DriverManager
   .getDrivers().asScala.map(d => s"$d | ${d.acceptsURL(
   "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")}")
   ).collect().foreach(println)

   This successfully returns;

  15/04/15 01:07:37 INFO scheduler.DAGScheduler: Job 0 finished: collect
 at Main.scala:46, took 1.495597 s
 org.apache.derby.jdbc.AutoloadedDriver40 | false
 com.mysql.jdbc.Driver | false
 net.sourceforge.jtds.jdbc.Driver | true
 org.apache.derby.jdbc.AutoloadedDriver40 | false
 com.mysql.jdbc.Driver | false
 net.sourceforge.jtds.jdbc.Driver | true
 org.apache.derby.jdbc.AutoloadedDriver40 | false
 com.mysql.jdbc.Driver | false
 net.sourceforge.jtds.jdbc.Driver | true
 org.apache.derby.jdbc.AutoloadedDriver40 | false
 com.mysql.jdbc.Driver | false
 net.sourceforge.jtds.jdbc.Driver | true

  As a final test we tried with postgres driver and had the same problem.
 Any ideas?

  Cheers,
  Nathan




-- 
Deepak
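
A minimal workaround sketch, based only on details quoted in this thread (the jar
path /tmp/jtds-1.3.1.jar and the jTDS driver class; untested): put the jar on the
executor classpath as well as shipping it, and eagerly load the driver class inside
every executor JVM before calling .load, since java.sql.DriverManager only sees
drivers registered under the class loader that loaded them.

```scala
// Workaround sketch for the executor-side ClassNotFoundException; the jar
// path and driver class come from this thread and are not verified here.

// 1) Ship the jar AND put it on the executor classpath (with --jars the jar
//    lands in each container's working directory, so a relative name works):
//    ./bin/spark-shell --jars /tmp/jtds-1.3.1.jar \
//      --conf spark.executor.extraClassPath=jtds-1.3.1.jar \
//      --master yarn-client

// 2) Eagerly register the driver in every executor JVM before sqlContext.load,
//    so DriverManager can find it when the data source asks for a connection:
sc.parallelize(1 to 100, sc.defaultParallelism).foreachPartition { _ =>
  Class.forName("net.sourceforge.jtds.jdbc.Driver")
}
```

This only papers over the class-loader mismatch described above; the real fix
belongs in the data source itself.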


Parquet Partition Size are different when using Dataframe's save append funciton

2015-04-15 Thread 顾亮亮
Hi,

When I use DataFrame’s save-append function, I find that the parquet partition 
sizes are very different.

part-r-00001 to 00021 were generated the first time the save-append function was 
called.
part-r-00022 to 00042 were generated the second time the save-append function was 
called.

As you can see, part-r-00001 to 00021 are about 200M each, while 
part-r-00022 to 00042 are about 720M each.
But the source data is the same, which confuses me.

-rw-r--r-- 1 sysplatform sysplatform 2.0K Apr 8 10:01 _common_metadata
-rw-r--r-- 1 sysplatform sysplatform 392K Apr 8 10:01 _metadata
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00001.parquet
-rw-r--r-- 1 sysplatform sysplatform 200M Apr 8 09:44 part-r-00002.parquet
-rw-r--r-- 1 sysplatform sysplatform 200M Apr 8 09:43 part-r-00003.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00004.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00005.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00006.parquet
-rw-r--r-- 1 sysplatform sysplatform 200M Apr 8 09:43 part-r-00007.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00008.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00009.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00010.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00011.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00012.parquet
-rw-r--r-- 1 sysplatform sysplatform 200M Apr 8 09:43 part-r-00013.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00014.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00015.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00016.parquet
-rw-r--r-- 1 sysplatform sysplatform 200M Apr 8 09:43 part-r-00017.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00018.parquet
-rw-r--r-- 1 sysplatform sysplatform 200M Apr 8 09:43 part-r-00019.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00020.parquet
-rw-r--r-- 1 sysplatform sysplatform 200M Apr 8 09:43 part-r-00021.parquet
-rw-r--r-- 1 sysplatform sysplatform 720M Apr 8 10:01 part-r-00022.parquet
-rw-r--r-- 1 sysplatform sysplatform 723M Apr 8 10:00 part-r-00023.parquet
-rw-r--r-- 1 sysplatform sysplatform 721M Apr 8 10:01 part-r-00024.parquet
-rw-r--r-- 1 sysplatform sysplatform 721M Apr 8 10:00 part-r-00025.parquet
-rw-r--r-- 1 sysplatform sysplatform 717M Apr 8 10:00 part-r-00026.parquet
-rw-r--r-- 1 sysplatform sysplatform 721M Apr 8 10:00 part-r-00027.parquet
-rw-r--r-- 1 sysplatform sysplatform 720M Apr 8 10:01 part-r-00028.parquet
-rw-r--r-- 1 sysplatform sysplatform 725M Apr 8 10:01 part-r-00029.parquet
-rw-r--r-- 1 sysplatform sysplatform 720M Apr 8 10:00 part-r-00030.parquet
-rw-r--r-- 1 sysplatform sysplatform 725M Apr 8 10:01 part-r-00031.parquet
-rw-r--r-- 1 sysplatform sysplatform 724M Apr 8 10:01 part-r-00032.parquet
-rw-r--r-- 1 sysplatform sysplatform 724M Apr 8 10:00 part-r-00033.parquet
-rw-r--r-- 1 sysplatform sysplatform 721M Apr 8 10:01 part-r-00034.parquet
-rw-r--r-- 1 sysplatform sysplatform 721M Apr 8 10:01 part-r-00035.parquet
-rw-r--r-- 1 sysplatform sysplatform 720M Apr 8 10:00 part-r-00036.parquet
-rw-r--r-- 1 sysplatform sysplatform 717M Apr 8 10:00 part-r-00037.parquet
-rw-r--r-- 1 sysplatform sysplatform 724M Apr 8 10:01 part-r-00038.parquet
-rw-r--r-- 1 sysplatform sysplatform 722M Apr 8 10:01 part-r-00039.parquet
-rw-r--r-- 1 sysplatform sysplatform 722M Apr 8 10:00 part-r-00040.parquet
-rw-r--r-- 1 sysplatform sysplatform 721M Apr 8 10:01 part-r-00041.parquet
-rw-r--r-- 1 sysplatform sysplatform 723M Apr 8 10:01 part-r-00042.parquet
-rw-r--r-- 1 sysplatform sysplatform 0 Apr 8 10:01 _SUCCESS
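
If the goal is to keep part files a comparable size across appends, one option is
to fix the number of output partitions before each save-append. A sketch against
the Spark 1.3 DataFrame API follows; the partition count and path are illustrative
placeholders, not tuned or verified values.

```scala
import org.apache.spark.sql.SaveMode

// Sketch: repartition to a fixed count before each append so every append
// writes the same number of similarly sized files. 42 matches the file count
// in the listing above but is purely illustrative.
df.repartition(42).save("/path/to/table", "parquet", SaveMode.Append)
```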


Re: Saving RDDs as custom output format

2015-04-15 Thread Akhil Das
You can try using ORCOutputFormat with yourRDD.saveAsNewAPIHadoopFile

Thanks
Best Regards

On Tue, Apr 14, 2015 at 9:29 PM, Daniel Haviv 
daniel.ha...@veracity-group.com wrote:

 Hi,
 Is it possible to store RDDs as custom output formats, For example ORC?

 Thanks,
 Daniel



Re: spark streaming printing no output

2015-04-15 Thread Shushant Arora
Yes, only "Time: 142905487 ms" strings get printed on the console.
No output is getting printed.
And the time interval between two strings of the form (Time: ... ms) is much
less than the streaming duration set in the program.

On Wed, Apr 15, 2015 at 5:11 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 Could you see something like this in the console?

 ---
 Time: 142905487 ms
 ---


 Best Regards,
 Shixiong(Ryan) Zhu

 2015-04-15 2:11 GMT+08:00 Shushant Arora shushantaror...@gmail.com:

 Hi

 I am running a spark streaming application but on console nothing is
 getting printed.

 I am doing
 1.bin/spark-shell --master clusterMgrUrl
 2.import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.Seconds
 val ssc = new StreamingContext( sc, Seconds(1))
 val lines = ssc.socketTextStream(hostname,)
 lines.print()
 ssc.start()
 ssc.awaitTermination()

 Jobs are getting created when I see webUI but nothing gets printed on
 console.

 I have started an nc script on the hostname and port and can see messages
 typed on this port from another console.



 Please let me know If I am doing something wrong.







Re: spark streaming printing no output

2015-04-15 Thread Akhil Das
Just make sure you have at least 2 cores available for processing. You can
try launching it in local[2] and make sure it's working fine.

Thanks
Best Regards

On Tue, Apr 14, 2015 at 11:41 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 Hi

 I am running a spark streaming application but on console nothing is
 getting printed.

 I am doing
 1.bin/spark-shell --master clusterMgrUrl
 2.import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.Seconds
 val ssc = new StreamingContext( sc, Seconds(1))
 val lines = ssc.socketTextStream(hostname,)
 lines.print()
 ssc.start()
 ssc.awaitTermination()

 Jobs are getting created when I see webUI but nothing gets printed on
 console.

  I have started an nc script on the hostname and port and can see messages
  typed on this port from another console.



 Please let me know If I am doing something wrong.






RE: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

2015-04-15 Thread Wang, Daoyuan
Can you provide your spark version?

Thanks,
Daoyuan

From: Nathan McCarthy [mailto:nathan.mccar...@quantium.com.au]
Sent: Wednesday, April 15, 2015 1:57 PM
To: Nathan McCarthy; user@spark.apache.org
Subject: Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Just an update, tried with the old JdbcRDD and that worked fine.

From: Nathan 
nathan.mccar...@quantium.com.aumailto:nathan.mccar...@quantium.com.au
Date: Wednesday, 15 April 2015 1:57 pm
To: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Hi guys,

Trying to use a Spark SQL context's .load("jdbc", ...) method to create a DF 
from a JDBC data source. All seems to work well locally (master = local[*]), 
however as soon as we try and run on YARN we have problems.

We seem to be running into problems with the class path and loading up the JDBC 
driver. I'm using the jTDS 1.3.1 driver, net.sourceforge.jtds.jdbc.Driver.

./bin/spark-shell --jars /tmp/jtds-1.3.1.jar --master yarn-client

When trying to run I get an exception;

scala> sqlContext.load("jdbc", Map("url" -> 
"jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "dbtable" -> 
"CUBE.DIM_SUPER_STORE_TBL"))

java.sql.SQLException: No suitable driver found for 
jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd

Thinking maybe we need to force load the driver, if I supply "driver" -> 
"net.sourceforge.jtds.jdbc.Driver" to .load we get;

scala> sqlContext.load("jdbc", Map("url" -> 
"jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "driver" -> 
"net.sourceforge.jtds.jdbc.Driver", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL"))

java.lang.ClassNotFoundException: net.sourceforge.jtds.jdbc.Driver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at 
org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:97)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:21)

Yet if I run a Class.forName() just from the shell;

scala> Class.forName("net.sourceforge.jtds.jdbc.Driver")
res1: Class[_] = class net.sourceforge.jtds.jdbc.Driver

No problem finding the JAR. I've tried in both the shell, and running with 
spark-submit (packing the driver in with my application as a fat JAR). Nothing 
seems to work.

I can also get a connection in the driver/shell no problem;

scala> import java.sql.DriverManager
import java.sql.DriverManager
scala> 
DriverManager.getConnection("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")
res3: java.sql.Connection = 
net.sourceforge.jtds.jdbc.JtdsConnection@2a67ecd0mailto:net.sourceforge.jtds.jdbc.JtdsConnection@2a67ecd0

I'm probably missing some class path setting here. In 
jdbc.DefaultSource.createRelation it looks like the call to Class.forName 
doesn't specify a class loader, so it just uses the default Java behaviour to 
reflectively get the class loader. It almost feels like it's using a different 
class loader.

I also tried seeing if the class path was there on all my executors by running;

import scala.collection.JavaConverters._
sc.parallelize(Seq(1,2,3,4)).flatMap(_ => 
java.sql.DriverManager.getDrivers().asScala.map(d => s"$d | 
${d.acceptsURL("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")}")).collect().foreach(println)

This successfully returns;

15/04/15 01:07:37 INFO scheduler.DAGScheduler: Job 0 finished: collect at 
Main.scala:46, took 1.495597 s
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true

As a final test we tried with postgres driver and had the same problem. Any 
ideas?

Cheers,
Nathan


Re: OOM in SizeEstimator while using combineByKey

2015-04-15 Thread Xianjin YE
What are your JVM heap size settings? The OOM in SizeEstimator is caused by a 
lot of entries in an IdentityHashMap. 
A quick guess is that the objects in your dataset are instances of a custom 
class and you didn't implement the hashCode and equals methods correctly. 



On Wednesday, April 15, 2015 at 3:10 PM, Aniket Bhatnagar wrote:

 I am aggregating a dataset using combineByKey method and for a certain input 
 size, the job fails with the following error. I have enabled head dumps to 
 better analyze the issue and will report back if I have any findings. 
 Meanwhile, if you guys have any idea of what could possibly result in this 
 error or how to better debug this, please let me know.
 
 java.lang.OutOfMemoryError: Java heap space
 at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
 at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
 at 
 org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132)
 at 
 org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178)
 at 
 org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177)
 at scala.collection.immutable.List.foreach(List.scala:381)
 at 
 org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177)
 at 
 org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
 at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
 at 
 org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
 at 
 org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
 at 
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105)
 at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93)
 at 
 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
 at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
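
As a first mitigation while waiting on the heap dumps, one can give the shuffle
more room before the size-tracking maps used by combineByKey overflow. This is a
sketch with illustrative values only (Spark 1.x configuration names), not a
diagnosis of the underlying cause.

```scala
import org.apache.spark.SparkConf

// Illustrative values: a bigger executor heap and a larger shuffle memory
// fraction make the ExternalAppendOnlyMap spill to disk instead of OOMing
// while SizeEstimator samples the in-memory map.
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")
  .set("spark.shuffle.memoryFraction", "0.4")
  .set("spark.shuffle.spill", "true")  // the default, but worth confirming
```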







Re: spark streaming printing no output

2015-04-15 Thread Shixiong Zhu
So the time interval is much less than the 1000 ms you set in the code?
That's weird. Could you check the whole output to confirm that the content
isn't being flushed away by logs?

Best Regards,
Shixiong(Ryan) Zhu

2015-04-15 15:04 GMT+08:00 Shushant Arora shushantaror...@gmail.com:

 Yes, only "Time: 142905487 ms" strings get printed on the console.
 No output is getting printed.
 And the time interval between two strings of the form (Time: ... ms) is much
 less than the streaming duration set in the program.

 On Wed, Apr 15, 2015 at 5:11 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 Could you see something like this in the console?

 ---
 Time: 142905487 ms
 ---


 Best Regards,
 Shixiong(Ryan) Zhu

 2015-04-15 2:11 GMT+08:00 Shushant Arora shushantaror...@gmail.com:

 Hi

 I am running a spark streaming application but on console nothing is
 getting printed.

 I am doing
 1.bin/spark-shell --master clusterMgrUrl
 2.import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.Seconds
 val ssc = new StreamingContext( sc, Seconds(1))
 val lines = ssc.socketTextStream(hostname,)
 lines.print()
 ssc.start()
 ssc.awaitTermination()

 Jobs are getting created when I see webUI but nothing gets printed on
 console.

  I have started an nc script on the hostname and port and can see messages
  typed on this port from another console.



 Please let me know If I am doing something wrong.








Re: Running Spark on Gateway - Connecting to Resource Manager Retries

2015-04-15 Thread Akhil Das
Make sure your yarn service is running on 8032.

Thanks
Best Regards

On Tue, Apr 14, 2015 at 12:35 PM, Vineet Mishra clearmido...@gmail.com
wrote:

 Hi Team,

 I am running Spark Word Count example(
 https://github.com/sryza/simplesparkapp), if I go with master as local it
 works fine.

 But when I change the master to yarn, it ends with retries connecting to the
 resource manager (stack trace mentioned below),

 15/04/14 11:31:57 INFO RMProxy: Connecting to ResourceManager at /
 0.0.0.0:8032
 15/04/14 11:31:58 INFO Client: Retrying connect to server:
 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is
 RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
 MILLISECONDS)
 15/04/14 11:31:59 INFO Client: Retrying connect to server:
 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is
 RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
 MILLISECONDS)

 If I run the same command from the Namenode instance, it ends with an
 ArrayIndexOutOfBoundsException (stack trace mentioned below),

 15/04/14 11:38:44 INFO YarnClientSchedulerBackend: SchedulerBackend is
 ready for scheduling beginning after reached minRegisteredResourcesRatio:
 0.8
 Exception in thread main java.lang.ArrayIndexOutOfBoundsException: 1
 at
 com.cloudera.sparkwordcount.SparkWordCount$.main(SparkWordCount.scala:28)
 at com.cloudera.sparkwordcount.SparkWordCount.main(SparkWordCount.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 Looking forward to get it resolve to work on respective nodes.

 Thanks,



OOM in SizeEstimator while using combineByKey

2015-04-15 Thread Aniket Bhatnagar
I am aggregating a dataset using combineByKey method and for a certain
input size, the job fails with the following error. I have enabled head
dumps to better analyze the issue and will report back if I have any
findings. Meanwhile, if you guys have any idea of what could possibly
result in this error or how to better debug this, please let me know.

java.lang.OutOfMemoryError: Java heap space
at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
at
org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132)
at
org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178)
at
org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177)
at scala.collection.immutable.List.foreach(List.scala:381)
at
org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177)
at
org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
at
org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at
org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)


Do multiple ipython notebooks work on yarn in one cluster?

2015-04-15 Thread aihe
My colleagues and I have been working with spark recently. We just set up a new
cluster on yarn over which we can run spark. We basically use ipython and write
programs in notebooks on a specific port (like ) via http.
We each have our own notebooks, and the odd thing is that if I run my notebook
first, my colleagues' notebooks will get stuck and stop there when there is
an action; and if one of theirs runs first, mine will get stuck.
Does one run of pyspark only support one notebook at a time? Or should we
figure out some configuration to make it work?

@Nam has the same problem as me. 
http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-accesses-to-a-Spark-cluster-via-iPython-Notebook-td12162.html

Thanks
Ai



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Do-multiple-ipython-notebooks-work-on-yarn-in-one-cluster-tp22498.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Running Spark on Gateway - Connecting to Resource Manager Retries

2015-04-15 Thread Vineet Mishra
Hi Akhil,

It's running fine when run through the Namenode (RM) but fails when run
through the Gateway; if I add the hadoop-core jars to the hadoop
directory (/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hadoop/) it
works fine.

It's really strange that I am running the job through spark-submit, and
running via the NameNode works fine, yet it fails when running through the
gateway even when both have the same classpath.

Has anyone tried running Spark from a Gateway?

Looking forward to a quick reply!

Thanks,


On Wed, Apr 15, 2015 at 12:07 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Make sure your yarn service is running on 8032.

 Thanks
 Best Regards

 On Tue, Apr 14, 2015 at 12:35 PM, Vineet Mishra clearmido...@gmail.com
 wrote:

 Hi Team,

 I am running Spark Word Count example(
 https://github.com/sryza/simplesparkapp), if I go with master as local
 it works fine.

  But when I change the master to yarn, it ends with retries connecting to the
  resource manager (stack trace mentioned below),

 15/04/14 11:31:57 INFO RMProxy: Connecting to ResourceManager at /
 0.0.0.0:8032
 15/04/14 11:31:58 INFO Client: Retrying connect to server:
 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is
 RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
 MILLISECONDS)
 15/04/14 11:31:59 INFO Client: Retrying connect to server:
 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is
 RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
 MILLISECONDS)

  If I run the same command from the Namenode instance, it ends with an
  ArrayIndexOutOfBoundsException (stack trace mentioned below),

 15/04/14 11:38:44 INFO YarnClientSchedulerBackend: SchedulerBackend is
 ready for scheduling beginning after reached minRegisteredResourcesRatio:
 0.8
 Exception in thread main java.lang.ArrayIndexOutOfBoundsException: 1
 at
 com.cloudera.sparkwordcount.SparkWordCount$.main(SparkWordCount.scala:28)
 at com.cloudera.sparkwordcount.SparkWordCount.main(SparkWordCount.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 Looking forward to get it resolve to work on respective nodes.

 Thanks,





Re: spark streaming printing no output

2015-04-15 Thread Shushant Arora
When I launched spark-shell using spark-shell --master local[2]:
same behaviour, no output on the console but only timestamps.

When I did lines.saveAsTextFiles("hdfslocation", "suffix"),
I get empty files of 0 bytes on hdfs.

On Wed, Apr 15, 2015 at 12:46 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Just make sure you have at least 2 cores available for processing. You can
 try launching it in local[2] and make sure it's working fine.

 Thanks
 Best Regards

 On Tue, Apr 14, 2015 at 11:41 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi

 I am running a spark streaming application but on console nothing is
 getting printed.

 I am doing
 1.bin/spark-shell --master clusterMgrUrl
 2.import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.Seconds
 val ssc = new StreamingContext( sc, Seconds(1))
 val lines = ssc.socketTextStream(hostname,)
 lines.print()
 ssc.start()
 ssc.awaitTermination()

 Jobs are getting created when I see webUI but nothing gets printed on
 console.

 I have started an nc script on the given hostname and port, and can see
 messages typed on this port from another console.



 Please let me know If I am doing something wrong.







Re: Re: spark streaming printing no output

2015-04-15 Thread bit1...@163.com
Looks like the message is consumed by the other console? ("can see messages typed
on this port from another console.")



bit1...@163.com
 
From: Shushant Arora
Date: 2015-04-15 17:11
To: Akhil Das
CC: user@spark.apache.org
Subject: Re: spark streaming printing no output
When I launched spark-shell using spark-shell --master local[2],
the behaviour was the same: no output on the console, only timestamps.

When I did lines.saveAsTextFiles("hdfslocation", "suffix"),
I got empty files of 0 bytes on HDFS.

On Wed, Apr 15, 2015 at 12:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
Just make sure you have at least 2 cores available for processing. You can try
launching it with local[2] and make sure it's working fine.

Thanks
Best Regards

On Tue, Apr 14, 2015 at 11:41 PM, Shushant Arora shushantaror...@gmail.com 
wrote:
Hi 

I am running a spark streaming application but on console nothing is getting 
printed.

I am doing
1.bin/spark-shell --master clusterMgrUrl
2.import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
val ssc = new StreamingContext( sc, Seconds(1))
val lines = ssc.socketTextStream(hostname,)
lines.print()
ssc.start()
ssc.awaitTermination()

Jobs are getting created when I see webUI but nothing gets printed on console.

I have started an nc script on the given hostname and port, and can see messages typed on
this port from another console.



Please let me know If I am doing something wrong.







Re: Re: spark streaming printing no output

2015-04-15 Thread Shushant Arora
It's printing on the console, but on HDFS all folders are still empty.

On Wed, Apr 15, 2015 at 2:29 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 Thanks !! Yes, messages typed on this console are seen on another console.

 When I closed the other console, the Spark streaming job started printing
 messages on the console.

  Shouldn't a message written to a port using netcat be available to multiple
 consumers?

 On Wed, Apr 15, 2015 at 2:22 PM, bit1...@163.com bit1...@163.com wrote:

 Looks like the message is consumed by the other console? ("can see messages
 typed on this port from another console.")

 --
 bit1...@163.com


 *From:* Shushant Arora shushantaror...@gmail.com
 *Date:* 2015-04-15 17:11
 *To:* Akhil Das ak...@sigmoidanalytics.com
 *CC:* user@spark.apache.org
 *Subject:* Re: spark streaming printing no output
 When I launched spark-shell using spark-shell --master local[2],
 the behaviour was the same: no output on the console, only timestamps.

 When I did lines.saveAsTextFiles("hdfslocation", "suffix"),
 I got empty files of 0 bytes on HDFS.

 On Wed, Apr 15, 2015 at 12:46 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Just make sure you have at least 2 cores available for processing. You
 can try launching it with local[2] and make sure it's working fine.

 Thanks
 Best Regards

 On Tue, Apr 14, 2015 at 11:41 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi

 I am running a spark streaming application but on console nothing is
 getting printed.

 I am doing
 1.bin/spark-shell --master clusterMgrUrl
 2.import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.Seconds
 val ssc = new StreamingContext( sc, Seconds(1))
 val lines = ssc.socketTextStream(hostname,)
 lines.print()
 ssc.start()
 ssc.awaitTermination()

 Jobs are getting created when I see webUI but nothing gets printed on
 console.

 I have started an nc script on the given hostname and port, and can see
 messages typed on this port from another console.



 Please let me know If I am doing something wrong.









RE: Running beyond physical memory limits

2015-04-15 Thread Brahma Reddy Battula
Thanks a lot for your reply..

  There was no issue with Spark 1.1. The following issue appeared when I upgraded
to Spark 1.2, and I did not decrease spark.executor.memory...
I mean to say I used the same config for Spark 1.1 and Spark 1.2.

Is there any issue with Spark 1.2?
Or will YARN lead to this?
And why will the executor not release memory if there are tasks running?



Thanks & Regards

Brahma Reddy Battula


From: Akhil Das [ak...@sigmoidanalytics.com]
Sent: Wednesday, April 15, 2015 2:35 PM
To: Brahma Reddy Battula
Cc: user@spark.apache.org
Subject: Re: Running beyond physical memory limits

Did you try reducing your spark.executor.memory?

Thanks
Best Regards

On Wed, Apr 15, 2015 at 2:29 PM, Brahma Reddy Battula 
brahmareddy.batt...@huawei.com wrote:
Hello Sparkers


I am a newbie to Spark and need help. We are using Spark 1.2; we are getting
the following error and the executor is getting killed. I have seen SPARK-1930
and it should be fixed in 1.2..

Any pointer on the following error, i.e. what might lead to this error, would be helpful:


2015-04-15 11:55:39,697 | WARN  | Container Monitor | Container 
[pid=126843,containerID=container_1429065217137_0012_01_-411041790] is running 
beyond physical memory limits. Current usage: 26.0 GB of 26 GB physical memory 
used; 26.7 GB of 260 GB virtual memory used. Killing container.
Dump of the process-tree for container_1429065217137_0012_01_-411041790 :   
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) 
VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 126872 126843 126843 126843 (java) 2049457 22816 28673892352 6824864 
/opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError=kill 
%p -Xms24576m -Xmx24576m 
-Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties
 
-Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native
 
-Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp
 -Dspark.driver.port=23204 -Dspark.random.port.max=23999 
-Dspark.akka.threads=32 -Dspark.akka.frameSize=10 -Dspark.akka.timeout=100 
-Dspark.ui.port=23000 -Dspark.random.port.min=23000 
-Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790
 org.apache.spark.executor.CoarseGrainedExecutorBackend 
akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler
 3 hadoopc1h11 10 application_1429065217137_0012 |- 126843 76960 126843 
126843 (bash) 0 0 11603968 331 /bin/bash -c 
/opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError='kill 
%p' -Xms24576m -Xmx24576m  
-Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties
 
-Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native
 
-Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp
 '-Dspark.driver.port=23204' '-Dspark.random.port.max=23999' 
'-Dspark.akka.threads=32' '-Dspark.akka.frameSize=10' 
'-Dspark.akka.timeout=100' '-Dspark.ui.port=23000' 
'-Dspark.random.port.min=23000' 
-Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790
 org.apache.spark.executor.CoarseGrainedExecutorBackend 
akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler
 3 hadoopc1h11 10 application_1429065217137_0012 1 
/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stdout
 2 
/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stderr
 | 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl$MonitoringThread.run(ContainersMonitorImpl.java:447)



And some doubts


1) Why will the executor not release memory if there are tasks running?


2) Is there an issue from Hadoop that leads to this error?



Any help will be appreciated...




Thanks & Regards

Brahma Reddy Battula






Re: Running beyond physical memory limits

2015-04-15 Thread Sean Owen
This is not about spark.executor.memory itself, but the extra overhead
subtracted from the executor's size in order to avoid using more than
the physical memory that YARN allows. That is, if you declare a 32G
executor, YARN lets you use 32G of physical memory, but your JVM heap
must be significantly less than 32G. This is the overhead factor that
is subtracted for you, and it seems it needs to be bigger in your case.
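
For reference, the heuristic Spark 1.2's YARN integration uses for this
overhead (introduced by SPARK-1930) is roughly max(384 MB, 7% of executor
memory); the exact constants below are an assumption based on that version
and should be checked against your build.

```python
MEMORY_OVERHEAD_MIN_MB = 384     # assumed Spark 1.2 default floor
MEMORY_OVERHEAD_FACTOR = 0.07    # assumed Spark 1.2 default fraction

def default_yarn_overhead_mb(executor_memory_mb):
    # YARN container size = executor heap + this overhead; exceeding the
    # container limit gets the executor killed, as in the log above.
    return max(MEMORY_OVERHEAD_MIN_MB,
               int(MEMORY_OVERHEAD_FACTOR * executor_memory_mb))

# For the 24576m (-Xmx24576m) executor in the log:
print(default_yarn_overhead_mb(24576))   # 1720

# Small executors fall back to the 384 MB floor:
print(default_yarn_overhead_mb(1024))    # 384
```

If the JVM's real off-heap usage exceeds this computed slack, raising the
overhead setting explicitly is the usual fix.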

On Wed, Apr 15, 2015 at 10:16 AM, Brahma Reddy Battula
brahmareddy.batt...@huawei.com wrote:
 Thanks lot for your reply..

   There was no issue with Spark 1.1. The following issue appeared when I upgraded
  to Spark 1.2, and I did not decrease spark.executor.memory...
 I mean to say, used same config for spark1.1 and spark1.2..

 Is there any issue with spark1.2..?
 Or Yarn will lead this..?
 And why executor will not release memory, if there are tasks running..?


 Thanks  Regards

 Brahma Reddy Battula


 
 From: Akhil Das [ak...@sigmoidanalytics.com]
 Sent: Wednesday, April 15, 2015 2:35 PM
 To: Brahma Reddy Battula
 Cc: user@spark.apache.org
 Subject: Re: Running beyond physical memory limits

 Did you try reducing your spark.executor.memory?

 Thanks
 Best Regards

 On Wed, Apr 15, 2015 at 2:29 PM, Brahma Reddy Battula
 brahmareddy.batt...@huawei.com wrote:

 Hello Sparkers


 I am newbie to spark and  need help.. We are using spark 1.2, we are
 getting the following error and executor is getting killed..I seen
 SPARK-1930 and it should be in 1.2..

 Any pointer to following error, like what might lead this error..


 2015-04-15 11:55:39,697 | WARN  | Container Monitor | Container
 [pid=126843,containerID=container_1429065217137_0012_01_-411041790] is
 running beyond physical memory limits. Current usage: 26.0 GB of 26 GB
 physical memory used; 26.7 GB of 260 GB virtual memory used. Killing
 container.
 Dump of the process-tree for container_1429065217137_0012_01_-411041790 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 126872 126843 126843 126843 (java) 2049457 22816 28673892352
 6824864 /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server
 -XX:OnOutOfMemoryError=kill %p -Xms24576m -Xmx24576m
 -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties
 -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native
 -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp
 -Dspark.driver.port=23204 -Dspark.random.port.max=23999
 -Dspark.akka.threads=32 -Dspark.akka.frameSize=10 -Dspark.akka.timeout=100
 -Dspark.ui.port=23000 -Dspark.random.port.min=23000
 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3
 hadoopc1h11 10 application_1429065217137_0012 |- 126843 76960 126843
 126843 (bash) 0 0 11603968 331 /bin/bash -c
 /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server
 -XX:OnOutOfMemoryError='kill %p' -Xms24576m -Xmx24576m
 -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties
 -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native
 -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp
 '-Dspark.driver.port=23204' '-Dspark.random.port.max=23999'
 '-Dspark.akka.threads=32' '-Dspark.akka.frameSize=10'
 '-Dspark.akka.timeout=100' '-Dspark.ui.port=23000'
 '-Dspark.random.port.min=23000'
 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3
 hadoopc1h11 10 application_1429065217137_0012 1
 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stdout
 2
 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stderr
  |
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl$MonitoringThread.run(ContainersMonitorImpl.java:447)



 And some doubts


 1) why executor will not release memory, if there are tasks running..?



 2) is there issue from hadoop which will lead this error..?



 Any help , will be appreciated...




 Thanks  Regards

 Brahma Reddy Battula






-
To 

Running beyond physical memory limits

2015-04-15 Thread Brahma Reddy Battula
Hello Sparkers


I am a newbie to Spark and need help. We are using Spark 1.2; we are getting
the following error and the executor is getting killed. I have seen SPARK-1930
and it should be fixed in 1.2..

Any pointer on the following error, i.e. what might lead to this error, would be helpful:


2015-04-15 11:55:39,697 | WARN  | Container Monitor | Container 
[pid=126843,containerID=container_1429065217137_0012_01_-411041790] is running 
beyond physical memory limits. Current usage: 26.0 GB of 26 GB physical memory 
used; 26.7 GB of 260 GB virtual memory used. Killing container.
Dump of the process-tree for container_1429065217137_0012_01_-411041790 :   
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) 
VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 126872 126843 126843 126843 (java) 2049457 22816 28673892352 6824864 
/opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError=kill 
%p -Xms24576m -Xmx24576m 
-Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties
 
-Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native
 
-Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp
 -Dspark.driver.port=23204 -Dspark.random.port.max=23999 
-Dspark.akka.threads=32 -Dspark.akka.frameSize=10 -Dspark.akka.timeout=100 
-Dspark.ui.port=23000 -Dspark.random.port.min=23000 
-Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790
 org.apache.spark.executor.CoarseGrainedExecutorBackend 
akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 
hadoopc1h11 10 application_1429065217137_0012 |- 126843 76960 126843 
126843 (bash) 0 0 11603968 331 /bin/bash -c 
/opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError='kill 
%p' -Xms24576m -Xmx24576m  
-Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties
 
-Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native
 
-Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp
 '-Dspark.driver.port=23204' '-Dspark.random.port.max=23999' 
'-Dspark.akka.threads=32' '-Dspark.akka.frameSize=10' 
'-Dspark.akka.timeout=100' '-Dspark.ui.port=23000' 
'-Dspark.random.port.min=23000' 
-Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790
 org.apache.spark.executor.CoarseGrainedExecutorBackend 
akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 
hadoopc1h11 10 application_1429065217137_0012 1 
/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stdout
 2 
/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stderr
 | 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl$MonitoringThread.run(ContainersMonitorImpl.java:447)



And some doubts


1) Why will the executor not release memory if there are tasks running?


2) Is there an issue from Hadoop that leads to this error?



Any help will be appreciated...




Thanks & Regards

Brahma Reddy Battula





Re: Re: spark streaming printing no output

2015-04-15 Thread Shushant Arora
Thanks !! Yes, messages typed on this console are seen on another console.

When I closed the other console, the Spark streaming job started printing
messages on the console.

 Shouldn't a message written to a port using netcat be available to multiple
consumers?
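
The answer turns out to be no: plain netcat writes each line to a single
accepted TCP connection, so whichever client connects first consumes the
stream. A minimal Python sketch of that one-accept behaviour (not netcat
itself, just an illustration):

```python
import socket
import threading

def one_shot_server(ready, result):
    # Like a plain "nc -l" listener: accept ONE connection and write to it.
    srv = socket.socket()
    srv.bind(("127.0.0.1", 0))       # OS picks a free port
    srv.listen(1)
    result["port"] = srv.getsockname()[1]
    ready.set()
    conn, _ = srv.accept()           # a single accept(): first client wins
    conn.sendall(b"hello stream\n")
    conn.close()
    srv.close()

ready, result = threading.Event(), {}
t = threading.Thread(target=one_shot_server, args=(ready, result))
t.start()
ready.wait()

# The first (and only served) consumer receives the data; a second client
# connecting later would get nothing from this server.
c = socket.socket()
c.connect(("127.0.0.1", result["port"]))
print(c.recv(64))
c.close()
t.join()
```

This is why closing the other console made the Spark receiver start seeing data.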

On Wed, Apr 15, 2015 at 2:22 PM, bit1...@163.com bit1...@163.com wrote:

 Looks like the message is consumed by the other console? ("can see messages
 typed on this port from another console.")

 --
 bit1...@163.com


 *From:* Shushant Arora shushantaror...@gmail.com
 *Date:* 2015-04-15 17:11
 *To:* Akhil Das ak...@sigmoidanalytics.com
 *CC:* user@spark.apache.org
 *Subject:* Re: spark streaming printing no output
 When I launched spark-shell using spark-shell --master local[2],
 the behaviour was the same: no output on the console, only timestamps.

 When I did lines.saveAsTextFiles("hdfslocation", "suffix"),
 I got empty files of 0 bytes on HDFS.

 On Wed, Apr 15, 2015 at 12:46 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Just make sure you have at least 2 cores available for processing. You can
 try launching it with local[2] and make sure it's working fine.

 Thanks
 Best Regards

 On Tue, Apr 14, 2015 at 11:41 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi

 I am running a spark streaming application but on console nothing is
 getting printed.

 I am doing
 1.bin/spark-shell --master clusterMgrUrl
 2.import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.Seconds
 val ssc = new StreamingContext( sc, Seconds(1))
 val lines = ssc.socketTextStream(hostname,)
 lines.print()
 ssc.start()
 ssc.awaitTermination()

 Jobs are getting created when I see webUI but nothing gets printed on
 console.

 I have started an nc script on the given hostname and port, and can see
 messages typed on this port from another console.



 Please let me know If I am doing something wrong.








Re: Running beyond physical memory limits

2015-04-15 Thread Akhil Das
Did you try reducing your spark.executor.memory?

Thanks
Best Regards

On Wed, Apr 15, 2015 at 2:29 PM, Brahma Reddy Battula 
brahmareddy.batt...@huawei.com wrote:

  Hello Sparkers


 I am newbie to spark and  need help.. We are using spark 1.2, we are
 getting the following error and executor is getting killed..I seen
 SPARK-1930 and it should be in 1.2..

 Any pointer to the following error, like what might lead to this error..


 2015-04-15 11:55:39,697 | WARN  | Container Monitor | Container
 [pid=126843,containerID=container_1429065217137_0012_01_-411041790] is
 running beyond physical memory limits. Current usage: 26.0 GB of 26 GB
 physical memory used; 26.7 GB of 260 GB virtual memory used. Killing
 container.
 Dump of the process-tree for container_1429065217137_0012_01_-411041790
 :|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 126872 126843 126843 126843 (java) 2049457 22816 28673892352
 6824864 /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server
 -XX:OnOutOfMemoryError=kill %p -Xms24576m -Xmx24576m
 -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties
 -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native
 -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp
 -Dspark.driver.port=23204 -Dspark.random.port.max=23999
 -Dspark.akka.threads=32 -Dspark.akka.frameSize=10 -Dspark.akka.timeout=100
 -Dspark.ui.port=23000 -Dspark.random.port.min=23000
 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790
 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://
 sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 hadoopc1h11
 10 application_1429065217137_0012 |- 126843 76960 126843 126843
 (bash) 0 0 11603968 331 /bin/bash -c
 /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server
 -XX:OnOutOfMemoryError='kill %p' -Xms24576m -Xmx24576m
 -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties
 -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native
 -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp
 '-Dspark.driver.port=23204' '-Dspark.random.port.max=23999'
 '-Dspark.akka.threads=32' '-Dspark.akka.frameSize=10'
 '-Dspark.akka.timeout=100' '-Dspark.ui.port=23000'
 '-Dspark.random.port.min=23000'
 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790
 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://
 sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 hadoopc1h11
 10 application_1429065217137_0012 1
 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stdout
 2
 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stderr

  |
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl$MonitoringThread.run(ContainersMonitorImpl.java:447)




  And some doubts


  1) why executor will not release memory, if there are tasks running..?



 2) is there issue from hadoop which will lead this error..?



  Any help , will be appreciated...




  Thanks  Regards

 Brahma Reddy Battula







Re: Running beyond physical memory limits

2015-04-15 Thread Sean Owen
All this means is that your JVM is using more memory than it requested
from YARN. You need to increase the YARN memory overhead setting,
perhaps.
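
Concretely, that setting can be raised either in spark-defaults.conf or on the
spark-submit command line; the property name below matches Spark's YARN
configuration, while the 2048 MB value is only an example to tune per workload:

```
# spark-defaults.conf (value in MB; 2048 is an example, tune for your job)
spark.yarn.executor.memoryOverhead  2048

# or per job:
# spark-submit --conf spark.yarn.executor.memoryOverhead=2048 ...
```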

On Wed, Apr 15, 2015 at 9:59 AM, Brahma Reddy Battula
brahmareddy.batt...@huawei.com wrote:
 Hello Sparkers


 I am newbie to spark and  need help.. We are using spark 1.2, we are getting
 the following error and executor is getting killed..I seen SPARK-1930 and it
 should be in 1.2..

 Any pointer to following error, like what might lead this error..


 2015-04-15 11:55:39,697 | WARN  | Container Monitor | Container
 [pid=126843,containerID=container_1429065217137_0012_01_-411041790] is
 running beyond physical memory limits. Current usage: 26.0 GB of 26 GB
 physical memory used; 26.7 GB of 260 GB virtual memory used. Killing
 container.
 Dump of the process-tree for container_1429065217137_0012_01_-411041790 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 126872 126843 126843 126843 (java) 2049457 22816 28673892352
 6824864 /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server
 -XX:OnOutOfMemoryError=kill %p -Xms24576m -Xmx24576m
 -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties
 -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native
 -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp
 -Dspark.driver.port=23204 -Dspark.random.port.max=23999
 -Dspark.akka.threads=32 -Dspark.akka.frameSize=10 -Dspark.akka.timeout=100
 -Dspark.ui.port=23000 -Dspark.random.port.min=23000
 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3
 hadoopc1h11 10 application_1429065217137_0012 |- 126843 76960 126843
 126843 (bash) 0 0 11603968 331 /bin/bash -c
 /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server
 -XX:OnOutOfMemoryError='kill %p' -Xms24576m -Xmx24576m
 -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties
 -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native
 -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp
 '-Dspark.driver.port=23204' '-Dspark.random.port.max=23999'
 '-Dspark.akka.threads=32' '-Dspark.akka.frameSize=10'
 '-Dspark.akka.timeout=100' '-Dspark.ui.port=23000'
 '-Dspark.random.port.min=23000'
 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3
 hadoopc1h11 10 application_1429065217137_0012 1
 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stdout
 2
 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stderr
  |
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl$MonitoringThread.run(ContainersMonitorImpl.java:447)



 And some doubts


 1) why executor will not release memory, if there are tasks running..?



 2) is there issue from hadoop which will lead this error..?



 Any help , will be appreciated...




 Thanks  Regards

 Brahma Reddy Battula





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark streaming with kafka

2015-04-15 Thread Akhil Das
Once you start your streaming application to read from Kafka, it will
launch receivers on the executor nodes. And you can see them on the
streaming tab of your driver ui (runs on 4040).

[image: Inline image 1]

These receivers will be fixed till the end of your pipeline (unless they
crash, etc.). You can say each receiver will run on a single core.

Thanks
Best Regards

On Wed, Apr 15, 2015 at 3:46 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 Hi

 I want to understand the flow of spark streaming with kafka.

 In Spark Streaming, are the executor nodes the same at each run of the
 streaming interval, or does the cluster manager assign new executor nodes for
 processing each batch input? If the latter, do new executors register
 themselves as Kafka consumers at each batch interval?

 Even without Kafka, are the executor nodes the same on each batch interval,
 or does the driver get new executor nodes from the cluster manager?

 Thanks
 Shushant



Re: Opening many Parquet files = slow

2015-04-15 Thread Masf
Hi guys

Regarding Parquet files: I have Spark 1.2.0, and reading 27 Parquet files
(250 MB/file) takes 4 minutes.

I have a cluster with 4 nodes, and that seems too slow to me.

The load function is not available in Spark 1.2, so I can't test it.


Regards.
Miguel.

On Mon, Apr 13, 2015 at 8:12 PM, Eric Eijkelenboom 
eric.eijkelenb...@gmail.com wrote:

 Hi guys

 Does anyone know how to stop Spark from opening all Parquet files before
 starting a job? This is quite a show stopper for me, since I have 5000
 Parquet files on S3.

 Recap of what I tried:

 1. Disable schema merging with: sqlContext.load("parquet",
 Map("mergeSchema" -> "false", "path" -> "s3://path/to/folder"))
 This opens most files in the folder (17 out of 21 in my small
 example). For 5000 files on S3, sqlContext.load() takes 30 minutes to
 complete.

 2. Use the old api with:
 sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
 Now sqlContext.parquetFile() only opens a few files and prints the
 schema: so far so good! However, as soon as I run e.g. a count() on the
 dataframe, Spark still opens all files _before_ starting a job/stage.
 Effectively this moves the delay from load() to count() (or any other
 action I presume).

 3. Run Spark 1.3.1-rc2.
 sqlContext.load() took about 30 minutes for 5000 Parquet files on S3,
 the same as 1.3.0.

 Any help would be greatly appreciated!

 Thanks a lot.

 Eric




 On 10 Apr 2015, at 16:46, Eric Eijkelenboom eric.eijkelenb...@gmail.com
 wrote:

 Hi Ted

 Ah, I guess the term ‘source’ confused me :)

 Doing:

 sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "path
 to a single day of logs"))

 for 1 directory with 21 files, Spark opens 17 files:

 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening '
 s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72'
 for reading
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key
 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for
 reading at position '261573524'
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening '
 s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74'
 for reading
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening '
 s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77'
 for reading
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening '
 s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62'
 for reading
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key
 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for
 reading at position '259256807'
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key
 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for
 reading at position '260002042'
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key
 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for
 reading at position ‘260875275'
 etc.

 I can’t seem to pass a comma-separated list of directories to load(), so
 in order to load multiple days of logs, I have to point to the root folder
 and depend on auto-partition discovery (unless there’s a smarter way).

 Doing:

 sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "path
 to root log dir"))

 starts opening what seems like all files (I killed the process after a
 couple of minutes).

 Thanks for helping out.
 Eric





-- 


Regards.
Miguel Ángel


Re: spark streaming with kafka

2015-04-15 Thread Shushant Arora
So the receivers will be fixed for every run of the streaming interval job? Say
I have set the stream duration to 10 minutes; then a job will be created after
each 10 minutes, and the same executor nodes, say in your
case (spark-akhil-slave2.c.neat-axis-616.internal
and spark-akhil-slave1.c.neat-axis-616.internal), will be used?

Is it true for all streaming applications that the executor nodes are fixed and
won't change depending on the load of the current batch, or is it only for the
Kafka case?









On Wed, Apr 15, 2015 at 4:12 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Once you start your streaming application to read from Kafka, it will
 launch receivers on the executor nodes. And you can see them on the
 streaming tab of your driver UI (runs on port 4040).

 [image: Inline image 1]

 These receivers will be fixed till the end of your pipeline (unless it's
 crashed etc.). You can say each receiver will run on a single core.
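 A small back-of-the-envelope consequence of the point above (plain Scala, no Spark needed; the numbers are hypothetical): since each receiver permanently pins one executor core, the cores left for processing batches are the total minus the receiver count:

 ```scala
 // Each receiver permanently occupies one executor core, so the cores left
 // for processing the batches are the total minus the receiver count.
 // The numbers below are hypothetical.
 def processingCores(totalCores: Int, numReceivers: Int): Int =
   totalCores - numReceivers

 println(processingCores(8, 2)) // 2 receivers on an 8-core cluster leave 6 cores
 ```

 This is why an application needs more cores than receivers, or no cores remain for the batch jobs themselves.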

 Thanks
 Best Regards

 On Wed, Apr 15, 2015 at 3:46 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Hi

 I want to understand the flow of spark streaming with kafka.

 In Spark Streaming, are the executor nodes the same at each run of the
 streaming interval, or does the cluster manager assign new executor nodes for
 processing each batch? If the latter, do new executors register themselves as
 Kafka consumers at each batch interval?

 Even without Kafka, are the executor nodes the same at each batch interval, or
 does the driver get new executor nodes from the cluster manager?

 Thanks
 Shushant





Re: OOM in SizeEstimator while using combineByKey

2015-04-15 Thread Aniket Bhatnagar
I am setting spark.executor.memory as 1024m on a 3 node cluster with each
node having 4 cores and 7 GB RAM. The combiner functions are taking scala
case classes as input and are generating mutable.ListBuffer of scala case
classes. Therefore, I am guessing hashCode and equals should be taken care
of.
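This "taken care of" claim can be checked with plain Scala (no Spark needed; Event below is a hypothetical stand-in for the actual case classes): the compiler generates structural equals and hashCode for case classes, which is what hash-based aggregation relies on:

```scala
// Case classes get compiler-generated structural equals and hashCode,
// which hash-based aggregation (e.g. the maps behind combineByKey) relies on.
// Event is a hypothetical stand-in for the poster's case classes.
case class Event(userId: Long, action: String)

val a = Event(1L, "click")
val b = Event(1L, "click")
println(a == b)                   // true: structural equality
println(a.hashCode == b.hashCode) // true: consistent hashing
```

So the custom-class hashCode/equals concern raised earlier in the thread should not apply here.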

Thanks,
Aniket

On Wed, Apr 15, 2015 at 1:00 PM Xianjin YE advance...@gmail.com wrote:

 What are your JVM heap size settings? The OOM in SizeEstimator is caused
 by a lot of entries in IdentityHashMap.
 A quick guess is that the objects in your dataset are of a custom class and
 you didn't implement the hashCode and equals methods correctly.



 On Wednesday, April 15, 2015 at 3:10 PM, Aniket Bhatnagar wrote:

  I am aggregating a dataset using the combineByKey method and, for a certain
 input size, the job fails with the following error. I have enabled heap
 dumps to better analyze the issue and will report back if I have any
 findings. Meanwhile, if you have any idea of what could possibly
 result in this error or how to better debug this, please let me know.
 
  java.lang.OutOfMemoryError: Java heap space
  at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
  at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
  at
 org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132)
  at
 org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178)
  at
 org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at
 org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177)
  at
 org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
  at
 org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at
 org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
  at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
  at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105)
  at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93)
  at
 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
  at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)






spark streaming with kafka

2015-04-15 Thread Shushant Arora
Hi

I want to understand the flow of spark streaming with kafka.

In Spark Streaming, are the executor nodes the same at each run of the
streaming interval, or does the cluster manager assign new executor nodes for
processing each batch? If the latter, do new executors register themselves as
Kafka consumers at each batch interval?

Even without Kafka, are the executor nodes the same at each batch interval, or
does the driver get new executor nodes from the cluster manager?

Thanks
Shushant


Re: Re: spark streaming with kafka

2015-04-15 Thread Akhil Das
@Shushant: In my case, the receivers will be fixed till the end of the
application. This is for the Kafka case only; if you have a filestream
application, you will not have any receivers. Also, for Kafka, the next time
you run the application, it's not guaranteed that the receivers will get
launched on the same machines.

@bit1129: If the receiver crashes, Spark will try to restart it; since I have
only 2 machines in that cluster, it will either restart on the same node or on
the other node.

Thanks
Best Regards

On Wed, Apr 15, 2015 at 4:30 PM, bit1...@163.com bit1...@163.com wrote:

 Hi, Akhil,

 I would like to ask a question here: assume Receiver-0 crashes. Will it be
 restarted on another worker node (in which case, in your picture, there would
 be 2 receivers on the same node), or will it restart on the same node?

 --
 bit1...@163.com


 *From:* Akhil Das ak...@sigmoidanalytics.com
 *Date:* 2015-04-15 19:12
 *To:* Shushant Arora shushantaror...@gmail.com
 *CC:* user user@spark.apache.org
 *Subject:* Re: spark streaming with kafka
 Once you start your streaming application to read from Kafka, it will
 launch receivers on the executor nodes. And you can see them on the
 streaming tab of your driver UI (runs on port 4040).

 [image: Inline image 1]

 These receivers will be fixed till the end of your pipeline (unless it's
 crashed etc.). You can say each receiver will run on a single core.

 Thanks
 Best Regards

 On Wed, Apr 15, 2015 at 3:46 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Hi

 I want to understand the flow of spark streaming with kafka.

 In Spark Streaming, are the executor nodes the same at each run of the
 streaming interval, or does the cluster manager assign new executor nodes for
 processing each batch? If the latter, do new executors register themselves as
 Kafka consumers at each batch interval?

 Even without Kafka, are the executor nodes the same at each batch interval, or
 does the driver get new executor nodes from the cluster manager?

 Thanks
 Shushant

