Save org.apache.spark.mllib.linalg.Matrix to a file
Hello! The result of a correlation in Spark MLlib is of type org.apache.spark.mllib.linalg.Matrix (see http://spark.apache.org/docs/1.2.1/mllib-statistics.html#correlations):

    val data: RDD[Vector] = ...
    val correlMatrix: Matrix = Statistics.corr(data, "pearson")

I would like to save the result into a file. How can I do this? Thanks, Florin
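One way to do this (a sketch, not an answer from the thread): `Statistics.corr` returns a small local `Matrix` on the driver, so plain JVM I/O is enough; format it row by row and write the text out. The `MatrixCsv` class and the `double[][]` argument below are my own stand-ins; with a real `org.apache.spark.mllib.linalg.Matrix m` you would read `m.apply(i, j)` (and `m.numRows`/`m.numCols`) instead.

```java
import java.io.IOException;
import java.io.PrintWriter;

// Hypothetical helper: renders a numRows x numCols matrix as CSV text.
// The double[][] stands in for mllib's Matrix; with a real Matrix m you
// would loop over m.numRows / m.numCols and read m.apply(i, j).
public class MatrixCsv {
    public static String toCsv(double[][] m) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < m.length; i++) {
            if (i > 0) sb.append('\n');
            for (int j = 0; j < m[i].length; j++) {
                if (j > 0) sb.append(',');
                sb.append(m[i][j]);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        double[][] correl = {{1.0, 0.5}, {0.5, 1.0}};
        // Write the formatted matrix to a local file on the driver.
        try (PrintWriter pw = new PrintWriter("correlMatrix.csv")) {
            pw.write(toCsv(correl));
        }
    }
}
```

Since the correlation matrix is a driver-local object (not an RDD), there is no need for `saveAsTextFile`; ordinary file I/O works.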
Exception while using Kryo with broadcast
Hi All, I am getting the exception below while using Kryo serialization with a broadcast variable. I am broadcasting a HashMap with the lines below.

    Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
    final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

15/04/15 12:58:51 ERROR executor.Executor: Exception in task 0.3 in stage 4.0 (TID 7)
java.io.IOException: java.lang.UnsupportedOperationException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1003)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
	at com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:103)
	at com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:1)
	at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
	at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:204)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:58)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:56)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
	at java.util.AbstractMap.put(AbstractMap.java:203)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
	at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1000)
	... 18 more
15/04/15 12:58:51 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
15/04/15 12:58:51 INFO storage.MemoryStore: MemoryStore cleared
15/04/15 12:58:51 INFO storage.BlockManager: BlockManager stopped
15/04/15 12:58:51 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
Re: Exception while using Kryo with broadcast
Yes, without Kryo it worked; when I removed the Kryo registration it worked.

On 15 April 2015 at 19:24, Jeetendra Gangele gangele...@gmail.com wrote:
It's not working in combination with Broadcast. Without Kryo it is also not working.

On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com wrote:
Is it working without Kryo? Thanks, Best Regards

On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele gangele...@gmail.com wrote:
Hi All, I am getting the exception below while using Kryo serialization with a broadcast variable. I am broadcasting a HashMap with the lines below.

    Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
    final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

[stack trace snipped; see the original post]
Re: Exception while using Kryo with broadcast
Is it working without Kryo? Thanks, Best Regards

On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele gangele...@gmail.com wrote:
Hi All, I am getting the exception below while using Kryo serialization with a broadcast variable. I am broadcasting a HashMap with the lines below.

    Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
    final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

[stack trace snipped; see the original post]
Re: Exception while using Kryo with broadcast
This looks like a known issue; check this out:
http://apache-spark-user-list.1001560.n3.nabble.com/java-io-InvalidClassException-org-apache-spark-api-java-JavaUtils-SerializableMapWrapper-no-valid-cor-td20034.html
Can you please suggest any workaround? I am broadcasting a HashMap returned from RDD.collectAsMap().

On 15 April 2015 at 19:33, Imran Rashid iras...@cloudera.com wrote:
This is a really strange exception ... I'm especially surprised that it doesn't work w/ java serialization. Do you think you could try to boil it down to a minimal example?

On Wed, Apr 15, 2015 at 8:58 AM, Jeetendra Gangele gangele...@gmail.com wrote:
Yes, without Kryo it worked; when I removed the Kryo registration it worked.

On 15 April 2015 at 19:24, Jeetendra Gangele gangele...@gmail.com wrote:
It's not working in combination with Broadcast. Without Kryo it is also not working.

On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com wrote:
Is it working without Kryo? Thanks, Best Regards

On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele gangele...@gmail.com wrote:
Hi All, I am getting the exception below while using Kryo serialization with a broadcast variable. I am broadcasting a HashMap with the lines below.

    Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
    final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

[stack trace snipped; see the original post]
Re: Exception while using Kryo with broadcast
This is a really strange exception ... I'm especially surprised that it doesn't work w/ java serialization. Do you think you could try to boil it down to a minimal example?

On Wed, Apr 15, 2015 at 8:58 AM, Jeetendra Gangele gangele...@gmail.com wrote:
Yes, without Kryo it worked; when I removed the Kryo registration it worked.

On 15 April 2015 at 19:24, Jeetendra Gangele gangele...@gmail.com wrote:
It's not working in combination with Broadcast. Without Kryo it is also not working.

On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com wrote:
Is it working without Kryo? Thanks, Best Regards

On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele gangele...@gmail.com wrote:
Hi All, I am getting the exception below while using Kryo serialization with a broadcast variable. I am broadcasting a HashMap with the lines below.

    Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
    final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

[stack trace snipped; see the original post]
Re: Exception while using Kryo with broadcast
Oh interesting. The suggested workaround is to wrap the result from collectAsMap in another HashMap; you should try that:

    Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
    Map<Long, MatcherReleventData> tmp = new HashMap<Long, MatcherReleventData>(matchData);
    final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(tmp);

Can you please clarify:
* Does it work w/ java serialization in the end? Or is this kryo only?
* Which Spark version are you using? (One of the relevant bugs was fixed in 1.2.1 and 1.3.0.)

On Wed, Apr 15, 2015 at 9:06 AM, Jeetendra Gangele gangele...@gmail.com wrote:
This looks like a known issue; check this out:
http://apache-spark-user-list.1001560.n3.nabble.com/java-io-InvalidClassException-org-apache-spark-api-java-JavaUtils-SerializableMapWrapper-no-valid-cor-td20034.html
Can you please suggest any workaround? I am broadcasting a HashMap returned from RDD.collectAsMap().

On 15 April 2015 at 19:33, Imran Rashid iras...@cloudera.com wrote:
This is a really strange exception ... I'm especially surprised that it doesn't work w/ java serialization. Do you think you could try to boil it down to a minimal example?

On Wed, Apr 15, 2015 at 8:58 AM, Jeetendra Gangele gangele...@gmail.com wrote:
Yes, without Kryo it worked; when I removed the Kryo registration it worked.

On 15 April 2015 at 19:24, Jeetendra Gangele gangele...@gmail.com wrote:
It's not working in combination with Broadcast. Without Kryo it is also not working.

On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com wrote:
Is it working without Kryo? Thanks, Best Regards

On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele gangele...@gmail.com wrote:
Hi All, I am getting the exception below while using Kryo serialization with a broadcast variable. I am broadcasting a HashMap with the lines below.

    Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
    final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

[stack trace snipped; see the original post]
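A likely explanation for the trace (my reading of it, not stated outright in the thread): `Caused by: java.lang.UnsupportedOperationException at java.util.AbstractMap.put` means Kryo's MapSerializer repopulates the map by calling `put()` during deserialization, and the map type handed back by `collectAsMap()` is a read-only wrapper that rejects `put()`. That is why copying into a plain `HashMap` before broadcasting helps. The behaviour can be sketched without Spark or Kryo, using `Collections.unmodifiableMap` as a stand-in for the wrapper:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class BroadcastMapWorkaround {
    // Returns true if put() is rejected, i.e. what Kryo's MapSerializer
    // would run into while rebuilding the map on an executor.
    public static boolean putFails(Map<Long, String> m) {
        try {
            m.put(2L, "two");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<Long, String> backing = new HashMap<>();
        backing.put(1L, "one");
        // Stand-in for the read-only wrapper collectAsMap() can return:
        Map<Long, String> wrapper = Collections.unmodifiableMap(backing);
        System.out.println(putFails(wrapper));   // true: put() rejected

        // The workaround from this thread: copy into a plain, mutable
        // HashMap before jsc.broadcast(...), so deserialization can
        // repopulate it entry by entry.
        Map<Long, String> plainCopy = new HashMap<>(wrapper);
        System.out.println(putFails(plainCopy)); // false: put() works
    }
}
```

The copy is cheap relative to the broadcast itself, and it also detaches the broadcast value from any Spark-internal wrapper class that executors may fail to deserialize.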
Re: Microsoft SQL jdbc support from spark sql
I have found that it works if you place sqljdbc41.jar directly in the following folder:

    YOUR_SPARK_HOME/core/target/jars/

That way Spark will have the SQL Server JDBC driver available when it computes its classpath.
Re: Exception while using Kryo with broadcast
It's not working in combination with Broadcast. Without Kryo it is also not working.

On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com wrote:
Is it working without Kryo? Thanks, Best Regards

On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele gangele...@gmail.com wrote:
Hi All, I am getting the exception below while using Kryo serialization with a broadcast variable. I am broadcasting a HashMap with the lines below.

    Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
    final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

[stack trace snipped; see the original post]
How to get a clean DataFrame schema merge
Hi all, if you follow the schema-merging example in the Spark documentation (http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging), you obtain the following result when you load the merged data:

    single  triple  double
    1       3       null
    2       6       null
    4       12      null
    3       9       null
    5       15      null
    1       null    2
    2       null    4
    4       null    8
    3       null    6
    5       null    10

How can I remove these null values and get something more logical, like:

    single  triple  double
    1       3       2
    2       6       4
    4       12      8
    3       9       6
    5       15      10

Bests, Jao
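One way to get there (a sketch, not a confirmed answer from the thread): schema merging unions the rows from the two files, so each `single` key appears twice with a null in the column its file lacked. Collapsing to one row per key means grouping on the key and keeping the non-null value per column; in Spark SQL that might be `df.groupBy("single").agg(max("triple"), max("double"))`, since `max` ignores nulls. The coalescing idea itself, shown on plain collections (the `Integer[]{single, triple, double}` row encoding is just for illustration):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaMergeCoalesce {
    // Each row is {single, triple, double}, with null where the source
    // file lacked the column; merging keeps the non-null value per key.
    public static List<Integer[]> merge(List<Integer[]> rows) {
        Map<Integer, Integer[]> byKey = new LinkedHashMap<>();
        for (Integer[] row : rows) {
            Integer[] acc = byKey.computeIfAbsent(row[0],
                    k -> new Integer[]{k, null, null});
            if (row[1] != null) acc[1] = row[1]; // non-null triple wins
            if (row[2] != null) acc[2] = row[2]; // non-null double wins
        }
        return new ArrayList<>(byKey.values());
    }

    public static void main(String[] args) {
        List<Integer[]> rows = List.of(
                new Integer[]{1, 3, null}, new Integer[]{2, 6, null},  // "triple" file
                new Integer[]{1, null, 2}, new Integer[]{2, null, 4}); // "double" file
        for (Integer[] r : merge(rows)) {
            System.out.println(r[0] + " " + r[1] + " " + r[2]);
        }
        // prints:
        // 1 3 2
        // 2 6 4
    }
}
```

The aggregation version scales to the distributed case for the same reason: every column other than the grouping key has exactly one non-null value per key, so any null-ignoring aggregate recovers it.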
RE: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0
Tried with the 1.3.0 release (built myself) and the most recent 1.3.1 snapshot off the 1.3 branch. Haven't tried with 1.4/master.

From: Wang, Daoyuan [daoyuan.w...@intel.com]
Sent: Wednesday, April 15, 2015 5:22 PM
To: Nathan McCarthy; user@spark.apache.org
Subject: RE: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Can you provide your Spark version? Thanks, Daoyuan

From: Nathan McCarthy [mailto:nathan.mccar...@quantium.com.au]
Sent: Wednesday, April 15, 2015 1:57 PM
To: Nathan McCarthy; user@spark.apache.org
Subject: Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Just an update: tried with the old JdbcRDD and that worked fine.

From: Nathan nathan.mccar...@quantium.com.au
Date: Wednesday, 15 April 2015 1:57 pm
To: user@spark.apache.org
Subject: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Hi guys, I'm trying to use a Spark SQL context's .load("jdbc", ...) method to create a DataFrame from a JDBC data source. All seems to work well locally (master = local[*]), but as soon as we try to run on YARN we have problems. We seem to be running into classpath problems loading the JDBC driver. I'm using the jTDS 1.3.1 driver, net.sourceforge.jtds.jdbc.Driver.
    ./bin/spark-shell --jars /tmp/jtds-1.3.1.jar --master yarn-client

When trying to run I get an exception:

    scala> sqlContext.load("jdbc", Map("url" -> "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL"))
    java.sql.SQLException: No suitable driver found for jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd

Thinking maybe we need to force-load the driver, if I supply "driver" -> "net.sourceforge.jtds.jdbc.Driver" to .load we get:

    scala> sqlContext.load("jdbc", Map("url" -> "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "driver" -> "net.sourceforge.jtds.jdbc.Driver", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL"))
    java.lang.ClassNotFoundException: net.sourceforge.jtds.jdbc.Driver
    	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    	at java.lang.Class.forName0(Native Method)
    	at java.lang.Class.forName(Class.java:191)
    	at org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:97)
    	at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290)
    	at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
    	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(<console>:21)

Yet if I run Class.forName() from the shell:

    scala> Class.forName("net.sourceforge.jtds.jdbc.Driver")
    res1: Class[_] = class net.sourceforge.jtds.jdbc.Driver

No problem finding the JAR. I've tried in both the shell and with spark-submit (packaging the driver into my application as a fat JAR). Nothing seems to work.
I can also get a connection in the driver/shell with no problem:

    scala> import java.sql.DriverManager
    import java.sql.DriverManager

    scala> DriverManager.getConnection("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")
    res3: java.sql.Connection = net.sourceforge.jtds.jdbc.JtdsConnection@2a67ecd0

I'm probably missing some classpath setting here. In jdbc.DefaultSource.createRelation, the call to Class.forName doesn't specify a class loader, so it just uses the default Java behaviour to reflectively get the class loader. It almost feels like it's using a different class loader. I also tried checking whether the driver was on the classpath of all my executors by running:

    import scala.collection.JavaConverters._
    sc.parallelize(Seq(1, 2, 3, 4)).flatMap(_ =>
      java.sql.DriverManager.getDrivers().asScala.map(d =>
        s"$d | ${d.acceptsURL("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")}")
    ).collect().foreach(println)

This successfully returns:

    15/04/15 01:07:37 INFO scheduler.DAGScheduler: Job 0 finished: collect at Main.scala:46, took 1.495597 s
    org.apache.derby.jdbc.AutoloadedDriver40 | false
    com.mysql.jdbc.Driver | false
    net.sourceforge.jtds.jdbc.Driver | true
    org.apache.derby.jdbc.AutoloadedDriver40 | false
    com.mysql.jdbc.Driver | false
    net.sourceforge.jtds.jdbc.Driver | true
    org.apache.derby.jdbc.AutoloadedDriver40 | false
    com.mysql.jdbc.Driver | false
    net.sourceforge.jtds.jdbc.Driver | true
    org.apache.derby.jdbc.AutoloadedDriver40 | false
    com.mysql.jdbc.Driver | false
    net.sourceforge.jtds.jdbc.Driver | true

As a final test we tried with postgres driver
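A classpath workaround commonly suggested for this symptom (an assumption on my part; the thread does not confirm it fixes this case) is to pass the driver jar through the explicit driver and executor classpath options in addition to --jars, since jars added via --jars go into a child class loader that a bare Class.forName inside Spark's own classes may not consult:

```
./bin/spark-shell --master yarn-client \
  --jars /tmp/jtds-1.3.1.jar \
  --driver-class-path /tmp/jtds-1.3.1.jar \
  --conf spark.executor.extraClassPath=jtds-1.3.1.jar
```

On YARN the --jars file is localized into each executor container's working directory, which is why the executor setting can reference the bare file name; --driver-class-path puts the jar on the driver JVM's system classpath, where the default class loader used by the reflective Class.forName can see it.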
Re: Exception while using Kryo with broadcast
This worked with java serialization.I am using 1.2.0 you are right if I use 1.2.1 or 1.3.0 this issue will not occur I will test this and let you know On 15 April 2015 at 19:48, Imran Rashid iras...@cloudera.com wrote: oh interesting. The suggested workaround is to wrap the result from collectAsMap into another hashmap, you should try that: MapLong, MatcherReleventData matchData =RddForMarch.collectAsMap(); MapString, String tmp = new HashMapString, String(matchData); final BroadcastMapLong, MatcherReleventData dataMatchGlobal = jsc.broadcast(tmp); Can you please clarify: * Does it work w/ java serialization in the end? Or is this kryo only? * which Spark version you are using? (one of the relevant bugs was fixed in 1.2.1 and 1.3.0) On Wed, Apr 15, 2015 at 9:06 AM, Jeetendra Gangele gangele...@gmail.com wrote: This looks like known issue? check this out http://apache-spark-user-list.1001560.n3.nabble.com/java-io-InvalidClassException-org-apache-spark-api-java-JavaUtils-SerializableMapWrapper-no-valid-cor-td20034.html Can you please suggest any work around I am broad casting HashMap return from RDD.collectasMap(). On 15 April 2015 at 19:33, Imran Rashid iras...@cloudera.com wrote: this is a really strange exception ... I'm especially surprised that it doesn't work w/ java serialization. Do you think you could try to boil it down to a minimal example? On Wed, Apr 15, 2015 at 8:58 AM, Jeetendra Gangele gangele...@gmail.com wrote: Yes Without Kryo it did work out.when I remove kryo registration it did worked out On 15 April 2015 at 19:24, Jeetendra Gangele gangele...@gmail.com wrote: its not working with the combination of Broadcast. Without Kyro also not working. On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com wrote: Is it working without kryo? Thanks Best Regards On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All I am getting below exception while using Kyro serializable with broadcast variable. 
I am broadcasting a HashMap with the lines below.

Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

15/04/15 12:58:51 ERROR executor.Executor: Exception in task 0.3 in stage 4.0 (TID 7)
java.io.IOException: java.lang.UnsupportedOperationException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1003)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:103)
at com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:1)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:204)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:58)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
at java.util.AbstractMap.put(AbstractMap.java:203)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1000)
... 18 more
15/04/15 12:58:51 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
15/04/15 12:58:51 INFO
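The failing call is `put()` during deserialization: `collectAsMap()` returns a map wrapper whose `put` throws `UnsupportedOperationException`, and Kryo's `MapSerializer` rebuilds maps by calling `put`. The wrap-in-a-HashMap workaround can be sketched with plain JDK classes, using `Collections.unmodifiableMap` as a stand-in for Spark's `SerializableMapWrapper`:

```scala
import java.util.{Collections, HashMap => JHashMap}

val inner = new JHashMap[String, String]()
inner.put("1", "matcherData1")
inner.put("2", "matcherData2")

// Stand-in for the map returned by collectAsMap(): its put() throws
val wrapped: java.util.Map[String, String] = Collections.unmodifiableMap(inner)

// Kryo's MapSerializer reconstructs a map by calling put(), which fails on
// such wrappers. Workaround: copy into a plain HashMap before broadcasting.
val safeCopy = new JHashMap[String, String](wrapped)
safeCopy.put("3", "ok") // a plain HashMap is mutable, so Kryo can rebuild it
```

Broadcasting `safeCopy` instead of the raw `collectAsMap()` result sidesteps the exception on versions where the wrapper bug is present.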
Re: How to get a clean DataFrame schema merge
Schema merging is not the feature you are looking for. It is designed for when you are adding new records (that are not associated with old records), which may or may not have new or missing columns. In your case it looks like you have two datasets that you want to load separately and join on a key.

On Wed, Apr 15, 2015 at 5:59 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, if you follow the example of schema merging in the Spark documentation http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging you obtain the following result when you load the merged data:

single triple double
1 3 null
2 6 null
4 12 null
3 9 null
5 15 null
1 null 2
2 null 4
4 null 8
3 null 6
5 null 10

How do I remove these null values and get something more logical, like:

single triple double
1 3 2
2 6 4
4 12 8
3 9 6
5 15 10

Bests, Jao
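As the reply says, loading the two datasets separately and joining on the key gives the clean table directly (with DataFrames this would be roughly `dfTriple.join(dfDouble, "single")`). The idea, sketched on plain Scala collections with the numbers from the example:

```scala
// The two datasets, keyed by the shared "single" column
val triples = Map(1 -> 3, 2 -> 6, 4 -> 12, 3 -> 9, 5 -> 15) // single -> triple
val doubles = Map(1 -> 2, 2 -> 4, 4 -> 8, 3 -> 6, 5 -> 10)  // single -> double

// Inner join on the key: one row per key, no nulls
val joined = triples.keySet.intersect(doubles.keySet).toSeq.sorted
  .map(k => (k, triples(k), doubles(k)))
```

Each output tuple is a complete (single, triple, double) row, which is the shape the poster wanted.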
Re: Running beyond physical memory limits
The setting to increase is spark.yarn.executor.memoryOverhead.

On Wed, Apr 15, 2015 at 6:35 AM, Brahma Reddy Battula brahmareddy.batt...@huawei.com wrote: Hello Sean Owen, thanks for your reply. I'll increase the overhead memory and check it. By the way, does any difference between 1.1 and 1.2 cause this issue? It was passing on Spark 1.1 and throwing the following error in 1.2 (this makes me doubtful). Thanks & Regards, Brahma Reddy Battula

From: Sean Owen [so...@cloudera.com] Sent: Wednesday, April 15, 2015 2:49 PM To: Brahma Reddy Battula Cc: Akhil Das; user@spark.apache.org Subject: Re: Running beyond physical memory limits

This is not related to executor memory, but to the extra overhead subtracted from the executor's size in order to avoid using more than the physical memory that YARN allows. That is, if you declare a 32G executor, YARN lets you use 32G of physical memory, but your JVM heap must be significantly less than 32G max. This is the overhead factor that is subtracted for you, and it seems it needs to be bigger in your case.

On Wed, Apr 15, 2015 at 10:16 AM, Brahma Reddy Battula brahmareddy.batt...@huawei.com wrote: Thanks a lot for your reply. There is no issue with Spark 1.1. The following issue appeared when I upgraded to Spark 1.2, and I did not decrease spark.executor.memory; I mean to say I used the same config for Spark 1.1 and 1.2. Is there an issue with Spark 1.2, or does YARN lead to this? And why will the executor not release memory if there are tasks running? Thanks & Regards, Brahma Reddy Battula

From: Akhil Das [ak...@sigmoidanalytics.com] Sent: Wednesday, April 15, 2015 2:35 PM To: Brahma Reddy Battula Cc: user@spark.apache.org Subject: Re: Running beyond physical memory limits

Did you try reducing your spark.executor.memory? Thanks Best Regards

On Wed, Apr 15, 2015 at 2:29 PM, Brahma Reddy Battula brahmareddy.batt...@huawei.com wrote: Hello Sparkers, I am a newbie to Spark and need help.
We are using Spark 1.2; we are getting the following error and the executor is getting killed. I saw SPARK-1930 and it should be in 1.2. Any pointer to the following error, i.e. what might lead to it?

2015-04-15 11:55:39,697 | WARN | Container Monitor | Container [pid=126843,containerID=container_1429065217137_0012_01_-411041790] is running beyond physical memory limits. Current usage: 26.0 GB of 26 GB physical memory used; 26.7 GB of 260 GB virtual memory used. Killing container.
Dump of the process-tree for container_1429065217137_0012_01_-411041790 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 126872 126843 126843 126843 (java) 2049457 22816 28673892352 6824864 /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms24576m -Xmx24576m -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp -Dspark.driver.port=23204 -Dspark.random.port.max=23999 -Dspark.akka.threads=32 -Dspark.akka.frameSize=10 -Dspark.akka.timeout=100 -Dspark.ui.port=23000 -Dspark.random.port.min=23000 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 hadoopc1h11 10 application_1429065217137_0012
|- 126843 76960 126843 126843 (bash) 0 0 11603968 331 /bin/bash -c /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms24576m -Xmx24576m -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp '-Dspark.driver.port=23204' '-Dspark.random.port.max=23999' '-Dspark.akka.threads=32' '-Dspark.akka.frameSize=10' '-Dspark.akka.timeout=100' '-Dspark.ui.port=23000' '-Dspark.random.port.min=23000' -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790 org.apache.spark.executor.CoarseGrainedExecutorBackend
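A sketch of how the overhead might be raised at submit time. The value is in megabytes; 2048 and 24g here are purely illustrative numbers for the 26 GB containers in the dump above, not a recommendation:

```shell
# Reserve ~2 GB of each YARN container for non-heap memory (JVM overhead,
# off-heap buffers), leaving the rest for the executor heap.
# Illustrative values only; tune both to your container size.
spark-submit \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --executor-memory 24g \
  your-app.jar
```

The same setting can also go into spark-defaults.conf.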
Re: multinomial and Bernoulli model in NaiveBayes
CC Leah, who added the Bernoulli option to MLlib's NaiveBayes. -Xiangrui

On Wed, Apr 15, 2015 at 4:49 AM, 姜林和 linhe_ji...@163.com wrote: Dear Meng, thanks for the great work on Spark machine learning. I saw the changes to the NaiveBayes algorithm, separating it into a multinomial model and a Bernoulli model, but something confuses me: the calculation of P(Ci) -> pi(i) and P(j|Ci) -> theta(i,j) should differ between the multinomial and Bernoulli models, yet I can only see theta(i,j) calculated in a different way, not pi(i).

Bernoulli: the original feature vector i of a label must be 0 or 1, where 1 represents that word j exists in document i.
pi(i) = (number of documents of class C(i) + lambda) / (number of documents of all classes + 2*lambda)
theta(i)(j) = (number of documents in which j exists in class C(i) + lambda) / (number of documents of class C(i) + 2*lambda)

Multinomial:
pi(i) = (number of words of class C(i) + lambda) / (number of words of all classes + numFeatures*lambda)
theta(i)(j) = (number of words j in class C(i) + lambda) / (number of words in class C(i) + numFeatures*lambda)

Comparison of the two algorithms:

pi(i): Multinomial uses the number of words of class C(i), math.log(numAllWordsOfC + lambda) - piLogDenom; Bernoulli uses the number of documents of class C(i), math.log(n + lambda) - piLogDenom
piLogDenom: Multinomial uses the number of words of all classes, math.log(numAllWords + numFeatures * lambda); Bernoulli uses the number of documents of all classes, math.log(numDocuments + 2 * lambda)
theta(i)(j): Multinomial uses the number of words j in class C(i), math.log(sumTermFreqs(j) + lambda) - thetaLogDenom; Bernoulli uses the number of documents in which j exists in class C(i), math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
thetaLogDenom: Multinomial uses the number of words in class C(i), math.log(numAllWordsOfC + numFeatures * lambda); Bernoulli uses the number of documents of class C(i), math.log(n + 2 * lambda)

Best regards, Linhe Jiang
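The poster's two sets of smoothed estimates can be transcribed compactly (these follow the formulas as written in the mail, not necessarily MLlib's exact internals). Let $W_i$ be the word count in class $i$, $W$ the total word count, $w_{ij}$ the count of word $j$ in class $i$, $D_i$ the document count in class $i$, $D$ the total document count, $d_{ij}$ the number of class-$i$ documents containing word $j$, and $F$ the number of features:

```latex
% Multinomial model
\hat{\pi}_i = \frac{W_i + \lambda}{W + F\lambda}, \qquad
\hat{\theta}_{ij} = \frac{w_{ij} + \lambda}{W_i + F\lambda}

% Bernoulli model
\hat{\pi}_i = \frac{D_i + \lambda}{D + 2\lambda}, \qquad
\hat{\theta}_{ij} = \frac{d_{ij} + \lambda}{D_i + 2\lambda}
```

Written this way, the poster's point is visible: both $\pi$ and $\theta$ differ between the models (word counts vs. document counts, and $F\lambda$ vs. $2\lambda$ smoothing), not just $\theta$.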
Spark 1.3 saveAsTextFile with codec gives error - works with Spark 1.2
Env: Spark 1.3, Hadoop 2.3, Kerberos.

xx.saveAsTextFile(path, codec) gives the following trace. The same works with Spark 1.2 in the same environment.

val codec = classOf[some codec class]
val a = sc.textFile("/some_hdfs_file")
a.saveAsTextFile("/some_other_hdfs_file", codec)

This fails with the following trace in Spark 1.3 and works in Spark 1.2 in the same env:

15/04/14 18:06:15 INFO scheduler.TaskSetManager: Lost task 1.3 in stage 2.0 (TID 17) on executor XYZ: java.lang.SecurityException (JCE cannot authenticate the provider BC) [duplicate 7]
15/04/14 18:06:15 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 16, nodeXYZ): java.lang.SecurityException: JCE cannot authenticate the provider BC
at javax.crypto.Cipher.getInstance(Cipher.java:642)
at javax.crypto.Cipher.getInstance(Cipher.java:580)
some codec calls
at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.jar.JarException: file:/abc/filecache/11/spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned entries - org/apache/spark/SparkHadoopWriter$.class
at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:462)
at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:322)
at javax.crypto.JarVerifier.verify(JarVerifier.java:250)
at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:161)
at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:187)
at javax.crypto.Cipher.getInstance(Cipher.java:638)
... 16 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Re: adding new elements to batch RDD from DStream RDD
What do you mean by batch RDD? They're just RDDs, though they store their data in different ways and come from different sources. You can union an RDD from an HDFS file with one from a DStream. It sounds like you want streaming data to live longer than its batch interval, but that's not something you can expect the streaming framework to provide. It's perfectly possible to save the RDD's data to a persistent store and use it later. You can't update RDDs; they're immutable. You can re-read data from the persistent store by making a new RDD at any time.

On Wed, Apr 15, 2015 at 7:37 PM, Evo Eftimov evo.efti...@isecc.com wrote: The only way to join / union / cogroup a DStream RDD with a batch RDD is via the transform method, which returns another DStream RDD and hence gets discarded at the end of the micro-batch. Is there any way to e.g. union a DStream RDD with a batch RDD, producing a new batch RDD containing the elements of both the DStream RDD and the batch RDD? And once such a batch RDD is created in the above way, can it be used by other DStream RDDs to e.g. join with, as this time the result can be another DStream RDD? Effectively the functionality described above would result in periodic updates (additions) of elements to a batch RDD; the additional elements would keep coming from DStream RDDs, which keep streaming in with every micro-batch. Also, newly arriving DStream RDDs would be able to join with the thus previously updated batch RDD and produce a result DStream RDD. Something almost like that can be achieved with updateStateByKey, but is there a way to do it as described here?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/adding-new-elements-to-batch-RDD-from-DStream-RDD-tp22504.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
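RDDs are immutable, so "updating" the batch side means deriving a new value (e.g. a union of the old batch RDD and the streamed records, or re-reading from HDFS) and swapping the reference, exactly as Sean describes. The reference-swapping pattern can be sketched with immutable Scala collections as stand-ins for RDDs:

```scala
// Stand-in for the batch RDD: an immutable value behind a mutable reference
var batch: Vector[String] = Vector("a", "b") // e.g. originally loaded from HDFS

// Called once per micro-batch (e.g. from foreachRDD): builds a NEW value from
// the old one plus the streamed records; nothing is mutated in place.
def absorb(microBatch: Seq[String]): Unit = {
  batch = batch ++ microBatch
}

absorb(Seq("c"))
absorb(Seq("d", "e"))
```

With real RDDs the same shape applies: build the new RDD (union or re-read), cache it, unpersist the old reference, and let later transforms pick up the current value.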
[SparkSQL; Thriftserver] Help tracking missing 5 minutes
Hi Spark users, I'm trying to upgrade to Spark 1.2 and am seeing some very slow queries; I'm wondering if someone can point me in the right direction for debugging. My Spark UI shows a job with duration 15s (see attached screenshot), which would be great, but client-side measurement shows the query takes just over 4 min.

15/04/15 16:34:59 INFO ParseDriver: Parsing command: ***my query here***
15/04/15 16:38:28 INFO ParquetTypesConverter: Falling back to schema conversion from Parquet types
15/04/15 16:38:29 INFO DAGScheduler: Got job 6 (collect at SparkPlan.scala:84) with 401 output partitions (allowLocal=false)

So there is a gap of almost 4 min between the parse and the next line I can identify as relating to this job. Can someone shed some light on what happens between the parse and the DAG scheduler? The Spark UI also shows the submitted time as 16:38, which leads me to believe it counts time from when the scheduler gets the job... but what happens before then?
adding new elements to batch RDD from DStream RDD
The only way to join / union / cogroup a DStream RDD with a batch RDD is via the transform method, which returns another DStream RDD and hence gets discarded at the end of the micro-batch. Is there any way to e.g. union a DStream RDD with a batch RDD, producing a new batch RDD containing the elements of both the DStream RDD and the batch RDD? And once such a batch RDD is created in the above way, can it be used by other DStream RDDs to e.g. join with, as this time the result can be another DStream RDD? Effectively the functionality described above would result in periodic updates (additions) of elements to a batch RDD; the additional elements would keep coming from DStream RDDs, which keep streaming in with every micro-batch. Also, newly arriving DStream RDDs would be able to join with the thus previously updated batch RDD and produce a result DStream RDD. Something almost like that can be achieved with updateStateByKey, but is there a way to do it as described here?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/adding-new-elements-to-batch-RDD-from-DStream-RDD-tp22504.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
exception during foreach run
Hi All, I am getting the below exception while running foreach after zipWithIndex, flatMapValues, flatMapValues. Inside foreach I'm doing a lookup in a broadcast variable.

java.util.concurrent.RejectedExecutionException: Worker has already been shutdown
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72)
at org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56)
at org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
at org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34)
at org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
at org.jboss.netty.channel.Channels.disconnect(Channels.java:781)
at org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:211)
at akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:223)
at akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:222)
at scala.util.Success.foreach(Try.scala:205)
at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
Re: RAM management during cogroup and join
Significant optimizations can be made by doing the join/cogroup in a smart way. If you have to join streaming RDDs with the same batch RDD, then you can first partition the batch RDD using a partitioner and cache it, and then use the same partitioner on the streaming RDDs. That makes sure the large batch RDD is not partitioned repeatedly for the cogroup; only the small streaming RDDs are partitioned. HTH, TD

On Wed, Apr 15, 2015 at 1:11 PM, Evo Eftimov evo.efti...@isecc.com wrote: There are indications that joins in Spark are implemented with / based on the cogroup function/primitive/transform. So let me focus first on cogroup: it returns a result RDD consisting of essentially ALL elements of the cogrouped RDDs. Said another way, for every key in each of the cogrouped RDDs there is at least one element from at least one of the cogrouped RDDs. That would mean that when smaller, moreover streaming, e.g. JavaPairDStream RDDs keep getting joined with a much larger batch RDD, RAM is allocated for multiple instances of the result (cogrouped) RDD, i.e. essentially the large batch RDD and some more... Obviously the RAM will get returned when the DStream RDDs get discarded, and they do on a regular basis, but still that seems an unnecessary spike in RAM consumption. I have two questions:

1. Is there any way to control the cogroup process more precisely, e.g. tell it to include in the cogrouped RDD only elements where there is at least one element from EACH of the cogrouped RDDs per given key?
Based on the current cogroup API this is not possible.

2. If cogroup really is such a sledgehammer, and secondly the joins are based on cogroup, then even though they can present a prettier picture in terms of the end result visible to the end user, does that mean that under the hood there is still the same atrocious RAM consumption going on?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
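The "greedy" behavior the poster describes is cogroup's contract: the result contains every key present in either input, so keys-in-both filtering (question 1) has to be done afterwards. A plain-Scala model of the semantics (a sketch of the contract, not Spark's implementation):

```scala
// Minimal model of cogroup semantics on keyed sequences
def cogroup[K, V, W](a: Seq[(K, V)], b: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
  val ga = a.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
  val gb = b.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
  (ga.keySet ++ gb.keySet) // EVERY key from either side appears in the result
    .map(k => k -> (ga.getOrElse(k, Seq.empty[V]), gb.getOrElse(k, Seq.empty[W])))
    .toMap
}

val cg = cogroup(Seq(1 -> "a", 2 -> "b"), Seq(2 -> "x", 3 -> "y"))
// Keeping only keys present in BOTH inputs recovers inner-join-like semantics
val innerLike = cg.filter { case (_, (l, r)) => l.nonEmpty && r.nonEmpty }
```

`cg` contains keys 1, 2 and 3 even though only key 2 appears on both sides, which is the memory behavior being asked about; the post-hoc `filter` is what an inner join adds on top.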
RE: RAM management during cogroup and join
That has been done, Sir, and represents further optimizations. The objective here was to confirm whether cogroup always results in the previously described "greedy" explosion of the number of elements included and RAM allocated for the result RDD. The optimizations mentioned still don't change the total number of elements included in the result RDD and the RAM allocated, right?
Re: RAM management during cogroup and join
Agreed.

On Wed, Apr 15, 2015 at 1:29 PM, Evo Eftimov evo.efti...@isecc.com wrote: The optimizations mentioned still don't change the total number of elements included in the result RDD and the RAM allocated, right?
RE: RAM management during cogroup and join
Thank you Sir, and one final confirmation/clarification: are all forms of joins in the Spark API for DStream RDDs based on cogroup in terms of their internal implementation?

From: Tathagata Das [mailto:t...@databricks.com] Sent: Wednesday, April 15, 2015 9:48 PM To: Evo Eftimov Cc: user Subject: Re: RAM management during cogroup and join

Agreed.
RE: adding new elements to batch RDD from DStream RDD
I keep seeing only general statements re DStream RDDs and batch RDDs - there is certainly something to keep me from using them together, and it is the OO API differences I have described previously, several times ... Re the batch RDD reloading from file and that there is no need for threads - the driver of a Spark Streaming app instantiates and submits a DAG pipeline to the Spark Streaming cluster and keeps it alive while it is running - this is not exactly a linear execution where the main thread of the driver can invoke the Spark context method for loading batch RDDs from file e.g. a second time, moreover after a specific period of time.

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, April 15, 2015 8:14 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: adding new elements to batch RDD from DStream RDD

Yes, I mean there's nothing to keep you from using them together other than their very different lifetime. That's probably the key here: if you need the streaming data to live a long time it has to live in persistent storage first. I do exactly this, and what you describe, for the same purpose. I don't believe there's any need for threads; an RDD is just bookkeeping about partitions, and that has to be re-assessed when the underlying data grows. But making a new RDD on the fly is easy. It's a reference to the data only. (Well, that changes if you cache the results, in which case you very much care about unpersisting the RDD before getting a different reference to all of the same data and more.)

On Wed, Apr 15, 2015 at 8:06 PM, Evo Eftimov evo.efti...@isecc.com wrote:

Hi Sean, well there is certainly a difference between a batch RDD and a streaming RDD, and in the previous reply you have already outlined some. Other differences are in the object-oriented model / API of Spark, which also matters besides the RDD / Spark cluster platform architecture.

Secondly, in the previous email I have clearly described what I mean by "update" and that it is the result of an RDD transformation, and hence a new RDD derived from the previously joined/unioned/cogrouped one - i.e. not mutating an existing RDD. Let's also leave aside the architectural goal of why I want to keep updating a batch RDD with new data coming from DStream RDDs - fyi it is NOT to make streaming RDDs long living.

Let me now go back to the overall objective - the app context is a Spark Streaming job. I want to update / add the content of incoming streaming RDDs (e.g. JavaDStream RDDs) to an already loaded (e.g. from an HDFS file) batch RDD, e.g. a JavaRDD - the only way to union / join / cogroup from a DStream RDD to a batch RDD is via the transform method, which always returns a DStream RDD, NOT a batch RDD - check the API.

On a separate note - your suggestion to keep reloading a batch RDD from a file - it may have some applications in other scenarios, so let's drill down into it. In the context of a Spark Streaming app, where the driver launches a DAG pipeline and then just essentially hangs, I guess the only way to keep reloading a batch RDD from file is from a separate thread, still using the same Spark context. The thread will reload the batch RDD into the same reference, i.e. reassign the reference to the newly instantiated/loaded batch RDD - is that what you mean by reloading a batch RDD from file?

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, April 15, 2015 7:43 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: adding new elements to batch RDD from DStream RDD

What do you mean by batch RDD? They're just RDDs, though they store their data in different ways and come from different sources. You can union an RDD from an HDFS file with one from a DStream. It sounds like you want streaming data to live longer than its batch interval, but that's not something you can expect the streaming framework to provide. It's perfectly possible to save the RDD's data to persistent store and use it later. You can't update RDDs; they're immutable. You can re-read data from persistent store by making a new RDD at any time.

On Wed, Apr 15, 2015 at 7:37 PM, Evo Eftimov evo.efti...@isecc.com wrote:

The only way to join / union / cogroup a DStream RDD with a batch RDD is via the transform method, which returns another DStream RDD, and hence it gets discarded at the end of the micro-batch. Is there any way to e.g. union a DStream RDD with a batch RDD which produces a new batch RDD containing the elements of both the DStream RDD and the batch RDD? And once such a batch RDD is created in the above way, can it be used by other DStream RDDs to e.g. join with, as this time the result can be another DStream RDD? Effectively the functionality described above would result in periodic updates (additions) of elements to a batch RDD - the additional elements will keep coming from DStream RDDs, which keep streaming in with every micro-batch.
Re: adding new elements to batch RDD from DStream RDD
Yep, you are looking at operations on DStream, which is not what I'm talking about. You should look at DStream.foreachRDD (or the Java equivalent), which hands you an RDD. Makes more sense? The rest may make more sense when you try it. There is actually a lot less complexity than you think.

On Wed, Apr 15, 2015 at 8:37 PM, Evo Eftimov evo.efti...@isecc.com wrote:

The OO API in question was mentioned several times - the transform method of DStream, which is the ONLY way to join/cogroup/union a DStream RDD with a batch RDD aka JavaRDD. Here is a paste from the Spark javadoc:

<K2,V2> JavaPairDStream<K2,V2> transformToPair(Function<R,JavaPairRDD<K2,V2>> transformFunc)
Return a new DStream in which each RDD is generated by applying a function on each RDD of 'this' DStream.

As you can see, it ALWAYS returns a DStream, NOT a JavaRDD aka batch RDD. Re the rest of the discussion (re-loading a batch RDD from file within a Spark Streaming context) - let's leave that, since we are not getting anywhere.

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, April 15, 2015 8:30 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: adding new elements to batch RDD from DStream RDD

What API differences are you talking about? A DStream gives a sequence of RDDs. I'm not referring to DStream or its API. Spark in general can execute many pipelines at once, ones that even refer to the same RDD. What I mean is, you seem to be looking for a way to change one shared RDD, but in fact you simply create an RDD on top of the current state of the data whenever and wherever you wish. Unless you're caching the RDD's blocks, you don't have much need to share a reference to one RDD anyway, which is what I thought you were getting at.
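A minimal sketch of what Sean is pointing at (hedged: a PySpark illustration, not runnable without a Spark installation; the socket source, HDFS path, and all identifiers are made-up assumptions, not from the thread). Inside `foreachRDD` the callback receives an ordinary RDD rather than a DStream, so it can be unioned with a batch RDD and the result is again an ordinary RDD:

```python
# Illustrative PySpark sketch only - requires a Spark installation to run.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="union-sketch")
ssc = StreamingContext(sc, 10)  # 10-second micro-batches (arbitrary)

# Batch RDD loaded once, e.g. from HDFS (path is a placeholder).
batch_rdd = sc.textFile("hdfs:///data/batch.txt")

def merge_with_batch(rdd):
    # `rdd` here is a plain RDD, not a DStream, so union() yields a plain
    # RDD too - no transform() and no DStream return type involved.
    combined = batch_rdd.union(rdd)
    print(combined.count())

lines = ssc.socketTextStream("localhost", 9999)
lines.foreachRDD(merge_with_batch)

ssc.start()
ssc.awaitTermination()
```

Note this only sidesteps the API complaint about transform's return type; as Sean says elsewhere in the thread, making streaming data outlive its micro-batch still requires writing it to persistent storage and building a new batch RDD from there.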
RAM management during cogroup and join
There are indications that joins in Spark are implemented with / based on the cogroup function/primitive/transform. So let me focus first on cogroup - it returns a result which is an RDD consisting of essentially ALL elements of the cogrouped RDDs. Said another way - for every key in the result there is at least one element from at least one of the cogrouped RDDs. That would mean that when smaller, moreover streaming, e.g. JavaPairDStream RDDs keep getting joined with a much larger batch RDD, that would result in RAM allocated for multiple instances of the result (cogrouped) RDD, a.k.a. essentially the large batch RDD and then some ... Obviously the RAM will get returned when the DStream RDDs get discarded, and they do on a regular basis, but that still seems like an unnecessary spike in RAM consumption.

I have two questions:

1. Is there any way to control the cogroup process more precisely, e.g. tell it to include in the cogrouped RDD only the keys for which there is at least one element from EACH of the cogrouped RDDs? Based on the current cogroup API this is not possible.

2. If cogroup is really such a sledgehammer, and secondly the joins are based on cogroup, then even though they can present a prettier picture in terms of the end result visible to the end user, does that mean that under the hood there is still the same atrocious RAM consumption going on?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: adding new elements to batch RDD from DStream RDD
Hi Sean, well there is certainly a difference between a batch RDD and a streaming RDD, and in the previous reply you have already outlined some. Other differences are in the object-oriented model / API of Spark, which also matters besides the RDD / Spark cluster platform architecture.

Secondly, in the previous email I have clearly described what I mean by "update" and that it is the result of an RDD transformation, and hence a new RDD derived from the previously joined/unioned/cogrouped one - i.e. not mutating an existing RDD. Let's also leave aside the architectural goal of why I want to keep updating a batch RDD with new data coming from DStream RDDs - fyi it is NOT to make streaming RDDs long living.

Let me now go back to the overall objective - the app context is a Spark Streaming job. I want to update / add the content of incoming streaming RDDs (e.g. JavaDStream RDDs) to an already loaded (e.g. from an HDFS file) batch RDD, e.g. a JavaRDD - the only way to union / join / cogroup from a DStream RDD to a batch RDD is via the transform method, which always returns a DStream RDD, NOT a batch RDD - check the API.

On a separate note - your suggestion to keep reloading a batch RDD from a file - it may have some applications in other scenarios, so let's drill down into it. In the context of a Spark Streaming app, where the driver launches a DAG pipeline and then just essentially hangs, I guess the only way to keep reloading a batch RDD from file is from a separate thread, still using the same Spark context. The thread will reload the batch RDD into the same reference, i.e. reassign the reference to the newly instantiated/loaded batch RDD - is that what you mean by reloading a batch RDD from file?

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, April 15, 2015 7:43 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: adding new elements to batch RDD from DStream RDD

What do you mean by batch RDD? They're just RDDs, though they store their data in different ways and come from different sources. You can union an RDD from an HDFS file with one from a DStream. It sounds like you want streaming data to live longer than its batch interval, but that's not something you can expect the streaming framework to provide. It's perfectly possible to save the RDD's data to persistent store and use it later. You can't update RDDs; they're immutable. You can re-read data from persistent store by making a new RDD at any time.

On Wed, Apr 15, 2015 at 7:37 PM, Evo Eftimov evo.efti...@isecc.com wrote:

The only way to join / union / cogroup a DStream RDD with a batch RDD is via the transform method, which returns another DStream RDD, and hence it gets discarded at the end of the micro-batch. Is there any way to e.g. union a DStream RDD with a batch RDD which produces a new batch RDD containing the elements of both the DStream RDD and the batch RDD? And once such a batch RDD is created in the above way, can it be used by other DStream RDDs to e.g. join with, as this time the result can be another DStream RDD? Effectively the functionality described above would result in periodic updates (additions) of elements to a batch RDD - the additional elements will keep coming from DStream RDDs, which keep streaming in with every micro-batch. Also, newly arriving DStream RDDs will be able to join with the thus previously updated batch RDD and produce a result DStream RDD. Something almost like that can be achieved with updateStateByKey, but is there a way to do it as described here?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/adding-new-elements-to-batch-RDD-from-DStream-RDD-tp22504.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: adding new elements to batch RDD from DStream RDD
Yes, I mean there's nothing to keep you from using them together other than their very different lifetime. That's probably the key here: if you need the streaming data to live a long time it has to live in persistent storage first. I do exactly this, and what you describe, for the same purpose. I don't believe there's any need for threads; an RDD is just bookkeeping about partitions, and that has to be re-assessed when the underlying data grows. But making a new RDD on the fly is easy. It's a reference to the data only. (Well, that changes if you cache the results, in which case you very much care about unpersisting the RDD before getting a different reference to all of the same data and more.)

On Wed, Apr 15, 2015 at 8:06 PM, Evo Eftimov evo.efti...@isecc.com wrote:

Hi Sean, well there is certainly a difference between a batch RDD and a streaming RDD, and in the previous reply you have already outlined some. Other differences are in the object-oriented model / API of Spark, which also matters besides the RDD / Spark cluster platform architecture.

Secondly, in the previous email I have clearly described what I mean by "update" and that it is the result of an RDD transformation, and hence a new RDD derived from the previously joined/unioned/cogrouped one - i.e. not mutating an existing RDD. Let's also leave aside the architectural goal of why I want to keep updating a batch RDD with new data coming from DStream RDDs - fyi it is NOT to make streaming RDDs long living.

Let me now go back to the overall objective - the app context is a Spark Streaming job. I want to update / add the content of incoming streaming RDDs (e.g. JavaDStream RDDs) to an already loaded (e.g. from an HDFS file) batch RDD, e.g. a JavaRDD - the only way to union / join / cogroup from a DStream RDD to a batch RDD is via the transform method, which always returns a DStream RDD, NOT a batch RDD - check the API.

On a separate note - your suggestion to keep reloading a batch RDD from a file - it may have some applications in other scenarios, so let's drill down into it. In the context of a Spark Streaming app, where the driver launches a DAG pipeline and then just essentially hangs, I guess the only way to keep reloading a batch RDD from file is from a separate thread, still using the same Spark context. The thread will reload the batch RDD into the same reference, i.e. reassign the reference to the newly instantiated/loaded batch RDD - is that what you mean by reloading a batch RDD from file?

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, April 15, 2015 7:43 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: adding new elements to batch RDD from DStream RDD

What do you mean by batch RDD? They're just RDDs, though they store their data in different ways and come from different sources. You can union an RDD from an HDFS file with one from a DStream. It sounds like you want streaming data to live longer than its batch interval, but that's not something you can expect the streaming framework to provide. It's perfectly possible to save the RDD's data to persistent store and use it later. You can't update RDDs; they're immutable. You can re-read data from persistent store by making a new RDD at any time.

On Wed, Apr 15, 2015 at 7:37 PM, Evo Eftimov evo.efti...@isecc.com wrote:

The only way to join / union / cogroup a DStream RDD with a batch RDD is via the transform method, which returns another DStream RDD, and hence it gets discarded at the end of the micro-batch. Is there any way to e.g. union a DStream RDD with a batch RDD which produces a new batch RDD containing the elements of both the DStream RDD and the batch RDD? And once such a batch RDD is created in the above way, can it be used by other DStream RDDs to e.g. join with, as this time the result can be another DStream RDD? Effectively the functionality described above would result in periodic updates (additions) of elements to a batch RDD - the additional elements will keep coming from DStream RDDs, which keep streaming in with every micro-batch. Also, newly arriving DStream RDDs will be able to join with the thus previously updated batch RDD and produce a result DStream RDD. Something almost like that can be achieved with updateStateByKey, but is there a way to do it as described here?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/adding-new-elements-to-batch-RDD-from-DStream-RDD-tp22504.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: adding new elements to batch RDD from DStream RDD
What API differences are you talking about? A DStream gives a sequence of RDDs. I'm not referring to DStream or its API. Spark in general can execute many pipelines at once, ones that even refer to the same RDD. What I mean is, you seem to be looking for a way to change one shared RDD, but in fact you simply create an RDD on top of the current state of the data whenever and wherever you wish. Unless you're caching the RDD's blocks, you don't have much need to share a reference to one RDD anyway, which is what I thought you were getting at.

On Wed, Apr 15, 2015 at 8:25 PM, Evo Eftimov evo.efti...@isecc.com wrote:

I keep seeing only general statements re DStream RDDs and batch RDDs - there is certainly something to keep me from using them together, and it is the OO API differences I have described previously, several times ... Re the batch RDD reloading from file and that there is no need for threads - the driver of a Spark Streaming app instantiates and submits a DAG pipeline to the Spark Streaming cluster and keeps it alive while it is running - this is not exactly a linear execution where the main thread of the driver can invoke the Spark context method for loading batch RDDs from file e.g. a second time, moreover after a specific period of time.

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, April 15, 2015 8:14 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: adding new elements to batch RDD from DStream RDD

Yes, I mean there's nothing to keep you from using them together other than their very different lifetime. That's probably the key here: if you need the streaming data to live a long time it has to live in persistent storage first. I do exactly this, and what you describe, for the same purpose. I don't believe there's any need for threads; an RDD is just bookkeeping about partitions, and that has to be re-assessed when the underlying data grows. But making a new RDD on the fly is easy. It's a reference to the data only. (Well, that changes if you cache the results, in which case you very much care about unpersisting the RDD before getting a different reference to all of the same data and more.)

On Wed, Apr 15, 2015 at 8:06 PM, Evo Eftimov evo.efti...@isecc.com wrote:

Hi Sean, well there is certainly a difference between a batch RDD and a streaming RDD, and in the previous reply you have already outlined some. Other differences are in the object-oriented model / API of Spark, which also matters besides the RDD / Spark cluster platform architecture.

Secondly, in the previous email I have clearly described what I mean by "update" and that it is the result of an RDD transformation, and hence a new RDD derived from the previously joined/unioned/cogrouped one - i.e. not mutating an existing RDD. Let's also leave aside the architectural goal of why I want to keep updating a batch RDD with new data coming from DStream RDDs - fyi it is NOT to make streaming RDDs long living.

Let me now go back to the overall objective - the app context is a Spark Streaming job. I want to update / add the content of incoming streaming RDDs (e.g. JavaDStream RDDs) to an already loaded (e.g. from an HDFS file) batch RDD, e.g. a JavaRDD - the only way to union / join / cogroup from a DStream RDD to a batch RDD is via the transform method, which always returns a DStream RDD, NOT a batch RDD - check the API.

On a separate note - your suggestion to keep reloading a batch RDD from a file - it may have some applications in other scenarios, so let's drill down into it. In the context of a Spark Streaming app, where the driver launches a DAG pipeline and then just essentially hangs, I guess the only way to keep reloading a batch RDD from file is from a separate thread, still using the same Spark context. The thread will reload the batch RDD into the same reference, i.e. reassign the reference to the newly instantiated/loaded batch RDD - is that what you mean by reloading a batch RDD from file?

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, April 15, 2015 7:43 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: adding new elements to batch RDD from DStream RDD

What do you mean by batch RDD? They're just RDDs, though they store their data in different ways and come from different sources. You can union an RDD from an HDFS file with one from a DStream. It sounds like you want streaming data to live longer than its batch interval, but that's not something you can expect the streaming framework to provide. It's perfectly possible to save the RDD's data to persistent store and use it later. You can't update RDDs; they're immutable. You can re-read data from persistent store by making a new RDD at any time.

On Wed, Apr 15, 2015 at 7:37 PM, Evo Eftimov evo.efti...@isecc.com wrote:

The only way to join / union / cogroup a DStream RDD with a batch RDD is via the transform method, which returns another DStream
RE: adding new elements to batch RDD from DStream RDD
The OO API in question was mentioned several times - the transform method of DStream, which is the ONLY way to join/cogroup/union a DStream RDD with a batch RDD aka JavaRDD. Here is a paste from the Spark javadoc:

<K2,V2> JavaPairDStream<K2,V2> transformToPair(Function<R,JavaPairRDD<K2,V2>> transformFunc)
Return a new DStream in which each RDD is generated by applying a function on each RDD of 'this' DStream.

As you can see, it ALWAYS returns a DStream, NOT a JavaRDD aka batch RDD. Re the rest of the discussion (re-loading a batch RDD from file within a Spark Streaming context) - let's leave that, since we are not getting anywhere.

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, April 15, 2015 8:30 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: adding new elements to batch RDD from DStream RDD

What API differences are you talking about? A DStream gives a sequence of RDDs. I'm not referring to DStream or its API. Spark in general can execute many pipelines at once, ones that even refer to the same RDD. What I mean is, you seem to be looking for a way to change one shared RDD, but in fact you simply create an RDD on top of the current state of the data whenever and wherever you wish. Unless you're caching the RDD's blocks, you don't have much need to share a reference to one RDD anyway, which is what I thought you were getting at.

On Wed, Apr 15, 2015 at 8:25 PM, Evo Eftimov evo.efti...@isecc.com wrote:

I keep seeing only general statements re DStream RDDs and batch RDDs - there is certainly something to keep me from using them together, and it is the OO API differences I have described previously, several times ...

Re the batch RDD reloading from file and that there is no need for threads - the driver of a Spark Streaming app instantiates and submits a DAG pipeline to the Spark Streaming cluster and keeps it alive while it is running - this is not exactly a linear execution where the main thread of the driver can invoke the Spark context method for loading batch RDDs from file e.g. a second time, moreover after a specific period of time.

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, April 15, 2015 8:14 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: adding new elements to batch RDD from DStream RDD

Yes, I mean there's nothing to keep you from using them together other than their very different lifetime. That's probably the key here: if you need the streaming data to live a long time it has to live in persistent storage first. I do exactly this, and what you describe, for the same purpose. I don't believe there's any need for threads; an RDD is just bookkeeping about partitions, and that has to be re-assessed when the underlying data grows. But making a new RDD on the fly is easy. It's a reference to the data only. (Well, that changes if you cache the results, in which case you very much care about unpersisting the RDD before getting a different reference to all of the same data and more.)

On Wed, Apr 15, 2015 at 8:06 PM, Evo Eftimov evo.efti...@isecc.com wrote:

Hi Sean, well there is certainly a difference between a batch RDD and a streaming RDD, and in the previous reply you have already outlined some. Other differences are in the object-oriented model / API of Spark, which also matters besides the RDD / Spark cluster platform architecture.

Secondly, in the previous email I have clearly described what I mean by "update" and that it is the result of an RDD transformation, and hence a new RDD derived from the previously joined/unioned/cogrouped one - i.e. not mutating an existing RDD. Let's also leave aside the architectural goal of why I want to keep updating a batch RDD with new data coming from DStream RDDs - fyi it is NOT to make streaming RDDs long living.

Let me now go back to the overall objective - the app context is a Spark Streaming job. I want to update / add the content of incoming streaming RDDs (e.g. JavaDStream RDDs) to an already loaded (e.g. from an HDFS file) batch RDD, e.g. a JavaRDD - the only way to union / join / cogroup from a DStream RDD to a batch RDD is via the transform method, which always returns a DStream RDD, NOT a batch RDD - check the API.

On a separate note - your suggestion to keep reloading a batch RDD from a file - it may have some applications in other scenarios, so let's drill down into it. In the context of a Spark Streaming app, where the driver launches a DAG pipeline and then just essentially hangs, I guess the only way to keep reloading a batch RDD from file is from a separate thread, still using the same Spark context. The thread will reload the batch RDD into the same reference, i.e. reassign the reference to the newly instantiated/loaded batch RDD - is that what you mean by reloading a batch RDD from file?

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, April 15, 2015 7:43 PM
aliasing aggregate columns?
Hi Guys - having trouble figuring out the semantics for using the alias function on the final sum and count aggregations:

cool_summary = reviews.select(reviews.user_id, cool_cnt(votes.cool).alias("cool_cnt")).groupBy("user_id").agg({"cool_cnt": "sum", "*": "count"})

cool_summary
DataFrame[user_id: string, SUM(cool_cnt#725): double, COUNT(1): bigint]
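One way to get named aggregate columns (a hedged sketch, not a verified answer: it assumes the Spark 1.3+ PySpark DataFrame API, and `reviews`, `votes`, and `cool_cnt` come from the asker's context) is to replace the dict-style `agg`, which offers no aliasing, with the function-style form and alias each aggregate expression:

```python
# Sketch only - requires PySpark, plus the asker's `reviews`/`votes`/`cool_cnt`.
from pyspark.sql import functions as F

cool_summary = (reviews
    .select(reviews.user_id, cool_cnt(votes.cool).alias("cool_cnt"))
    .groupBy("user_id")
    .agg(F.sum("cool_cnt").alias("cool_cnt"),
         F.count("*").alias("review_cnt")))
```

With the aliases applied, the schema should read `user_id, cool_cnt, review_cnt` instead of the generated `SUM(cool_cnt#725)` / `COUNT(1)` names.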
Re: spark job progress-style report on console ?
Just add the following line

spark.ui.showConsoleProgress true

to your conf/spark-defaults.conf file.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-job-progress-style-report-on-console-tp22440p22506.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: RAM management during cogroup and join
Well, DStream joins are nothing but RDD joins at its core. However, there are more optimizations that you using DataFrames and Spark SQL joins. With the schema, there is a greater scope for optimizing the joins. So converting RDDs from streaming and the batch RDDs to data frames, and then applying joins may improve performance. TD On Wed, Apr 15, 2015 at 1:50 PM, Evo Eftimov evo.efti...@isecc.com wrote: Thank you Sir, and one final confirmation/clarification - are all forms of joins in the Spark API for DStream RDDs based on cogroup in terms of their internal implementation *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Wednesday, April 15, 2015 9:48 PM *To:* Evo Eftimov *Cc:* user *Subject:* Re: RAM management during cogroup and join Agreed. On Wed, Apr 15, 2015 at 1:29 PM, Evo Eftimov evo.efti...@isecc.com wrote: That has been done Sir and represents further optimizations – the objective here was to confirm whether cogroup always results in the previously described “greedy” explosion of the number of elements included and RAM allocated for the result RDD The optimizations mentioned still don’t change the total number of elements included in the result RDD and RAM allocated – right? *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Wednesday, April 15, 2015 9:25 PM *To:* Evo Eftimov *Cc:* user *Subject:* Re: RAM management during cogroup and join Significant optimizations can be made by doing the joining/cogroup in a smart way. If you have to join streaming RDDs with the same batch RDD, then you can first partition the batch RDDs using a partitions and cache it, and then use the same partitioner on the streaming RDDs. That would make sure that the large batch RDDs is not partitioned repeatedly for the cogroup, only the small streaming RDDs are partitioned. 
HTH TD

On Wed, Apr 15, 2015 at 1:11 PM, Evo Eftimov evo.efti...@isecc.com wrote: There are indications that joins in Spark are implemented with / based on the cogroup function/primitive/transform. So let me focus first on cogroup - it returns a result which is an RDD consisting of essentially ALL elements of the cogrouped RDDs. Said another way - for every key in each of the cogrouped RDDs, the result has at least one element from at least one of the cogrouped RDDs. That would mean that when smaller, streaming RDDs (e.g. JavaPairDStream RDDs) keep getting joined with a much larger batch RDD, RAM gets allocated for multiple instances of the result (cogrouped) RDD - essentially the large batch RDD and some more. Obviously the RAM will get returned when the DStream RDDs get discarded, and they do on a regular basis, but still that seems an unnecessary spike in RAM consumption. I have two questions:

1. Is there any way to control the cogroup process more precisely, e.g. tell it to include in the cogrouped RDD only elements where there is at least one element from EACH of the cogrouped RDDs per given key? Based on the current cogroup API this is not possible.

2. If cogroup really is such a sledgehammer, and the joins are based on cogroup, then even though they present a prettier picture in terms of the end result visible to the end user, does that mean that under the hood the same atrocious RAM consumption is going on?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
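TD's advice (partition the large batch RDD once with an explicit partitioner, cache it, and partition each small streaming batch with the same partitioner) can be sketched without a cluster. The snippet below models the idea on plain Scala collections — the `part` function and the names are illustrative, not Spark API; in Spark itself this corresponds to `batchRdd.partitionBy(new HashPartitioner(n)).cache()` followed by joining each streaming RDD that uses the same partitioner, so the large side is shuffled only once.

```scala
// Hedged sketch of "partition the batch side once, reuse the partitioner",
// modeled on plain Scala collections (NOT Spark API).
val numPartitions = 4
def part(key: Int): Int = math.abs(key.hashCode) % numPartitions

// Large static ("batch") dataset: partitioned once and kept around,
// analogous to partitionBy(...).cache() in Spark.
val batch: Seq[(Int, String)] = (1 to 1000).map(i => i -> s"batch-$i")
val batchPartitioned: Map[Int, Seq[(Int, String)]] =
  batch.groupBy { case (k, _) => part(k) }

// Each small streaming batch is partitioned with the SAME function, so only
// the small side is redistributed; the join then runs per partition.
def joinSmallBatch(small: Seq[(Int, Long)]): Seq[(Int, (Long, String))] =
  small.groupBy { case (k, _) => part(k) }.toSeq.flatMap { case (p, recs) =>
    val lookup = batchPartitioned.getOrElse(p, Seq.empty).toMap
    recs.flatMap { case (k, v) => lookup.get(k).map(s => k -> (v, s)) }
  }
```

The design point is that the expensive step (redistributing the large dataset by key) happens once, outside the per-batch loop, which is exactly what repeated joins against an unpartitioned batch RDD fail to do.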
RE: RAM management during cogroup and join
Data Frames are available from the latest 1.3 release, I believe – in 1.2 (our case at the moment) I guess the options are more limited. PS: agree that DStreams are just an abstraction for a sequence / stream of (ordinary) RDDs – when I use “DStreams” I mean the DStream OO API in Spark, not that DStreams are some sort of different type of RDDs.

From: Tathagata Das [mailto:t...@databricks.com] Sent: Wednesday, April 15, 2015 11:11 PM To: Evo Eftimov Cc: user Subject: Re: RAM management during cogroup and join
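Question 1 in this thread — restricting the cogrouped result to keys present in EACH input — amounts to inner-join semantics, which can be expressed as a post-filter on the cogrouped result. The snippet below models both steps on plain Scala collections; `cogroupLocal` and `innerOnly` are illustrative names, not Spark API.

```scala
// Hedged model of cogroup semantics: the result carries an entry for every
// key present in EITHER input, which is the "greedy" behaviour discussed
// above.
def cogroupLocal[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
  val l: Map[K, Seq[V]] = left.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
  val r: Map[K, Seq[W]] = right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }
  (l.keySet ++ r.keySet).iterator
    .map(k => k -> (l.getOrElse(k, Seq.empty[V]), r.getOrElse(k, Seq.empty[W])))
    .toMap
}

// Keep only keys where EVERY side contributed at least one element
// (inner-join semantics).
def innerOnly[K, V, W](cg: Map[K, (Seq[V], Seq[W])]): Map[K, (Seq[V], Seq[W])] =
  cg.filter { case (_, (vs, ws)) => vs.nonEmpty && ws.nonEmpty }
```

Note that in Spark, filtering the cogrouped RDD this way still materializes the full cogroup first; using `join` directly yields the inner semantics without keeping one-sided keys in the output.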
Re: Is it feasible to keep millions of keys in state of Spark Streaming job for two months?
Can you clarify more on what you want to do after querying? Is the batch not completed until the querying and subsequent processing have completed?

On Tue, Apr 14, 2015 at 10:36 PM, Krzysztof Zarzycki k.zarzy...@gmail.com wrote: Thank you Tathagata, very helpful answer. Though, I would like to highlight that recent stream processing systems are trying to help users implement the use case of holding such large (like 2 months of data) states. I would mention here Samza state management http://samza.apache.org/learn/documentation/0.9/container/state-management.html and Trident state management https://storm.apache.org/documentation/Trident-state. I'm waiting for Spark to help with that too, because generally I definitely prefer this technology :) But considering holding state in Cassandra with Spark Streaming, I understand we're not talking here about using Cassandra as input nor output (nor making use of spark-cassandra-connector https://github.com/datastax/spark-cassandra-connector). We're talking here about querying Cassandra from map/mapPartitions functions. I have one question about it: Is it possible to query Cassandra asynchronously within Spark Streaming? And while doing it, is it possible to take the next batch of rows while the previous one is waiting on Cassandra I/O? I think (but I'm not sure) this generally asks whether several consecutive windows can interleave (because they take long to process)? Let's draw it: --|query Cassandra asynchronously--- window1 --- window2 While writing it, I start to believe they can, because windows are time-triggered, not triggered when the previous window has finished... But it's better to ask :)

2015-04-15 2:08 GMT+02:00 Tathagata Das t...@databricks.com: Fundamentally, stream processing systems are designed for processing streams of data, not for storing large volumes of data for a long period of time.
So if you have to maintain that much state for months, then it's best to use another system designed for long-term storage (like Cassandra), which has proper support for making all that state fault-tolerant, high-performance, etc. So yes, the best option is to use Cassandra for the state, with Spark Streaming jobs accessing the state from Cassandra. There are a number of optimizations that can be done. It's not too hard to build a simple on-demand populated cache (a singleton hash map, for example) that speeds up access from Cassandra, with all updates written through the cache. This is a common use of Spark Streaming + Cassandra/HBase. Regarding the performance of updateStateByKey, we are aware of the limitations, and we will improve it soon :) TD

On Tue, Apr 14, 2015 at 12:34 PM, Krzysztof Zarzycki k.zarzy...@gmail.com wrote: Hey guys, could you please help me with a question I asked on Stackoverflow: https://stackoverflow.com/questions/29635681/is-it-feasible-to-keep-millions-of-keys-in-state-of-spark-streaming-job-for-two ? I'll be really grateful for your help! I'm also pasting the question below: I'm trying to solve a (simplified here) problem in Spark Streaming: Let's say I have a log of events made by users, where each event is a tuple (user name, activity, time), e.g.: (user1, view, 2015-04-14T21:04Z) (user1, click, 2015-04-14T21:05Z) Now I would like to gather events by user to do some analysis of them. Let's say the output is some analysis of: (user1, List((view, 2015-04-14T21:04Z), (click, 2015-04-14T21:05Z))) The events should be kept for even *2 months*. During that time there might be around *500 million* such events, and *millions of unique* users, which are the keys here. *My questions are:* - Is it feasible to do such a thing with updateStateByKey on DStream, when I have millions of keys stored? - Am I right that DStream.window is of no use here, when I have a 2-month-long window and would like to have a slide of a few seconds? P.S.
I found out that updateStateByKey is called on all the keys on every slide, which means it will be called millions of times every few seconds. That makes me doubt this design, and I'm rather thinking about alternative solutions like: - using Cassandra for state - using Trident state (with Cassandra probably) - using Samza with its state management.
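TD's "on-demand populated cache (singleton hash map)" can be sketched in a few lines. This is a hedged, Spark-free model: `StateCache`, `loadFromStore` and `writeToStore` are hypothetical names, and the two store functions stand in for real Cassandra reads and writes (e.g. via a driver session). On a cluster, an `object` like this exists once per executor JVM, so lookups hit the external store only on a cache miss and every update is written through.

```scala
import scala.collection.concurrent.TrieMap

// Hedged sketch of an on-demand populated, write-through cache for
// externalized streaming state. Names and store functions are stand-ins.
object StateCache {
  private val cache = TrieMap.empty[String, Long]

  // Stand-ins for external-store I/O; replace with real datastore calls.
  var loadFromStore: String => Long = _ => 0L
  var writeToStore: (String, Long) => Unit = (_, _) => ()

  // Populate on demand: first access for a key reads the store,
  // later accesses are served from memory.
  def get(key: String): Long = cache.getOrElseUpdate(key, loadFromStore(key))

  // Write-through: update memory and the store together, so the external
  // store always holds the authoritative, fault-tolerant copy.
  def update(key: String, value: Long): Unit = {
    cache.put(key, value)
    writeToStore(key, value)
  }
}
```

`TrieMap` is used here because it is thread-safe and its `getOrElseUpdate` is atomic, which matters when many task threads in one executor share the singleton.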
Passing Elastic Search Mappings in Spark Conf
Hi, Is there a way to pass the mapping to define a field as not analyzed with es-spark settings? I am just wondering if I can set the mapping type for a field as not analyzed using the set function in SparkConf, similar to the other ES settings.

val sconf = new SparkConf()
  .setMaster("local[1]")
  .setAppName("Load Data To ES")
  .set("spark.ui.port", "4141")
  .set("es.index.auto.create", "true")
  .set("es.net.http.auth.user", "es_admin")
  .set("es.index.auto.create", "true")
  .set("es.mapping.names", "CREATED_DATE:@timestamp")

Thanks, Deepak Subhramanian
Re: How to do dispatching in Streaming?
It may be worthwhile to architect the computation in a different way:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    // do different things for each record based on filters
  }
}

TD

On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I have a Kafka topic that contains dozens of different types of messages. And for each one I'll need to create a DStream for it. Currently I have to filter the Kafka stream over and over, which is very inefficient. So what's the best way to do dispatching in Spark Streaming? (one DStream -> multiple DStreams) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
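The suggestion above — branch on each record in a single pass instead of filtering the Kafka stream once per message type — can be modeled without a cluster. In Spark Streaming this logic would sit inside `dstream.foreachRDD { rdd => rdd.foreach { record => ... } }`; below it runs on a plain Scala collection, and `Record` plus the handler map are hypothetical, not Spark API.

```scala
// Hedged sketch of single-pass dispatch: one handler per message type,
// looked up per record; unknown types are dropped.
case class Record(msgType: String, payload: String)

val handlers: Map[String, Record => String] = Map(
  "click" -> (r => s"click:${r.payload}"),
  "view"  -> (r => s"view:${r.payload}")
)

def dispatch(records: Seq[Record]): Seq[String] =
  records.flatMap(r => handlers.get(r.msgType).map(handle => handle(r)))
```

The trade-off versus one filtered DStream per type is that the topic is scanned once per batch rather than once per message type, at the cost of losing per-type DStream lineage.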
Dataset announcement
Dear Spark users, I would like to draw your attention to a dataset that we recently released, which is as of now the largest machine learning dataset ever released; see the following blog announcements:
- http://labs.criteo.com/2015/03/criteo-releases-its-new-dataset/
- http://blogs.technet.com/b/machinelearning/archive/2015/04/01/now-available-on-azure-ml-criteo-39-s-1tb-click-prediction-dataset.aspx

The characteristics of this dataset are:
- 1 TB of data
- binary classification
- 13 integer features
- 26 categorical features, some of them taking millions of values
- 4B rows

Hopefully this dataset will be useful to assess and push further the scalability of Spark and MLlib. Cheers, Olivier

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Dataset-announcement-tp22507.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Passing Elastic Search Mappings in Spark Conf
If you want to specify a mapping you must first create the mappings for your index types before indexing. As far as I know there is no way to specify this via es-hadoop. But it's best practice to explicitly create mappings prior to indexing, or to use index templates when dynamically creating indexes. — Sent from Mailbox
Re: Dataset announcement
Greetings! How about medical data sets, specifically longitudinal vital signs? Can people send good pointers? Thanks in advance, -- ttfn Simon Edelhaus California 2015

On Wed, Apr 15, 2015 at 6:01 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Very neat, Olivier; thanks for sharing this. Matei
Re: Actor not found
Exception in thread main akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkdri...@dmslave13.et2.tbsite.net:5908/), Path(/user/OutputCommitCoordinator)]
 at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
 at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
 at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
 at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
 at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
 at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
 at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
 at akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I met the same problem when I run Spark on YARN. Is this a bug or what?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Actor-not-found-tp22265p22508.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Dataset announcement
Very neat, Olivier; thanks for sharing this. Matei
Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0
The problem lies with getting the driver classes into the primordial class loader when running on YARN. Basically I need to somehow set the SPARK_CLASSPATH or compute_classpath.sh when running on YARN. I’m not sure how to do this when YARN is handling all the file copying.

From: Nathan nathan.mccar...@quantium.com.au Date: Wednesday, 15 April 2015 11:49 pm To: Wang, Daoyuan daoyuan.w...@intel.com, user@spark.apache.org Subject: RE: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0 Tried with the 1.3.0 release (built myself) and the most recent 1.3.1 snapshot off the 1.3 branch. Haven't tried with 1.4/master.

From: Wang, Daoyuan [daoyuan.w...@intel.com] Sent: Wednesday, April 15, 2015 5:22 PM To: Nathan McCarthy; user@spark.apache.org Subject: RE: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0 Can you provide your Spark version? Thanks, Daoyuan

From: Nathan McCarthy [mailto:nathan.mccar...@quantium.com.au] Sent: Wednesday, April 15, 2015 1:57 PM To: Nathan McCarthy; user@spark.apache.org Subject: Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0 Just an update, tried with the old JdbcRDD and that worked fine.

From: Nathan nathan.mccar...@quantium.com.au Date: Wednesday, 15 April 2015 1:57 pm To: user@spark.apache.org Subject: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0 Hi guys, Trying to use a Spark SQL context’s .load("jdbc", ...) method to create a DF from a JDBC data source. All seems to work well locally (master = local[*]), however as soon as we try and run on YARN we have problems.
We seem to be running into problems with the class path and loading up the JDBC driver. I’m using the jTDS 1.3.1 driver, net.sourceforge.jtds.jdbc.Driver.

./bin/spark-shell --jars /tmp/jtds-1.3.1.jar --master yarn-client

When trying to run I get an exception;

scala> sqlContext.load("jdbc", Map("url" -> "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL"))
java.sql.SQLException: No suitable driver found for jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd

Thinking maybe we need to force load the driver, if I supply "driver" -> "net.sourceforge.jtds.jdbc.Driver" to .load we get;

scala> sqlContext.load("jdbc", Map("url" -> "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "driver" -> "net.sourceforge.jtds.jdbc.Driver", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL"))
java.lang.ClassNotFoundException: net.sourceforge.jtds.jdbc.Driver
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:191)
 at org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:97)
 at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290)
 at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(console:21)

Yet if I run a Class.forName() just from the shell;

scala> Class.forName("net.sourceforge.jtds.jdbc.Driver")
res1: Class[_] = class net.sourceforge.jtds.jdbc.Driver

No problem finding the JAR. I’ve tried in both the shell, and running with spark-submit (packing the driver in with my application as a fat JAR). Nothing seems to work.
I can also get a connection in the driver/shell no problem;

scala> import java.sql.DriverManager
import java.sql.DriverManager
scala> DriverManager.getConnection("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")
res3: java.sql.Connection = net.sourceforge.jtds.jdbc.JtdsConnection@2a67ecd0

I’m probably missing some class path setting here. In jdbc.DefaultSource.createRelation it looks like the call to Class.forName doesn’t specify a class loader, so it just uses the default Java behaviour to reflectively get the class loader. It almost feels like it's using a different class loader. I also tried seeing if the class path was there on all my executors by running;

import scala.collection.JavaConverters._
sc.parallelize(Seq(1,2,3,4)).flatMap(_ => java.sql.DriverManager.getDrivers().asScala.map(d => s"$d |
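The symptoms above fit a classloader-layering problem: jars added with --jars end up in a child classloader, while code that calls `Class.forName(name)` without an explicit loader resolves against the caller's own loader, which may not see them, and `java.sql.DriverManager` only accepts drivers visible from the calling class's loader. Besides putting the driver jar on the system classpath (e.g. via `spark.driver.extraClassPath` and `spark.executor.extraClassPath`, the configuration successors to SPARK_CLASSPATH), one thing to check is whether the class resolves through the thread context classloader, which is how Spark exposes its added jars. A minimal, Spark-free sketch of that lookup (`loadViaContextClassLoader` is an illustrative helper, not Spark or jTDS API):

```scala
// Hedged sketch: load a class through the current thread's context
// classloader instead of the caller's default loader. On a Spark executor the
// context classloader is the one that can see jars shipped with --jars.
def loadViaContextClassLoader(className: String): Class[_] =
  Class.forName(className, true, Thread.currentThread().getContextClassLoader)
```

If this succeeds on the executors while the data source's plain `Class.forName` fails, the fix is to get the jar into the classloader the data source uses, i.e. the extraClassPath settings above.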
Re: Can Spark 1.0.2 run on CDH-4.3.0 with yarn? And Will Spark 1.2.0 support CDH5.1.2 with yarn?
Now we have Spark 1.3.0 on CDH 5.1.0. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-Spark-1-0-2-run-on-CDH-4-3-0-with-yarn-And-Will-Spark-1-2-0-support-CDH5-1-2-with-yarn-tp20760p22509.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Dataset announcement
Thanks Olivier. Good work. Interesting in more ways than one - including training, benchmarking, testing new releases, et al. One quick question - do you plan to make it available as an S3 bucket? Cheers k/
Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0
Can you provide the JDBC connector jar version? Possibly the full JAR name and the full command you ran Spark with?

On Wed, Apr 15, 2015 at 11:27 AM, Nathan McCarthy nathan.mccar...@quantium.com.au wrote: Just an update, tried with the old JdbcRDD and that worked fine.

From: Nathan nathan.mccar...@quantium.com.au Date: Wednesday, 15 April 2015 1:57 pm To: user@spark.apache.org Subject: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0 Hi guys, Trying to use a Spark SQL context’s .load("jdbc", ...) method to create a DF from a JDBC data source. All seems to work well locally (master = local[*]), however as soon as we try and run on YARN we have problems. We seem to be running into problems with the class path and loading up the JDBC driver. I’m using the jTDS 1.3.1 driver, net.sourceforge.jtds.jdbc.Driver.

./bin/spark-shell --jars /tmp/jtds-1.3.1.jar --master yarn-client

When trying to run I get an exception;

scala> sqlContext.load("jdbc", Map("url" -> "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL"))
java.sql.SQLException: No suitable driver found for jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd

Thinking maybe we need to force load the driver, if I supply "driver" -> "net.sourceforge.jtds.jdbc.Driver" to .load we get;

scala> sqlContext.load("jdbc", Map("url" -> "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "driver" -> "net.sourceforge.jtds.jdbc.Driver", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL"))
java.lang.ClassNotFoundException: net.sourceforge.jtds.jdbc.Driver
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:191)
 at org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:97)
 at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290)
 at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(console:21)

Yet if I run a Class.forName() just from the shell;

scala> Class.forName("net.sourceforge.jtds.jdbc.Driver")
res1: Class[_] = class net.sourceforge.jtds.jdbc.Driver

No problem finding the JAR. I’ve tried in both the shell, and running with spark-submit (packing the driver in with my application as a fat JAR). Nothing seems to work. I can also get a connection in the driver/shell no problem;

scala> import java.sql.DriverManager
import java.sql.DriverManager
scala> DriverManager.getConnection("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")
res3: java.sql.Connection = net.sourceforge.jtds.jdbc.JtdsConnection@2a67ecd0

I’m probably missing some class path setting here. In jdbc.DefaultSource.createRelation it looks like the call to Class.forName doesn’t specify a class loader, so it just uses the default Java behaviour to reflectively get the class loader. It almost feels like it's using a different class loader. I also tried seeing if the class path was there on all my executors by running;

import scala.collection.JavaConverters._
sc.parallelize(Seq(1,2,3,4)).flatMap(_ => java.sql.DriverManager.getDrivers().asScala.map(d => s"$d | ${d.acceptsURL("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")}")).collect().foreach(println)

This successfully returns;

15/04/15 01:07:37 INFO scheduler.DAGScheduler: Job 0 finished: collect at Main.scala:46, took 1.495597 s
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true
org.apache.derby.jdbc.AutoloadedDriver40 | false
com.mysql.jdbc.Driver | false
net.sourceforge.jtds.jdbc.Driver | true

As a final test we tried with the postgres driver and had the same problem. Any ideas? Cheers, Nathan -- Deepak
Parquet Partition Size are different when using Dataframe's save append funciton
Hi, When I use DataFrame’s save append function, I find that the Parquet partition sizes are very different. part-r-00001 to part-r-00021 are generated the first time the save append function is called; part-r-00022 to part-r-00042 are generated the second time. As you can see, the size of part-r-00001 to part-r-00021 is around 200M, while the size of part-r-00022 to part-r-00042 is around 720M. But the source data is the same, which confuses me.

-rw-r--r-- 1 sysplatform sysplatform 2.0K Apr 8 10:01 _common_metadata
-rw-r--r-- 1 sysplatform sysplatform 392K Apr 8 10:01 _metadata
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00001.parquet
-rw-r--r-- 1 sysplatform sysplatform 200M Apr 8 09:44 part-r-00002.parquet
-rw-r--r-- 1 sysplatform sysplatform 200M Apr 8 09:43 part-r-00003.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00004.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00005.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00006.parquet
-rw-r--r-- 1 sysplatform sysplatform 200M Apr 8 09:43 part-r-00007.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00008.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00009.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00010.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00011.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00012.parquet
-rw-r--r-- 1 sysplatform sysplatform 200M Apr 8 09:43 part-r-00013.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00014.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00015.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00016.parquet
-rw-r--r-- 1 sysplatform sysplatform 200M Apr 8 09:43 part-r-00017.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00018.parquet
-rw-r--r-- 1 sysplatform sysplatform 200M Apr 8 09:43 part-r-00019.parquet
-rw-r--r-- 1 sysplatform sysplatform 199M Apr 8 09:43 part-r-00020.parquet
-rw-r--r-- 1 sysplatform sysplatform 200M Apr 8 09:43 part-r-00021.parquet
-rw-r--r-- 1 sysplatform sysplatform 720M Apr 8 10:01 part-r-00022.parquet
-rw-r--r-- 1 sysplatform sysplatform 723M Apr 8 10:00 part-r-00023.parquet
-rw-r--r-- 1 sysplatform sysplatform 721M Apr 8 10:01 part-r-00024.parquet
-rw-r--r-- 1 sysplatform sysplatform 721M Apr 8 10:00 part-r-00025.parquet
-rw-r--r-- 1 sysplatform sysplatform 717M Apr 8 10:00 part-r-00026.parquet
-rw-r--r-- 1 sysplatform sysplatform 721M Apr 8 10:00 part-r-00027.parquet
-rw-r--r-- 1 sysplatform sysplatform 720M Apr 8 10:01 part-r-00028.parquet
-rw-r--r-- 1 sysplatform sysplatform 725M Apr 8 10:01 part-r-00029.parquet
-rw-r--r-- 1 sysplatform sysplatform 720M Apr 8 10:00 part-r-00030.parquet
-rw-r--r-- 1 sysplatform sysplatform 725M Apr 8 10:01 part-r-00031.parquet
-rw-r--r-- 1 sysplatform sysplatform 724M Apr 8 10:01 part-r-00032.parquet
-rw-r--r-- 1 sysplatform sysplatform 724M Apr 8 10:00 part-r-00033.parquet
-rw-r--r-- 1 sysplatform sysplatform 721M Apr 8 10:01 part-r-00034.parquet
-rw-r--r-- 1 sysplatform sysplatform 721M Apr 8 10:01 part-r-00035.parquet
-rw-r--r-- 1 sysplatform sysplatform 720M Apr 8 10:00 part-r-00036.parquet
-rw-r--r-- 1 sysplatform sysplatform 717M Apr 8 10:00 part-r-00037.parquet
-rw-r--r-- 1 sysplatform sysplatform 724M Apr 8 10:01 part-r-00038.parquet
-rw-r--r-- 1 sysplatform sysplatform 722M Apr 8 10:01 part-r-00039.parquet
-rw-r--r-- 1 sysplatform sysplatform 722M Apr 8 10:00 part-r-00040.parquet
-rw-r--r-- 1 sysplatform sysplatform 721M Apr 8 10:01 part-r-00041.parquet
-rw-r--r-- 1 sysplatform sysplatform 723M Apr 8 10:01 part-r-00042.parquet
-rw-r--r-- 1 sysplatform sysplatform 0 Apr 8 10:01 _SUCCESS
Re: Saving RDDs as custom output format
You can try using ORCOutputFormat with yourRDD.saveAsNewAPIHadoopFile. Thanks Best Regards On Tue, Apr 14, 2015 at 9:29 PM, Daniel Haviv daniel.ha...@veracity-group.com wrote: Hi, Is it possible to store RDDs as custom output formats, for example ORC? Thanks, Daniel
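For reference, a minimal sketch of what that call might look like. This is an assumption-laden example, not from the original post: it assumes hive-exec is on the classpath, the OrcNewOutputFormat class name is from Hive 0.13-era APIs and may differ in your version, and the RDD and output path are hypothetical.

```scala
import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.hive.ql.io.orc.OrcNewOutputFormat

// yourRDD must already be an RDD[(NullWritable, Writable)] whose values
// are ORC-serializable Writables (e.g. rows serialized via OrcSerde).
yourRDD.saveAsNewAPIHadoopFile(
  "hdfs:///path/to/orc-output",   // hypothetical output path
  classOf[NullWritable],          // key class
  classOf[Writable],              // value class
  classOf[OrcNewOutputFormat])    // ORC output format from hive-exec
```

This only sketches the wiring; producing the ORC-serializable values is the harder part and depends on your row schema.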
Re: spark streaming printing no output
Yes, only the 'Time: 142905487 ms' strings get printed on the console. No actual output is printed, and the time interval between two such 'Time: ... ms' strings is much less than the streaming duration set in the program. On Wed, Apr 15, 2015 at 5:11 AM, Shixiong Zhu zsxw...@gmail.com wrote: Could you see something like this in the console? --- Time: 142905487 ms --- Best Regards, Shixiong(Ryan) Zhu 2015-04-15 2:11 GMT+08:00 Shushant Arora shushantaror...@gmail.com: Hi I am running a spark streaming application but on console nothing is getting printed. I am doing 1. bin/spark-shell --master clusterMgrUrl 2. import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Seconds val ssc = new StreamingContext( sc, Seconds(1)) val lines = ssc.socketTextStream(hostname,) lines.print() ssc.start() ssc.awaitTermination() Jobs are getting created when I see webUI but nothing gets printed on console. I have started a nc script on hostname port and can see messages typed on this port from another console. Please let me know If I am doing something wrong.
Re: spark streaming printing no output
Just make sure you have at least 2 cores available for processing. You can try launching it in local[2] and make sure it's working fine. Thanks Best Regards On Tue, Apr 14, 2015 at 11:41 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi I am running a spark streaming application but on console nothing is getting printed. I am doing 1. bin/spark-shell --master clusterMgrUrl 2. import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Seconds val ssc = new StreamingContext( sc, Seconds(1)) val lines = ssc.socketTextStream(hostname,) lines.print() ssc.start() ssc.awaitTermination() Jobs are getting created when I see webUI but nothing gets printed on console. I have started a nc script on hostname port and can see messages typed on this port from another console. Please let me know If I am doing something wrong.
RE: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0
Can you provide your Spark version? Thanks, Daoyuan From: Nathan McCarthy [mailto:nathan.mccar...@quantium.com.au] Sent: Wednesday, April 15, 2015 1:57 PM To: Nathan McCarthy; user@spark.apache.org Subject: Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0 Just an update, tried with the old JdbcRDD and that worked fine. From: Nathan nathan.mccar...@quantium.com.au Date: Wednesday, 15 April 2015 1:57 pm To: user@spark.apache.org Subject: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0 Hi guys, Trying to use a Spark SQL context's .load("jdbc", ...) method to create a DataFrame from a JDBC data source. All seems to work well locally (master = local[*]), however as soon as we try and run on YARN we have problems. We seem to be running into problems with the classpath and loading up the JDBC driver. I'm using the jTDS 1.3.1 driver, net.sourceforge.jtds.jdbc.Driver. 
./bin/spark-shell --jars /tmp/jtds-1.3.1.jar --master yarn-client When trying to run I get an exception; scala> sqlContext.load("jdbc", Map("url" -> "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL")) java.sql.SQLException: No suitable driver found for jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd Thinking maybe we need to force load the driver, if I supply "driver" -> "net.sourceforge.jtds.jdbc.Driver" to .load we get; scala> sqlContext.load("jdbc", Map("url" -> "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "driver" -> "net.sourceforge.jtds.jdbc.Driver", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL")) java.lang.ClassNotFoundException: net.sourceforge.jtds.jdbc.Driver at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:97) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290) at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:21) Yet if I run a Class.forName() just from the shell; scala> Class.forName("net.sourceforge.jtds.jdbc.Driver") res1: Class[_] = class net.sourceforge.jtds.jdbc.Driver No problem finding the JAR. I've tried in both the shell, and running with spark-submit (packaging the driver in with my application as a fat JAR). Nothing seems to work. 
I can also get a connection in the driver/shell no problem; scala> import java.sql.DriverManager import java.sql.DriverManager scala> DriverManager.getConnection("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd") res3: java.sql.Connection = net.sourceforge.jtds.jdbc.JtdsConnection@2a67ecd0 I'm probably missing some classpath setting here. In jdbc.DefaultSource.createRelation it looks like the call to Class.forName doesn't specify a class loader, so it just uses the default Java behaviour to reflectively get the class loader. It almost feels like it's using a different class loader. I also tried seeing if the classpath was there on all my executors by running; import scala.collection.JavaConverters._ sc.parallelize(Seq(1,2,3,4)).flatMap(_ => java.sql.DriverManager.getDrivers().asScala.map(d => s"$d | ${d.acceptsURL("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")}")).collect().foreach(println) This successfully returns; 15/04/15 01:07:37 INFO scheduler.DAGScheduler: Job 0 finished: collect at Main.scala:46, took 1.495597 s org.apache.derby.jdbc.AutoloadedDriver40 | false com.mysql.jdbc.Driver | false net.sourceforge.jtds.jdbc.Driver | true org.apache.derby.jdbc.AutoloadedDriver40 | false com.mysql.jdbc.Driver | false net.sourceforge.jtds.jdbc.Driver | true org.apache.derby.jdbc.AutoloadedDriver40 | false com.mysql.jdbc.Driver | false net.sourceforge.jtds.jdbc.Driver | true org.apache.derby.jdbc.AutoloadedDriver40 | false com.mysql.jdbc.Driver | false net.sourceforge.jtds.jdbc.Driver | true As a final test we tried with the postgres driver and had the same problem. Any ideas? Cheers, Nathan
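A workaround worth trying (a sketch under the assumption that the JDBC data source's Class.forName runs in a class loader that doesn't see --jars): put the driver JAR on the driver and executor classpaths explicitly, in addition to shipping it.

```shell
# Hedged workaround: make the jTDS driver visible to the JVM system
# class loader on both the driver and the executors, not just via --jars.
./bin/spark-shell --master yarn-client \
  --driver-class-path /tmp/jtds-1.3.1.jar \
  --conf spark.executor.extraClassPath=/tmp/jtds-1.3.1.jar \
  --jars /tmp/jtds-1.3.1.jar
```

With spark.executor.extraClassPath the path must exist on every node (or be distributed with --jars as above); adjust paths to your environment.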
Re: OOM in SizeEstimator while using combineByKey
What are your JVM heap size settings? The OOM in SizeEstimator is caused by a large number of entries in an IdentityHashMap. A quick guess is that the objects in your dataset are of a custom class and you didn't implement the hashCode and equals methods correctly. On Wednesday, April 15, 2015 at 3:10 PM, Aniket Bhatnagar wrote: I am aggregating a dataset using combineByKey method and for a certain input size, the job fails with the following error. I have enabled heap dumps to better analyze the issue and will report back if I have any findings. Meanwhile, if you guys have any idea of what could possibly result in this error or how to better debug this, please let me know. java.lang.OutOfMemoryError: Java heap space at java.util.IdentityHashMap.resize(IdentityHashMap.java:469) at java.util.IdentityHashMap.put(IdentityHashMap.java:445) at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132) at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178) at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177) at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177) at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161) at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155) at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78) at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105) at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93) at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
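To illustrate the hashCode/equals point from the reply above: in Scala the easiest fix is usually to make the record a case class, which generates consistent structural equals and hashCode. The record type below is a hypothetical example, not taken from the original post.

```scala
// Hypothetical record type used in a combineByKey aggregation.
// A case class gets structural equals/hashCode generated for it:
case class Reading(sensorId: String, value: Double)

// Hand-written equivalent for a plain class; note hashCode is kept
// consistent with equals (equal objects must have equal hash codes):
class Reading2(val sensorId: String, val value: Double) {
  override def equals(other: Any): Boolean = other match {
    case r: Reading2 => r.sensorId == sensorId && r.value == value
    case _           => false
  }
  override def hashCode: Int = 31 * sensorId.hashCode + value.hashCode
}
```

An inconsistent pair can make hash-based collections behave pathologically; it does not by itself explain the SizeEstimator OOM, which is why the reply frames it as a guess.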
Re: spark streaming printing no output
So the time interval is much less than the 1000 ms you set in the code? That's weird. Could you check the whole output to confirm that the content isn't being flushed away by logs? Best Regards, Shixiong(Ryan) Zhu 2015-04-15 15:04 GMT+08:00 Shushant Arora shushantaror...@gmail.com: Yes only Time: 142905487 ms strings gets printed on console. No output is getting printed. And timeinterval between two strings of form ( time:ms)is very less than Streaming Duration set in program. On Wed, Apr 15, 2015 at 5:11 AM, Shixiong Zhu zsxw...@gmail.com wrote: Could you see something like this in the console? --- Time: 142905487 ms --- Best Regards, Shixiong(Ryan) Zhu 2015-04-15 2:11 GMT+08:00 Shushant Arora shushantaror...@gmail.com: Hi I am running a spark streaming application but on console nothing is getting printed. I am doing 1.bin/spark-shell --master clusterMgrUrl 2.import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Seconds val ssc = new StreamingContext( sc, Seconds(1)) val lines = ssc.socketTextStream(hostname,) lines.print() ssc.start() ssc.awaitTermination() Jobs are getting created when I see webUI but nothing gets printed on console. I have started a nc script on hostname port and can see messages typed on this port from another console. Please let me know If I am doing something wrong.
Re: Running Spark on Gateway - Connecting to Resource Manager Retries
Make sure your YARN service is running on port 8032. Thanks Best Regards On Tue, Apr 14, 2015 at 12:35 PM, Vineet Mishra clearmido...@gmail.com wrote: Hi Team, I am running the Spark word count example ( https://github.com/sryza/simplesparkapp ); if I go with master as local it works fine. But when I change the master to yarn it ends with retries connecting to the resource manager (stack trace mentioned below), 15/04/14 11:31:57 INFO RMProxy: Connecting to ResourceManager at / 0.0.0.0:8032 15/04/14 11:31:58 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/14 11:31:59 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) If I run the same command from the Namenode instance it ends with an ArrayIndexOutOfBoundsException (stack trace mentioned below), 15/04/14 11:38:44 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8 Exception in thread main java.lang.ArrayIndexOutOfBoundsException: 1 at com.cloudera.sparkwordcount.SparkWordCount$.main(SparkWordCount.scala:28) at com.cloudera.sparkwordcount.SparkWordCount.main(SparkWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Looking forward to getting this resolved so it works on the respective nodes. Thanks,
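For what it's worth, a ResourceManager address of 0.0.0.0:8032 usually means the Spark client never read the cluster's yarn-site.xml and fell back to the default RM address. A sketch of the usual fix on a gateway node (the config path is an example; adjust to your installation):

```shell
# Point the Spark client at the cluster's Hadoop/YARN configuration
# before launching, so it picks up yarn.resourcemanager.address.
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
./bin/spark-shell --master yarn-client
```

If the variables are already set, check that the directory they point at actually contains the cluster's yarn-site.xml rather than an empty default.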
OOM in SizeEstimator while using combineByKey
I am aggregating a dataset using combineByKey method and for a certain input size, the job fails with the following error. I have enabled heap dumps to better analyze the issue and will report back if I have any findings. Meanwhile, if you guys have any idea of what could possibly result in this error or how to better debug this, please let me know. java.lang.OutOfMemoryError: Java heap space at java.util.IdentityHashMap.resize(IdentityHashMap.java:469) at java.util.IdentityHashMap.put(IdentityHashMap.java:445) at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132) at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178) at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177) at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177) at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161) at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155) at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78) at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105) at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
Do multiple ipython notebooks work on yarn in one cluster?
My colleagues and I have been working with Spark recently. We just set up a new cluster on YARN on which we can run Spark. We basically use IPython and write programs in notebooks on a specific port (like ) via HTTP. We each have our own notebooks, and the odd thing is that if I run my notebook first, my colleagues' notebooks get stuck and stop there whenever there is an action; and if one of them runs first, mine gets stuck. Does one run of pyspark only support one notebook at a time? Or should we figure out some configuration to make it work? @Nam has the same problem as me: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-accesses-to-a-Spark-cluster-via-iPython-Notebook-td12162.html Thanks Ai
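One possible explanation, offered as a guess rather than a confirmed diagnosis: with the default scheduling in this era, the first SparkContext grabs all available executors, so a second notebook's jobs sit pending until the first releases resources. Capping each notebook's share lets several run concurrently; the values below are illustrative.

```shell
# Spark 1.x-era way to start a pyspark-backed IPython notebook with a
# bounded share of the YARN cluster, leaving containers for other users.
IPYTHON_OPTS="notebook" ./bin/pyspark --master yarn-client \
  --num-executors 2 \
  --executor-cores 2 \
  --executor-memory 2g
```

Each notebook server still gets its own SparkContext; the caps just stop one context from monopolizing the cluster.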
Re: Running Spark on Gateway - Connecting to Resource Manager Retries
Hi Akhil, It's running fine when run through the Namenode (RM) but fails while running through the gateway; if I add the hadoop-core jars to the hadoop directory (/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hadoop/) it works fine. It's really strange: I am running the job through spark-submit, and running via the NameNode works fine while it fails when running through the gateway, even though both have the same classpath. Has anyone tried running Spark from a gateway? Looking forward to a quick reply! Thanks, On Wed, Apr 15, 2015 at 12:07 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Make sure your yarn service is running on 8032. Thanks Best Regards On Tue, Apr 14, 2015 at 12:35 PM, Vineet Mishra clearmido...@gmail.com wrote: Hi Team, I am running Spark Word Count example( https://github.com/sryza/simplesparkapp), if I go with master as local it works fine. But when I change the master to yarn its end with retries connecting to resource manager(stack trace mentioned below), 15/04/14 11:31:57 INFO RMProxy: Connecting to ResourceManager at / 0.0.0.0:8032 15/04/14 11:31:58 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) 15/04/14 11:31:59 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. 
Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS) If I run the same command from Namenode instance it ends with ArrayOutofBoundException(Stack trace mentioned below), 15/04/14 11:38:44 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8 Exception in thread main java.lang.ArrayIndexOutOfBoundsException: 1 at com.cloudera.sparkwordcount.SparkWordCount$.main(SparkWordCount.scala:28) at com.cloudera.sparkwordcount.SparkWordCount.main(SparkWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Looking forward to get it resolve to work on respective nodes. Thanks,
Re: spark streaming printing no output
When I launched spark-shell using spark-shell --master local[2], same behaviour: no output on the console, only timestamps. When I did lines.saveAsTextFiles(hdfslocation,suffix); I get empty files of 0 bytes on HDFS. On Wed, Apr 15, 2015 at 12:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Just make sure you have atleast 2 cores available for processing. You can try launching it in local[2] and make sure its working fine. Thanks Best Regards On Tue, Apr 14, 2015 at 11:41 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi I am running a spark streaming application but on console nothing is getting printed. I am doing 1.bin/spark-shell --master clusterMgrUrl 2.import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Seconds val ssc = new StreamingContext( sc, Seconds(1)) val lines = ssc.socketTextStream(hostname,) lines.print() ssc.start() ssc.awaitTermination() Jobs are getting created when I see webUI but nothing gets printed on console. I have started a nc script on hostname port and can see messages typed on this port from another console. Please let me know If I am doing something wrong.
Re: Re: spark streaming printing no output
Looks like the message is consumed by the other console? (You said you "can see messages typed on this port from another console.") bit1...@163.com From: Shushant Arora Date: 2015-04-15 17:11 To: Akhil Das CC: user@spark.apache.org Subject: Re: spark streaming printing no output When I launched spark-shell using, spark-shell ---master local[2]. Same behaviour, no output on console but only timestamps. When I did, lines.saveAsTextFiles(hdfslocation,suffix); I get empty files of 0 bytes on hdfs On Wed, Apr 15, 2015 at 12:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Just make sure you have atleast 2 cores available for processing. You can try launching it in local[2] and make sure its working fine. Thanks Best Regards On Tue, Apr 14, 2015 at 11:41 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi I am running a spark streaming application but on console nothing is getting printed. I am doing 1.bin/spark-shell --master clusterMgrUrl 2.import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Seconds val ssc = new StreamingContext( sc, Seconds(1)) val lines = ssc.socketTextStream(hostname,) lines.print() ssc.start() ssc.awaitTermination() Jobs are getting created when I see webUI but nothing gets printed on console. I have started a nc script on hostname port and can see messages typed on this port from another console. Please let me know If I am doing something wrong.
Re: Re: spark streaming printing no output
It's printing on the console, but on HDFS all the folders are still empty. On Wed, Apr 15, 2015 at 2:29 PM, Shushant Arora shushantaror...@gmail.com wrote: Thanks !! Yes, messages typed on this console are seen on another console. When I closed the other console, the spark streaming job started printing messages on the console. Isn't a message written to a port using netcat available to multiple consumers? On Wed, Apr 15, 2015 at 2:22 PM, bit1...@163.com bit1...@163.com wrote: Looks the message is consumed by the another console?( can see messages typed on this port from another console.) -- bit1...@163.com *From:* Shushant Arora shushantaror...@gmail.com *Date:* 2015-04-15 17:11 *To:* Akhil Das ak...@sigmoidanalytics.com *CC:* user@spark.apache.org *Subject:* Re: spark streaming printing no output When I launched spark-shell using, spark-shell ---master local[2]. Same behaviour, no output on console but only timestamps. When I did, lines.saveAsTextFiles(hdfslocation,suffix); I get empty files of 0 bytes on hdfs On Wed, Apr 15, 2015 at 12:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Just make sure you have atleast 2 cores available for processing. You can try launching it in local[2] and make sure its working fine. Thanks Best Regards On Tue, Apr 14, 2015 at 11:41 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi I am running a spark streaming application but on console nothing is getting printed. I am doing 1.bin/spark-shell --master clusterMgrUrl 2.import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Seconds val ssc = new StreamingContext( sc, Seconds(1)) val lines = ssc.socketTextStream(hostname,) lines.print() ssc.start() ssc.awaitTermination() Jobs are getting created when I see webUI but nothing gets printed on console. 
I have started a nc script on hostname port and can see messages typed on this port from another console. Please let me know If I am doing something wrong.
RE: Running beyond physical memory limits
Thanks a lot for your reply. There was no issue with Spark 1.1; the following issue came when I upgraded to Spark 1.2, hence I did not decrease spark.executor.memory. I mean to say, I used the same config for Spark 1.1 and Spark 1.2. Is there an issue with Spark 1.2, or does YARN cause this? And why would the executor not release memory if there are tasks running? Thanks Regards Brahma Reddy Battula From: Akhil Das [ak...@sigmoidanalytics.com] Sent: Wednesday, April 15, 2015 2:35 PM To: Brahma Reddy Battula Cc: user@spark.apache.org Subject: Re: Running beyond physical memory limits Did you try reducing your spark.executor.memory? Thanks Best Regards On Wed, Apr 15, 2015 at 2:29 PM, Brahma Reddy Battula brahmareddy.batt...@huawei.com wrote: Hello Sparkers I am newbie to spark and need help.. We are using spark 1.2, we are getting the following error and executor is getting killed..I seen SPARK-1930 and it should be in 1.2.. Any pointer to following error, like what might lead this error.. 2015-04-15 11:55:39,697 | WARN | Container Monitor | Container [pid=126843,containerID=container_1429065217137_0012_01_-411041790] is running beyond physical memory limits. Current usage: 26.0 GB of 26 GB physical memory used; 26.7 GB of 260 GB virtual memory used. Killing container. 
Dump of the process-tree for container_1429065217137_0012_01_-411041790 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 126872 126843 126843 126843 (java) 2049457 22816 28673892352 6824864 /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms24576m -Xmx24576m -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp -Dspark.driver.port=23204 -Dspark.random.port.max=23999 -Dspark.akka.threads=32 -Dspark.akka.frameSize=10 -Dspark.akka.timeout=100 -Dspark.ui.port=23000 -Dspark.random.port.min=23000 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedSchedulerhttp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 hadoopc1h11 10 application_1429065217137_0012 |- 126843 76960 126843 126843 (bash) 0 0 11603968 331 /bin/bash -c /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms24576m -Xmx24576m -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp '-Dspark.driver.port=23204' '-Dspark.random.port.max=23999' '-Dspark.akka.threads=32' 
'-Dspark.akka.frameSize=10' '-Dspark.akka.timeout=100' '-Dspark.ui.port=23000' '-Dspark.random.port.min=23000' -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedSchedulerhttp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 hadoopc1h11 10 application_1429065217137_0012 1 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stdout 2 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stderr | org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl$MonitoringThread.run(ContainersMonitorImpl.java:447) And some doubts 1) why executor will not release memory, if there are tasks running..? 2) is there issue from hadoop which will lead this error..? Any help , will be appreciated... Thanks Regards Brahma Reddy Battula
Re: Running beyond physical memory limits
This is not related to executor memory, but the extra overhead subtracted from the executor's size in order to avoid using more than the physical memory that YARN allows. That is, if you declare a 32G executor YARN lets you use 32G physical memory but your JVM heap must be significantly less than 32G max. This is the overhead factor that is subtracted for you, and it seems to need to be bigger in your case. On Wed, Apr 15, 2015 at 10:16 AM, Brahma Reddy Battula brahmareddy.batt...@huawei.com wrote: Thanks lot for your reply.. There is no issue with spark1.1..Following issue came when I upgrade to spark2.0...Hence I did not decrease spark.executor.memory... I mean to say, used same config for spark1.1 and spark1.2.. Is there any issue with spark1.2..? Or Yarn will lead this..? And why executor will not release memory, if there are tasks running..? Thanks Regards Brahma Reddy Battula From: Akhil Das [ak...@sigmoidanalytics.com] Sent: Wednesday, April 15, 2015 2:35 PM To: Brahma Reddy Battula Cc: user@spark.apache.org Subject: Re: Running beyond physical memory limits Did you try reducing your spark.executor.memory? Thanks Best Regards On Wed, Apr 15, 2015 at 2:29 PM, Brahma Reddy Battula brahmareddy.batt...@huawei.com wrote: Hello Sparkers I am newbie to spark and need help.. We are using spark 1.2, we are getting the following error and executor is getting killed..I seen SPARK-1930 and it should be in 1.2.. Any pointer to following error, like what might lead this error.. 2015-04-15 11:55:39,697 | WARN | Container Monitor | Container [pid=126843,containerID=container_1429065217137_0012_01_-411041790] is running beyond physical memory limits. Current usage: 26.0 GB of 26 GB physical memory used; 26.7 GB of 260 GB virtual memory used. Killing container. 
Dump of the process-tree for container_1429065217137_0012_01_-411041790 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 126872 126843 126843 126843 (java) 2049457 22816 28673892352 6824864 /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms24576m -Xmx24576m -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp -Dspark.driver.port=23204 -Dspark.random.port.max=23999 -Dspark.akka.threads=32 -Dspark.akka.frameSize=10 -Dspark.akka.timeout=100 -Dspark.ui.port=23000 -Dspark.random.port.min=23000 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 hadoopc1h11 10 application_1429065217137_0012 |- 126843 76960 126843 126843 (bash) 0 0 11603968 331 /bin/bash -c /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms24576m -Xmx24576m -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp '-Dspark.driver.port=23204' '-Dspark.random.port.max=23999' '-Dspark.akka.threads=32' '-Dspark.akka.frameSize=10' '-Dspark.akka.timeout=100' 
'-Dspark.ui.port=23000' '-Dspark.random.port.min=23000' -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 hadoopc1h11 10 application_1429065217137_0012 1 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stdout 2 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stderr | org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl$MonitoringThread.run(ContainersMonitorImpl.java:447) And some doubts 1) why executor will not release memory, if there are tasks running..? 2) is there issue from hadoop which will lead this error..? Any help , will be appreciated... Thanks Regards Brahma Reddy Battula - To
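The fix Sean describes can be sketched as a configuration change. This is a hedged sketch for Spark 1.x on YARN; the values are illustrative assumptions, not a tuned recommendation for this cluster:

```scala
import org.apache.spark.SparkConf

// Hedged sketch: keep headroom between the JVM heap and the YARN container
// limit by raising the per-executor overhead. The container size YARN
// enforces is roughly executor memory + memoryOverhead, so the heap must
// stay well under the container limit. Values below are illustrative.
val conf = new SparkConf()
  .set("spark.executor.memory", "22g")               // JVM heap (-Xms/-Xmx)
  .set("spark.yarn.executor.memoryOverhead", "4096") // MB of off-heap headroom
```

Without the overhead setting, off-heap usage (thread stacks, NIO buffers, JVM metadata) on top of a near-container-sized heap is what trips the "running beyond physical memory limits" kill.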
Running beyond physical memory limits
Hello Sparkers I am newbie to spark and need help.. We are using spark 1.2, we are getting the following error and executor is getting killed..I seen SPARK-1930 and it should be in 1.2.. Any pointer to following error, like what might lead this error.. 2015-04-15 11:55:39,697 | WARN | Container Monitor | Container [pid=126843,containerID=container_1429065217137_0012_01_-411041790] is running beyond physical memory limits. Current usage: 26.0 GB of 26 GB physical memory used; 26.7 GB of 260 GB virtual memory used. Killing container. Dump of the process-tree for container_1429065217137_0012_01_-411041790 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 126872 126843 126843 126843 (java) 2049457 22816 28673892352 6824864 /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms24576m -Xmx24576m -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp -Dspark.driver.port=23204 -Dspark.random.port.max=23999 -Dspark.akka.threads=32 -Dspark.akka.frameSize=10 -Dspark.akka.timeout=100 -Dspark.ui.port=23000 -Dspark.random.port.min=23000 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 hadoopc1h11 10 application_1429065217137_0012 |- 126843 76960 126843 126843 (bash) 0 0 11603968 331 /bin/bash -c /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms24576m -Xmx24576m 
-Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp '-Dspark.driver.port=23204' '-Dspark.random.port.max=23999' '-Dspark.akka.threads=32' '-Dspark.akka.frameSize=10' '-Dspark.akka.timeout=100' '-Dspark.ui.port=23000' '-Dspark.random.port.min=23000' -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 hadoopc1h11 10 application_1429065217137_0012 1 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stdout 2 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stderr | org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl$MonitoringThread.run(ContainersMonitorImpl.java:447) And some doubts 1) why executor will not release memory, if there are tasks running..? 2) is there issue from hadoop which will lead this error..? Any help , will be appreciated... Thanks Regards Brahma Reddy Battula
Re: Re: spark streaming printing no output
Thanks!! Yes, messages typed on this console are seen on another console. When I closed the other console, the Spark streaming job started printing messages on the console. Isn't a message written to a port using netcat available to multiple consumers? On Wed, Apr 15, 2015 at 2:22 PM, bit1...@163.com bit1...@163.com wrote: Looks like the message is consumed by the other console? (can see messages typed on this port from another console.) -- bit1...@163.com *From:* Shushant Arora shushantaror...@gmail.com *Date:* 2015-04-15 17:11 *To:* Akhil Das ak...@sigmoidanalytics.com *CC:* user@spark.apache.org *Subject:* Re: spark streaming printing no output When I launched spark-shell using spark-shell --master local[2], same behaviour: no output on the console, only timestamps. When I did lines.saveAsTextFiles(hdfslocation,suffix); I get empty files of 0 bytes on HDFS. On Wed, Apr 15, 2015 at 12:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Just make sure you have at least 2 cores available for processing. You can try launching it in local[2] and make sure it's working fine. Thanks Best Regards On Tue, Apr 14, 2015 at 11:41 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi I am running a spark streaming application but on the console nothing is getting printed. I am doing 1. bin/spark-shell --master clusterMgrUrl 2. import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Seconds val ssc = new StreamingContext( sc, Seconds(1)) val lines = ssc.socketTextStream(hostname,) lines.print() ssc.start() ssc.awaitTermination() Jobs are getting created when I see the webUI, but nothing gets printed on the console. I have started an nc script on hostname port and can see messages typed on this port from another console. Please let me know if I am doing something wrong.
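Akhil's suggestion, assembled into a self-contained sketch. Hostname, port, and app name are placeholders; this assumes the Spark 1.2-era streaming API and a running `nc -lk 9999` feeding the socket:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// local[2]: one thread runs the socket receiver, the other processes
// batches. With local[1] the receiver occupies the only core and
// print() shows nothing but batch timestamps -- the symptom above.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetcatPrint")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999) // placeholder host/port
lines.print()
ssc.start()
ssc.awaitTermination()
```

Note also that a plain `nc -lk` listener hands each line to a single connected client, so a second console consuming the stream can starve the Spark receiver, which matches the behaviour reported in this thread.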
Re: Running beyond physical memory limits
Did you try reducing your spark.executor.memory? Thanks Best Regards On Wed, Apr 15, 2015 at 2:29 PM, Brahma Reddy Battula brahmareddy.batt...@huawei.com wrote: Hello Sparkers I am newbie to spark and need help.. We are using spark 1.2, we are getting the following error and executor is getting killed..I seen SPARK-1930 and it should be in 1.2.. *Any pointer to following error, like what might lead this error.*. 2015-04-15 11:55:39,697 | WARN | Container Monitor | Container [pid=126843,containerID=container_1429065217137_0012_01_-411041790] is running beyond physical memory limits. Current usage: 26.0 GB of 26 GB physical memory used; 26.7 GB of 260 GB virtual memory used. Killing container. Dump of the process-tree for container_1429065217137_0012_01_-411041790 :|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 126872 126843 126843 126843 (java) 2049457 22816 28673892352 6824864 /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms24576m -Xmx24576m -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp -Dspark.driver.port=23204 -Dspark.random.port.max=23999 -Dspark.akka.threads=32 -Dspark.akka.frameSize=10 -Dspark.akka.timeout=100 -Dspark.ui.port=23000 -Dspark.random.port.min=23000 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp:// sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 hadoopc1h11 10 application_1429065217137_0012 |- 126843 76960 
126843 126843 (bash) 0 0 11603968 331 /bin/bash -c /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms24576m -Xmx24576m -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp '-Dspark.driver.port=23204' '-Dspark.random.port.max=23999' '-Dspark.akka.threads=32' '-Dspark.akka.frameSize=10' '-Dspark.akka.timeout=100' '-Dspark.ui.port=23000' '-Dspark.random.port.min=23000' -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp:// sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 hadoopc1h11 10 application_1429065217137_0012 1 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stdout 2 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stderr | org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl$MonitoringThread.run(ContainersMonitorImpl.java:447) And some doubts 1) why executor will not release memory, if there are tasks running..? 2) is there issue from hadoop which will lead this error..? Any help , will be appreciated... Thanks Regards Brahma Reddy Battula
Re: Running beyond physical memory limits
All this means is that your JVM is using more memory than it requested from YARN. You need to increase the YARN memory overhead setting, perhaps. On Wed, Apr 15, 2015 at 9:59 AM, Brahma Reddy Battula brahmareddy.batt...@huawei.com wrote: Hello Sparkers I am newbie to spark and need help.. We are using spark 1.2, we are getting the following error and executor is getting killed..I seen SPARK-1930 and it should be in 1.2.. Any pointer to following error, like what might lead this error.. 2015-04-15 11:55:39,697 | WARN | Container Monitor | Container [pid=126843,containerID=container_1429065217137_0012_01_-411041790] is running beyond physical memory limits. Current usage: 26.0 GB of 26 GB physical memory used; 26.7 GB of 260 GB virtual memory used. Killing container. Dump of the process-tree for container_1429065217137_0012_01_-411041790 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 126872 126843 126843 126843 (java) 2049457 22816 28673892352 6824864 /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms24576m -Xmx24576m -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp -Dspark.driver.port=23204 -Dspark.random.port.max=23999 -Dspark.akka.threads=32 -Dspark.akka.frameSize=10 -Dspark.akka.timeout=100 -Dspark.ui.port=23000 -Dspark.random.port.min=23000 -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790 org.apache.spark.executor.CoarseGrainedExecutorBackend 
akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 hadoopc1h11 10 application_1429065217137_0012 |- 126843 76960 126843 126843 (bash) 0 0 11603968 331 /bin/bash -c /opt/huawei/Bigdata/jdk1.7.0_76//bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms24576m -Xmx24576m -Dlog4j.configuration=file:/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Spark/spark/conf/log4j-executor.properties -Djava.library.path=/opt/huawei/Bigdata/DataSight_FM_BasePlatform_V100R001C00_Hadoop//hadoop/lib/native -Djava.io.tmpdir=/srv/BigData/hadoop/data4/nm/localdir/usercache/ossuser/appcache/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/tmp '-Dspark.driver.port=23204' '-Dspark.random.port.max=23999' '-Dspark.akka.threads=32' '-Dspark.akka.frameSize=10' '-Dspark.akka.timeout=100' '-Dspark.ui.port=23000' '-Dspark.random.port.min=23000' -Dspark.yarn.app.container.log.dir=/srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@172.57.1.61:23204/user/CoarseGrainedScheduler 3 hadoopc1h11 10 application_1429065217137_0012 1 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stdout 2 /srv/BigData/hadoop/data5/nm/containerlogs/application_1429065217137_0012/container_1429065217137_0012_01_-411041790/stderr | org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl$MonitoringThread.run(ContainersMonitorImpl.java:447) And some doubts 1) why executor will not release memory, if there are tasks running..? 2) is there issue from hadoop which will lead this error..? Any help , will be appreciated... Thanks Regards Brahma Reddy Battula - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark streaming with kafka
Once you start your streaming application to read from Kafka, it will launch receivers on the executor nodes. And you can see them on the streaming tab of your driver UI (runs on 4040). [image: Inline image 1] These receivers will be fixed till the end of your pipeline (unless it's crashed etc.). You can say each receiver will run on a single core. Thanks Best Regards On Wed, Apr 15, 2015 at 3:46 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi I want to understand the flow of spark streaming with kafka. In spark Streaming is the executor nodes at each run of streaming interval same or At each stream interval cluster manager assigns new executor nodes for processing this batch input. If yes then at each batch interval new executors register themselves as kafka consumers? Even without kafka is executor nodes on each batch interval same or driver nodes gets new executor nodes from cluster manager ? Thanks Shushant
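A minimal sketch of the receiver-based Kafka stream described above. The ZooKeeper quorum, consumer group, and topic map are placeholder assumptions, and this presumes the `spark-streaming-kafka` artifact matching the Spark 1.2 line:

```scala
import org.apache.spark.streaming.kafka.KafkaUtils

// One createStream call launches one long-running receiver. The receiver
// is scheduled once onto an executor and pins a core there for the life
// of the application; the per-interval batch jobs then process the
// blocks the receiver has already stored.
val kafkaStream = KafkaUtils.createStream(
  ssc,                     // an existing StreamingContext
  "zkhost:2181",           // placeholder ZooKeeper quorum
  "my-consumer-group",     // placeholder consumer group id
  Map("mytopic" -> 1))     // topic -> number of receiver threads
```

This is why the receivers stay on the same executors across intervals: they are long-lived tasks, not re-launched per batch.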
Re: Opening many Parquet files = slow
Hi guys, Regarding parquet files: I have Spark 1.2.0, and reading 27 parquet files (250MB/file) takes 4 minutes. I have a cluster with 4 nodes and that seems too slow to me. The load function is not available in Spark 1.2, so I can't test it. Regards. Miguel. On Mon, Apr 13, 2015 at 8:12 PM, Eric Eijkelenboom eric.eijkelenb...@gmail.com wrote: Hi guys Does anyone know how to stop Spark from opening all Parquet files before starting a job? This is quite a show stopper for me, since I have 5000 Parquet files on S3. Recap of what I tried: 1. Disable schema merging with: sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "s3://path/to/folder")) This opens most files in the folder (17 out of 21 in my small example). For 5000 files on S3, sqlContext.load() takes 30 minutes to complete. 2. Use the old API with: sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false") Now sqlContext.parquetFile() only opens a few files and prints the schema: so far so good! However, as soon as I run e.g. a count() on the dataframe, Spark still opens all files _before_ starting a job/stage. Effectively this moves the delay from load() to count() (or any other action I presume). 3. Run Spark 1.3.1-rc2. sqlContext.load() took about 30 minutes for 5000 Parquet files on S3, the same as 1.3.0. Any help would be greatly appreciated! Thanks a lot. 
Eric On 10 Apr 2015, at 16:46, Eric Eijkelenboom eric.eijkelenb...@gmail.com wrote: Hi Ted Ah, I guess the term 'source' confused me :) Doing: sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "path to a single day of logs")) for 1 directory with 21 files, Spark opens 17 files: 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for reading 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for reading at position '261573524' 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for reading 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for reading 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for reading 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for reading at position '259256807' 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for reading at position '260002042' 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for reading at position '260875275' etc. I can't seem to pass a comma-separated list of directories to load(), so in order to load multiple days of logs, I have to point to the root folder and depend on auto-partition discovery (unless there's a smarter way). Doing: sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "path to root log dir")) starts opening what seems like all files (I killed the process after a couple of minutes). Thanks for helping out. Eric -- Saludos. Miguel Ángel
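For reference, the two approaches discussed in this thread with the quoting repaired, as a hedged sketch. The paths are placeholders, and the multi-path `parquetFile` varargs call is an assumption about the 1.3-era API rather than something verified against this cluster:

```scala
// Data source API (Spark 1.3): disable schema merging so Spark does not
// need every file's footer, though as reported above it may still open
// most files during load(). Paths below are placeholders.
val df = sqlContext.load("parquet",
  Map("mergeSchema" -> "false", "path" -> "s3n://bucket/logs/"))

// Older API: parquetFile takes paths directly, which may allow pointing
// at specific day directories instead of scanning the whole root folder
// for partition discovery (assumes a varargs signature in this release).
val days = sqlContext.parquetFile(
  "s3n://bucket/logs/day1", "s3n://bucket/logs/day2")
```

The trade-off reported above still holds: with the data source API the footer-reading cost lands in `load()`, and with the old API it is deferred to the first action.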
Re: spark streaming with kafka
So the receivers will be fixed for every run of the streaming-interval job. Say I have set the stream duration to 10 minutes; then after every 10 minutes a job will be created, and the same executor nodes, say in your case (spark-akhil-slave2.c.neat-axis-616.internal and spark-akhil-slave1.c.neat-axis-616.internal), will be used? Is it the case with all streaming applications that the executor nodes are fixed and won't change depending on the load of the current batch, or is it for the Kafka case only? On Wed, Apr 15, 2015 at 4:12 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Once you start your streaming application to read from Kafka, it will launch receivers on the executor nodes. And you can see them on the streaming tab of your driver ui (runs on 4040). [image: Inline image 1] These receivers will be fixed till the end of your pipeline (unless its crashed etc.) You can say, eah receiver will run on a single core. Thanks Best Regards On Wed, Apr 15, 2015 at 3:46 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi I want to understand the flow of spark streaming with kafka. In spark Streaming is the executor nodes at each run of streaming interval same or At each stream interval cluster manager assigns new executor nodes for processing this batch input. If yes then at each batch interval new executors register themselves as kafka consumers? Even without kafka is executor nodes on each batch interval same or driver nodes gets new executor nodes from cluster manager ? Thanks Shushant
Re: OOM in SizeEstimator while using combineByKey
I am setting spark.executor.memory to 1024m on a 3-node cluster with each node having 4 cores and 7 GB RAM. The combiner functions take Scala case classes as input and generate a mutable.ListBuffer of Scala case classes. Therefore, I am guessing hashCode and equals should be taken care of. Thanks, Aniket On Wed, Apr 15, 2015 at 1:00 PM Xianjin YE advance...@gmail.com wrote: What are your JVM heap size settings? The OOM in SizeEstimator is caused by a lot of entries in an IdentityHashMap. A quick guess is that the objects in your dataset are of a custom class and you didn't implement the hashCode and equals methods correctly. On Wednesday, April 15, 2015 at 3:10 PM, Aniket Bhatnagar wrote: I am aggregating a dataset using the combineByKey method, and for a certain input size the job fails with the following error. I have enabled heap dumps to better analyze the issue and will report back if I have any findings. Meanwhile, if you guys have any idea of what could possibly result in this error or how to better debug this, please let me know. 
java.lang.OutOfMemoryError: Java heap space at java.util.IdentityHashMap.resize(IdentityHashMap.java:469) at java.util.IdentityHashMap.put(IdentityHashMap.java:445) at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132) at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178) at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177) at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177) at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161) at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155) at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78) at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105) at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
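The `hashCode`/`equals` point raised in this thread can be checked in plain Scala: case classes get structural equality generated for them, while ordinary classes default to reference identity and would behave as distinct keys in a hash-based aggregation. The class names below are made up for illustration:

```scala
// Case classes compile with structural equals/hashCode, so two equal
// values always land in the same combiner bucket.
case class EventKey(userId: Long, page: String)

// A plain class defaults to reference equality: two identical-looking
// keys are treated as distinct by hash-based collections.
class RawKey(val userId: Long, val page: String)

val a = EventKey(42L, "home")
val b = EventKey(42L, "home")
println(a == b && a.hashCode == b.hashCode) // true

val p = new RawKey(42L, "home")
val q = new RawKey(42L, "home")
println(p == q)                             // false
```

Since Aniket's combiners use case classes, key equality is indeed covered, which points the OOM back at heap sizing rather than broken key semantics.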
spark streaming with kafka
Hi, I want to understand the flow of Spark Streaming with Kafka. In Spark Streaming, are the executor nodes the same at each run of the streaming interval, or does the cluster manager assign new executor nodes for processing each batch's input? If the latter, do new executors register themselves as Kafka consumers at each batch interval? Even without Kafka, are the executor nodes the same on each batch interval, or does the driver get new executor nodes from the cluster manager? Thanks Shushant
Re: Re: spark streaming with kafka
@Shushant: In my case, the receivers will be fixed till the end of the application. This one's for Kafka case only, if you have a filestream application, you will not have any receivers. Also, for kafka, next time you run the application, it's not fixed that the receivers will get launched on the same machines. @bit1129, If the receiver is crashed, it will try to restart the receiver, since i have only 2 machines on that cluster, it will either restart on the same node or on the other node. Thanks Best Regards On Wed, Apr 15, 2015 at 4:30 PM, bit1...@163.com bit1...@163.com wrote: Hi, Akhil, I would ask a question here: Assume Receiver-0 is crashed, will it be restarted on other worker nodes(In your picture, there would be 2 receivers on the same node) or will it start on the same node? -- bit1...@163.com *From:* Akhil Das ak...@sigmoidanalytics.com *Date:* 2015-04-15 19:12 *To:* Shushant Arora shushantaror...@gmail.com *CC:* user user@spark.apache.org *Subject:* Re: spark streaming with kafka Once you start your streaming application to read from Kafka, it will launch receivers on the executor nodes. And you can see them on the streaming tab of your driver ui (runs on 4040). [image: Inline image 1] These receivers will be fixed till the end of your pipeline (unless its crashed etc.) You can say, eah receiver will run on a single core. Thanks Best Regards On Wed, Apr 15, 2015 at 3:46 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi I want to understand the flow of spark streaming with kafka. In spark Streaming is the executor nodes at each run of streaming interval same or At each stream interval cluster manager assigns new executor nodes for processing this batch input. If yes then at each batch interval new executors register themselves as kafka consumers? Even without kafka is executor nodes on each batch interval same or driver nodes gets new executor nodes from cluster manager ? 
Thanks Shushant