Add to the spark users list
Hi,

Organization name: Amrita Center for Cyber Security Systems and Networks
URL: https://www.amrita.edu/center/cyber-security

We use Spark for big data analytics and ML/data mining, and Spark Streaming in our IoT platform.

Regards,
Bilna P
Re: Guava 11 dependency issue in Spark 1.2.0
-dev

Guava was not downgraded to 11. That PR was not merged; it was part of a discussion about, indeed, what to do about potential Guava version conflicts. Spark uses Guava, but so does Hadoop, and so do user programs. Spark in fact uses 14.0.1: https://github.com/apache/spark/blob/master/pom.xml#L330

This is a symptom of a conflict between Spark's Guava 14 and Hadoop's Guava 11. See for example https://issues.apache.org/jira/browse/HIVE-7387 as well. Guava is now shaded in Spark as of 1.2.0 (and 1.1.x?), so I would think a lot of these problems are solved. As we've seen, though, this one is tricky.

What's your Spark version? What are you executing? What mode -- standalone or YARN? What Hadoop version?

On Tue, Jan 6, 2015 at 8:38 AM, Niranda Perera niranda.per...@gmail.com wrote:
[quoted message and stack trace trimmed; see the original post later in this thread]
Re: Guava 11 dependency issue in Spark 1.2.0
Hi Sean,

My mistake, the Guava 11 dependency indeed came from hadoop-common. I'm running the following simple app on a Spark 1.2.0 standalone local cluster (2 workers) with Hadoop 1.2.1:

public class AvroSparkTest {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf()
                .setMaster("spark://niranda-ThinkPad-T540p:7077") // ("local[2]")
                .setAppName("avro-spark-test");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        JavaSQLContext sqlContext = new JavaSQLContext(sparkContext);
        JavaSchemaRDD episodes = AvroUtils.avroFile(sqlContext,
                "/home/niranda/projects/avro-spark-test/src/test/resources/episodes.avro");
        episodes.printSchema();
        episodes.registerTempTable("avroTable");
        List<Row> result = sqlContext.sql("SELECT * FROM avroTable").collect();
        for (Row row : result) {
            System.out.println(row.toString());
        }
    }
}

As you pointed out, this error occurs when the Hadoop dependency is added. It runs without a problem when the Hadoop dependency is removed and the master is set to local[].

Cheers

On Tue, Jan 6, 2015 at 3:23 PM, Sean Owen so...@cloudera.com wrote:
[quoted reply and stack trace trimmed; see Sean's earlier message in this thread]
Re: Guava 11 dependency issue in Spark 1.2.0
Oh, are you actually bundling Hadoop in your app? That may be the problem. If you're using standalone mode, why include Hadoop? In any event, Spark and Hadoop are intended to be 'provided' dependencies in the app you send to spark-submit.

On Tue, Jan 6, 2015 at 10:15 AM, Niranda Perera niranda.per...@gmail.com wrote:
[quoted reply, code, and stack trace trimmed; see the earlier messages in this thread]
Guava 11 dependency issue in Spark 1.2.0
Hi,

I have been running a simple Spark app on a local Spark cluster and I came across this error:

Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
    at org.apache.spark.util.collection.OpenHashSet.org$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261)
    at org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165)
    at org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102)
    at org.apache.spark.util.SizeEstimator$$anonfun$visitArray$2.apply$mcVI$sp(SizeEstimator.scala:214)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:210)
    at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:169)
    at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
    at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
    at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
    at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
    at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
    at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
    at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
    at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:695)
    at com.databricks.spark.avro.AvroRelation.buildScan$lzycompute(AvroRelation.scala:45)
    at com.databricks.spark.avro.AvroRelation.buildScan(AvroRelation.scala:44)
    at org.apache.spark.sql.sources.DataSourceStrategy$.apply(DataSourceStrategy.scala:56)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
    at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:444)
    at org.apache.spark.sql.api.java.JavaSchemaRDD.collect(JavaSchemaRDD.scala:114)

While looking into this I found that Guava was downgraded to version 11 in this PR: https://github.com/apache/spark/pull/1610. In that PR, the hashInt call at OpenHashSet.scala:261 was changed to hashLong. But when I actually run my app, the java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt error occurs, which is understandable because hashInt is not available before Guava 12. So, I'm wondering why this occurs?

Cheers
--
Niranda Perera
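A quick way to see whether the Guava that actually wins on your runtime classpath exposes hashInt is a reflection probe. This is only a generic diagnostic sketch, not anything from the thread above; with Guava present you would probe com.google.common.hash.HashFunction for hashInt, but the demo below probes a JDK class so it runs standalone:

```java
import java.lang.reflect.Method;

public class MethodCheck {
    // Returns true if the named class (as loaded by this JVM) exposes a
    // public method with the given name.
    static boolean hasMethod(String className, String methodName) {
        try {
            for (Method m : Class.forName(className).getMethods()) {
                if (m.getName().equals(methodName)) {
                    return true;
                }
            }
            return false;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // With Guava on the classpath you would run:
        //   hasMethod("com.google.common.hash.HashFunction", "hashInt")
        // and expect false if an old Guava 11 shadows the one Spark needs.
        System.out.println(hasMethod("java.lang.String", "isEmpty")); // exists
        System.out.println(hasMethod("java.lang.String", "hashInt")); // does not
    }
}
```

If the probe returns false for a method the library's version is supposed to have, an older jar is shadowing it, which is exactly the situation behind a NoSuchMethodError like the one above.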
Re: Shuffle Problems in 1.2.0
I ran your scripts on a 5-node (2 CPUs, 8 GB mem) cluster and could not reproduce your failure. Should I test it with a big-memory node?

On Mon, Jan 5, 2015 at 4:00 PM, Sven Krasser kras...@gmail.com wrote:

Thanks for the input! I've managed to come up with a repro of the error with test data only (and without any of the custom code in the original script); please see here: https://gist.github.com/skrasser/4bd7b41550988c8f6071#file-gistfile1-md

The Gist contains a data generator and the script reproducing the error (plus driver and executor logs). If I run using full cluster capacity (32 executors with 28 GB), there are no issues. If I run on only two, the error appears again and the job fails:

org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@294b55b7)

Any thoughts, or any obvious problems you can spot by any chance? Thank you!
-Sven

On Sun, Jan 4, 2015 at 1:11 PM, Josh Rosen rosenvi...@gmail.com wrote:

It doesn't seem like there are a whole lot of clues to go on here without seeing the job code. The original "org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@130dc7ad)" error suggests that maybe there's an issue with PySpark's serialization / tracking of types, but it's hard to say from this error trace alone.

On December 30, 2014 at 5:17:08 PM, Sven Krasser (kras...@gmail.com) wrote:

Hey Josh,

I am still trying to prune this to a minimal example, but it has been tricky since scale seems to be a factor. The job runs over ~720 GB of data (the cluster's total RAM is around ~900 GB, split across 32 executors). I've managed to run it over a vastly smaller data set without issues. Curiously, when I run it over a slightly smaller data set of ~230 GB (using sort-based shuffle), my job also fails, but I see no shuffle errors in the executor logs. All I see is the error below from the driver (this is also what the driver prints when erroring out on the large data set, but I assumed the executor errors to be the root cause).
Any idea on where to look in the interim for more hints? I'll continue to try to get to a minimal repro.

2014-12-30 21:35:34,539 INFO [sparkDriver-akka.actor.default-dispatcher-14] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map output locations for shuffle 0 to sparkexecu...@ip-10-20-80-60.us-west-1.compute.internal:39739
2014-12-30 21:35:39,512 INFO [sparkDriver-akka.actor.default-dispatcher-17] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map output locations for shuffle 0 to sparkexecu...@ip-10-20-80-62.us-west-1.compute.internal:42277
2014-12-30 21:35:58,893 WARN [sparkDriver-akka.actor.default-dispatcher-16] remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkyar...@ip-10-20-80-64.us-west-1.compute.internal:49584] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2014-12-30 21:35:59,044 ERROR [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Yarn application has already exited with state FINISHED!
2014-12-30 21:35:59,056 INFO [Yarn application state monitor] handler.ContextHandler (ContextHandler.java:doStop(788)) - stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
[...]
2014-12-30 21:35:59,111 INFO [Yarn application state monitor] ui.SparkUI (Logging.scala:logInfo(59)) - Stopped Spark web UI at http://ip-10-20-80-37.us-west-1.compute.internal:4040
2014-12-30 21:35:59,130 INFO [Yarn application state monitor] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stopping DAGScheduler
2014-12-30 21:35:59,131 INFO [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Shutting down all executors
2014-12-30 21:35:59,132 INFO [sparkDriver-akka.actor.default-dispatcher-14] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Asking each executor to shut down
2014-12-30 21:35:59,132 INFO [Thread-2] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 1 failed: collect at /home/hadoop/test_scripts/test.py:63, took 980.751936 s
Traceback (most recent call last):
  File "/home/hadoop/test_scripts/test.py", line 63, in <module>
    result = j.collect()
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 676, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o117.collect.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down at
(log line interleaved with the traceback: 2014-12-30 21:35:59,140 INFO [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Stopped)
Re: Timeout Exception in standalone cluster
That's an issue with your firewall (more likely a hostnames issue in /etc/hosts). You may find the following posts helpful:

- http://stackoverflow.com/questions/27039954/intermittent-timeout-exception-using-spark
- http://koobehub.wordpress.com/2014/09/29/spark-the-standalone-cluster-deployment/

Thanks
Best Regards

On Tue, Jan 6, 2015 at 12:13 AM, rajnish rajnish.gar...@gmail.com wrote:

Hi,

I am getting the following exception in a Spark (1.1.0) job running on a standalone cluster. My cluster configuration is: Intel(R) 2.50GHz, 4 cores, 16 GB RAM, 5 machines.

Exception in thread "main" java.lang.reflect.UndeclaredThrowableException: Unknown exception in doAs
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1134)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:52)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:113)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:156)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.security.PrivilegedActionException: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    ... 4 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Timeout-Exception-in-standalone-cluster-tp20979.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
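When a timeout like this is suspected to be a hostname misconfiguration, a first sanity check is to see what the JVM itself resolves for the local host. The sketch below is only a generic diagnostic under that assumption (it is not Spark code); if it prints a loopback address while workers try to reach the driver over a real interface, the /etc/hosts suggestion above is the likely fix:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostCheck {
    public static void main(String[] args) {
        try {
            // On a misconfigured node this often resolves to 127.0.0.1 or
            // 127.0.1.1 even though remote peers connect via a real NIC;
            // fixing /etc/hosts (or setting SPARK_LOCAL_IP) addresses that.
            InetAddress local = InetAddress.getLocalHost();
            System.out.println(local.getHostName() + " -> " + local.getHostAddress());
            System.out.println("loopback: " + local.isLoopbackAddress());
        } catch (UnknownHostException e) {
            // The hostname does not resolve at all -- also a /etc/hosts problem.
            System.out.println("local hostname does not resolve: " + e.getMessage());
        }
    }
}
```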
Re: Saving partial (top 10) DStream windows to hdfs
You can try something like:

val top10 = your_stream.mapPartitions(rdd => rdd.take(10))

Thanks
Best Regards

On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote:

Hi,

I am counting values in each window, finding the top values, and want to save only the 10 most frequent values of each window to HDFS rather than all the values:

eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
// sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))
sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))

I can print the top 10 as in the commented line above. I have also tried:

sortedCounts.foreachRDD{ rdd => ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1)) }

but I get the following error:

15/01/05 17:12:23 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext

Regards,
Laeeq
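One caveat on the suggestion above: rdd.take(10) inside mapPartitions takes the first 10 elements of each partition, so on a sorted stream it yields up to 10 values per partition, not a global top 10; a final merge of the per-partition candidates is still needed. The per-partition top-k idea itself is just a bounded min-heap. A plain-Java sketch of that logic (names are illustrative, not Spark API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class TopK {
    // Keep only the k largest values seen so far, using a min-heap of size k.
    // This is the same amount of state mapPartitions-style top-k keeps per
    // partition; merging the per-partition results gives the global top k.
    static List<Integer> topK(List<Integer> values, int k) {
        PriorityQueue<Integer> heap = new PriorityQueue<>(); // min-heap
        for (int v : values) {
            heap.offer(v);
            if (heap.size() > k) {
                heap.poll(); // evict the smallest of the k+1 candidates
            }
        }
        List<Integer> out = new ArrayList<>(heap);
        out.sort(Collections.reverseOrder());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(topK(Arrays.asList(5, 1, 9, 3, 7, 8, 2), 3)); // [9, 8, 7]
    }
}
```

The NotSerializableException in the second attempt, separately, comes from referencing ssc inside the foreachRDD closure, which drags the non-serializable StreamingContext into the task closure.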
Re: Spark error in execution
Hello!

I just had a very similar stack trace. It was caused by an Akka version mismatch (from trying to use Play 2.3 with Spark 1.1 by accident instead of 1.2).

On Mon, Nov 24, 2014 at 7:15 PM, Blackeye black...@iit.demokritos.gr wrote:

I created an application in Spark. When I run it with Spark, everything works fine. But when I export my application with the libraries (via sbt) and try to run it as an executable jar, I get the following error:

14/11/24 20:06:11 ERROR OneForOneStrategy: exception during creation
akka.actor.ActorInitializationException: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
    at akka.actor.ActorCell.create(ActorCell.scala:596)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at akka.util.Reflect$.instantiate(Reflect.scala:66)
    at akka.actor.ArgsReflectConstructor.produce(Props.scala:349)
    at akka.actor.Props.newActor(Props.scala:249)
    at akka.actor.ActorCell.newActor(ActorCell.scala:552)
    at akka.actor.ActorCell.create(ActorCell.scala:578)
    ... 9 more
Caused by: java.lang.AbstractMethodError: akka.remote.RemoteActorRefProvider$RemotingTerminator.akka$actor$FSM$_setter_$Event_$eq(Lakka/actor/FSM$Event$;)V
    at akka.actor.FSM$class.$init$(FSM.scala:272)
    at akka.remote.RemoteActorRefProvider$RemotingTerminator.init(RemoteActorRefProvider.scala:36)
    ... 18 more
14/11/24 20:06:11 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-2] shutting down ActorSystem [sparkDriver]
java.lang.AbstractMethodError
    at akka.actor.ActorCell.create(ActorCell.scala:580)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[ERROR] [11/24/2014 20:06:11.478] [sparkDriver-akka.actor.default-dispatcher-4] [ActorSystem(sparkDriver)] Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem [sparkDriver]
java.lang.AbstractMethodError
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:369)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[ERROR] [11/24/2014 20:06:11.481] [sparkDriver-akka.actor.default-dispatcher-3] [ActorSystem(sparkDriver)] Uncaught fatal error from thread
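An AbstractMethodError like this almost always means two incompatible versions of the same library ended up on the classpath, so a useful first step is to ask the JVM which jar a conflicting class was actually loaded from. This is a generic diagnostic sketch using only standard JDK calls (with Akka on the classpath you would pass, say, akka.actor.ActorCell.class instead of the demo class):

```java
import java.security.CodeSource;

public class JarLocator {
    // Print the classpath location a class was loaded from, or a marker
    // when it came from the JVM's bootstrap classpath (e.g. java.lang.*).
    static String locationOf(Class<?> c) {
        CodeSource src = c.getProtectionDomain().getCodeSource();
        return src == null ? "(bootstrap classpath)" : src.getLocation().toString();
    }

    public static void main(String[] args) {
        // For the mismatch above you would inspect the Akka classes, e.g.:
        //   System.out.println(locationOf(akka.actor.ActorCell.class));
        // Demonstrated on this class itself so the sketch runs standalone:
        System.out.println(locationOf(JarLocator.class));
        System.out.println(locationOf(String.class));
    }
}
```

If the printed location is an uber jar that bundles a different Akka than the Spark distribution provides, that confirms the version mismatch described above.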
Re: NoSuchMethodError: com.typesafe.config.Config.getDuration with akka-http/akka-stream
Good luck. Let me know if I can assist you further.

Regards
-Pankaj
Linkedin: https://www.linkedin.com/profile/view?id=171566646
Skype: pankaj.narang

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-com-typesafe-config-Config-getDuration-with-akka-http-akka-stream-tp20926p20991.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Set EXTRA_JAR environment variable for spark-jobserver
I suggest creating an uber jar instead. Check my thread on the same issue: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-com-typesafe-config-Config-getDuration-with-akka-http-akka-stream-td20926.html

Regards
-Pankaj
Linkedin: https://www.linkedin.com/profile/view?id=171566646
Skype: pankaj.narang

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Set-EXTRA-JAR-environment-variable-for-spark-jobserver-tp20989p20992.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Set EXTRA_JAR environment variable for spark-jobserver
Or you can use: sc.addJar("/path/to/your/datastax.jar") Thanks Best Regards On Tue, Jan 6, 2015 at 5:53 PM, bchazalet bchaza...@companywatch.net wrote: I don't know much about spark-jobserver, but you can set jars programmatically using the method setJars on SparkConf. Looking at your code it seems that you're importing classes from com.datastax.spark.connector._ to load data from Cassandra, so you may need to add that datastax jar to your SparkConf: val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1") .setAppName("jobserver test demo") .setMaster("local[4]") .setJars(Seq("path/to/your/datastax/jar")) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Set-EXTRA-JAR-environment-variable-for-spark-jobserver-tp20989p20990.html
Re: Set EXTRA_JAR environment variable for spark-jobserver
I don't know much about spark-jobserver, but you can set jars programmatically using the method setJars on SparkConf. Looking at your code it seems that you're importing classes from com.datastax.spark.connector._ to load data from Cassandra, so you may need to add that datastax jar to your SparkConf: val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1") .setAppName("jobserver test demo") .setMaster("local[4]") .setJars(Seq("path/to/your/datastax/jar")) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Set-EXTRA-JAR-environment-variable-for-spark-jobserver-tp20989p20990.html
streamSQL - is it available or is it in POC ?
Hi, Just wondering whether this has been released yet and, if so, on which version of Spark? Many Thanks, Thomas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streamSQL-is-it-available-or-is-it-in-POC-tp20993.html
Re: NoSuchMethodError: com.typesafe.config.Config.getDuration with akka-http/akka-stream
Thanks Pankaj for the assembly plugin tip. Yes, there is a version mismatch of akka-actor between Spark 1.1.1 and akka-http/akka-stream (2.2.3 versus 2.3.x). After some digging, I see 4 options for this problem (in case others encounter it): 1) Upgrade to Spark 1.2.0; the same code will work (not possible for me since I want to use the Datastax connector) 2) Make a custom build of Spark 1.1.1 3) Use Play instead, with akka-actor 2.2.3 (Play 2.2.3 for instance) 4) Wait for Datastax connector 1.2.0 (released on 31st January 2015) I'm currently trying option 3. Thank you all for your help On Sat, Jan 3, 2015 at 4:11 AM, Pankaj Narang [via Apache Spark User List] ml-node+s1001560n20950...@n3.nabble.com wrote: Like before I get a java.lang.NoClassDefFoundError: akka/stream/FlowMaterializer$ This can be solved using the assembly plugin. You need to enable the assembly plugin in global plugins (C:\Users\infoshore\.sbt\0.13\plugins): add a line in plugins.sbt addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.0") and then add the following lines in build.sbt import AssemblyKeys._ // put this at the top of the file seq(assemblySettings: _*) Also at the bottom don't forget to add assemblySettings mergeStrategy in assembly := { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard case "log4j.properties" => MergeStrategy.discard case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines case "reference.conf" => MergeStrategy.concat case _ => MergeStrategy.first } Now in your sbt run "sbt assembly"; that will create a jar which can be run without the --jars option, as this will be an uber jar containing all jars. Also, NoSuchMethodError is thrown when there is a difference between compiled and runtime versions. What is the version of Spark you are using? 
You need to use the same version in build.sbt. Here is your build.sbt: libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1" // exclude("com.typesafe", "config") libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.1.1" libraryDependencies += "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.3" libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" withSources() withJavadoc() libraryDependencies += "org.apache.cassandra" % "cassandra-thrift" % "2.0.5" libraryDependencies += "joda-time" % "joda-time" % "2.6" and your error is Exception in thread "main" java.lang.NoSuchMethodError: com.typesafe.config.Config.getDuration(Ljava/lang/String;Ljava/util/concurrent/TimeUnit;)J at akka.stream.StreamSubscriptionTimeoutSettings$.apply(FlowMaterializer.scala:256) I think there is a version mismatch in the jars you use at runtime. If you need more help add me on skype pankaj.narang ---Pankaj -- If you reply to this email, your message will be added to the discussion below: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-com-typesafe-config-Config-getDuration-with-akka-http-akka-stream-tp20926p20950.html 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-com-typesafe-config-Config-getDuration-with-akka-http-akka-stream-tp20926p20988.html
Re: Set EXTRA_JAR environment variable for spark-jobserver
Boris, Thank you for your suggestion. I used the following code and am still facing the same issue: val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1") .setAppName("jobserver test demo") .setMaster("local[4]") .setJars(Seq("C:/spark-jobserver/lib/spark-cassandra-connector_2.10-1.1.0-alpha3.jar")) Am I missing something? Meanwhile, I will try Pankaj's suggestion of using an uber jar. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Set-EXTRA-JAR-environment-variable-for-spark-jobserver-tp20989p20995.html
Trouble with large Yarn job
Hey, I have a job that keeps failing if too much data is processed, and I can't see how to get it working. I've tried repartitioning with more partitions and increasing the amount of memory for the executors (now about 12G and 400 executors). Here is a snippet of the first part of the code, which succeeds without any problems: val all_days = sc.union( ds.dateInterval(startDate, date).map(date => sc.avroFile[LrDailyEndSong](daily_end_song_path + date) .map(s => ( (s.getUsername, s.getTrackUri), UserItemData(s.getUsername, s.getTrackUri, build_vector1(date, s), build_vector2(s)) )) ) ) .reduceByKey(sum_vectors) I want to process 30 days of data or more, but am only able to process about 10 days. With more days of data (a lower startDate in the code above), the union above succeeds but the code below fails with "Error communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL for more detailed error messages). Here is a snippet of the code that fails: val top_tracks = all_days.map(t => (t._1._2.toString, 1)).reduceByKey(_ + _) .filter(trackFilter) .repartition(4) .persist(StorageLevel.MEMORY_AND_DISK_SER) val observation_data = all_days .mapPartitions(_.map(o => (o._1._2.toString, o._2))) .join(top_tracks) The calculation of top_tracks works, but the last mapPartitions task fails with the given error message if given more than 10 days of data. I also tried increasing the spark.akka.askTimeout setting, but it still fails even after 10-folding the timeout to 300 seconds. I'm using Spark 1.2 and Kryo serialization. I realize that this is a rather long message, but I'm stuck and would appreciate any help or clues for resolving this issue. It seems to be an out-of-memory issue, but increasing the number of partitions does not seem to help. Thanks, Anders
Re: Finding most occurrences in a JSON Nested Array
Many thanks Pankaj, I've got it working. For completeness, here's the whole segment (including the printout at different stages): -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Finding-most-occurrences-in-a-JSON-Nested-Array-tp20971p20996.html
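The actual code segment was not preserved in the archive. As a rough stand-in only (not the poster's code, and the `tags` field name is invented for illustration), finding the most frequent element across a nested JSON array can be sketched in plain Python:

```python
import json
from collections import Counter

def most_occurrences(json_str, field):
    # `field` is assumed to hold a list (the nested array) in each record;
    # count every element across all records and return the most common one.
    records = json.loads(json_str)
    counts = Counter()
    for record in records:
        counts.update(record.get(field, []))
    return counts.most_common(1)[0]

sample = '[{"tags": ["a", "b"]}, {"tags": ["b", "c"]}, {"tags": ["b"]}]'
print(most_occurrences(sample, "tags"))  # -> ('b', 3)
```

In Spark the same counting would typically be a flatMap over the nested field followed by reduceByKey, but the aggregation logic is identical.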
I think I am almost lost in the internals of Spark
I am a bit new to Spark; I have tried simple things like word count and the examples given in the Spark SQL programming guide. Now I am investigating the internals of Spark, but I am almost lost, because I could not grasp a whole picture of what Spark does when it executes word count. Is there a detailed guide to the internals explaining what Spark does when it executes word count? Thanks. There is a lot of good explanation of how Hadoop does word count.
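For orientation, the logical stages Spark runs for word count (flatMap to words, map to (word, 1) pairs, shuffle by key, reduce) can be mimicked in plain Python. This is only a conceptual sketch of the dataflow, not Spark's actual execution machinery:

```python
from collections import defaultdict

def word_count(lines):
    # flatMap: split each line into words
    words = [w for line in lines for w in line.split()]
    # map: pair each word with a count of 1
    pairs = [(w, 1) for w in words]
    # shuffle + reduceByKey: group the pairs by key and sum the counts
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    return dict(counts)

print(word_count(["to be or", "not to be"]))
# {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```

In real Spark the shuffle step is where data moves between executors; everything before it runs as narrow, per-partition tasks.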
How to limit the number of concurrent tasks per node?
Hi Pro, One map() operation in my Spark app takes an RDD[A] as input and maps each element in RDD[A], using a custom mapping function func(x: A): B, to another object of type B. I received lots of OutOfMemory errors, and after some debugging I found this is because func() requires a significant amount of memory when computing each input x, and each node executes multiple mapping operations (i.e., multiple func() calls) concurrently. The total amount of memory required by those mapping operations per node exceeds the amount of physical memory. What I have tried so far: in order to solve the problem, I limited the number of concurrent mapping tasks to 2 per executor (node), by coalesce()-ing the RDD[A] first and then repartition()-ing it: val rdd: RDD[A] = sc.textFile().flatMap() rdd.coalesce(#_of_nodes * 2).map(func).repartition(300) I was also advised to set spark.task.cpus larger than 1, but that takes effect globally, and my pipeline involves lots of other operations which I do not want to limit. Is there any better solution to fulfil the purpose? Thanks! Pengcheng
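Outside of Spark's partitioning knobs, the underlying idea here — capping how many memory-hungry func() invocations run at once on a node — can be expressed with a bounded worker pool. This is a generic concurrency sketch, not a Spark API; `func` below is a trivial stand-in for the real memory-intensive mapping:

```python
from concurrent.futures import ThreadPoolExecutor

def func(x):
    # stand-in for the memory-intensive mapping A -> B
    return x * x

def bounded_map(items, max_concurrent=2):
    # At most `max_concurrent` calls to func() run at the same time,
    # analogous to allowing only 2 concurrent map tasks per executor.
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        return list(pool.map(func, items))

print(bounded_map([1, 2, 3, 4]))  # [1, 4, 9, 16]
```

The same pattern can be applied inside mapPartitions so that the limit is local to the one expensive stage instead of global, which is exactly what spark.task.cpus cannot give you.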
Saving data to Hbase hung in Spark streaming application with Spark 1.2.0
Hi all, I have a Spark streaming application that ingests data from a Kafka topic and persists received data to Hbase. It works fine with Spark 1.1.1 in YARN cluster mode. Basically, I use the following code to persist each partition of each RDD to Hbase: @Override public void call(Iterator<Metric> it) throws Exception { HConnection hConnection = null; HTableInterface htable = null; try { hConnection = HConnectionManager.createConnection(_conf.value()); htable = hConnection.getTable(_tablePrefix + "_" + new SimpleDateFormat("_MM_dd").format(new Date())); htable.setAutoFlush(false, true); while (it.hasNext()) { Metric metric = it.next(); htable.put(_put.call(metric)); } htable.flushCommits(); } finally { try { if (htable != null) { htable.close(); } } catch (Exception e) { System.err.println("error closing htable"); System.err.println(e.toString()); } try { if (hConnection != null) { hConnection.close(); } } catch (Exception e) { System.err.println("error closing hConnection"); System.err.println(e.toString()); } } } I use a Kafka receiver to create the input stream: KafkaUtils.createStream(jssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER()); With 1.2.0, receiving from Kafka still works normally. I tried both KafkaReceiver and ReliableKafkaReceiver; both can get data from Kafka without a problem. However, the application just doesn't save data to Hbase. The streaming page of the Spark UI showed it stuck processing the first batch. 
The Executor threads stayed in TIMED_WAITING state: Thread 54: Executor task launch worker-0 (TIMED_WAITING) java.lang.Thread.sleep(Native Method) org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1296) org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1090) org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1047) org.apache.hadoop.hbase.client.AsyncProcess.findDestLocation(AsyncProcess.java:365) org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:310) org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits(HTable.java:971) org.apache.hadoop.hbase.client.HTable.doPut(HTable.java:954) org.apache.hadoop.hbase.client.HTable.put(HTable.java:915) com.xxx.spark.streaming.JavaKafkaSparkHbase$WriteFunction.persist(JavaKafkaSparkHbase.java:125) com.xxx.spark.streaming.PersistFunction$1.call(PersistFunction.java:42) com.xxx.spark.streaming.PersistFunction$1.call(PersistFunction.java:35) org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) org.apache.spark.scheduler.Task.run(Task.scala:56) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) 
KafkaMessageHandler thread is in WAITING state Thread 70: KafkaMessageHandler-0 (WAITING) sun.misc.Unsafe.park(Native Method) java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) kafka.consumer.ConsumerIterator.makeNext(Unknown Source) kafka.consumer.ConsumerIterator.makeNext(Unknown Source) kafka.utils.IteratorTemplate.maybeComputeNext(Unknown Source) kafka.utils.IteratorTemplate.hasNext(Unknown Source) org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:132) java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) java.util.concurrent.FutureTask.run(FutureTask.java:262) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
Problem getting Spark running on a Yarn cluster
Hello, We have Hadoop 2.6.0 and YARN set up on EC2, and we are trying to get Spark 1.1.1 running on the YARN cluster. I have of course googled around and found that this problem is solved for most people after removing the line containing 127.0.1.1 from /etc/hosts. That hasn't solved it for me. Does anyone have an idea where else 127.0.1.1 might be hiding in some conf? I've looked everywhere... or is there a completely different problem? Thanks, Sharon This is the error I am getting: WARN network.SendingConnection: Error finishing connection to /127.0.1.1:47020 java.net.ConnectException: Connection refused
Pyspark Interactive shell
Hi, Has anybody tried to connect to a Spark cluster (on UNIX machines) from a Windows interactive shell? -Naveen.
Re: Finding most occurrences in a JSON Nested Array
That's great. I didn't have access to the developer machine, so I sent you the pseudo code only. Happy to see it's working. If you need any more help related to Spark, let me know anytime. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Finding-most-occurrences-in-a-JSON-Nested-Array-tp20971p20997.html
Re: Set EXTRA_JAR environment variable for spark-jobserver
It does not look like you're supposed to fiddle with the SparkConf and even SparkContext in a 'job' (again, I don't know much about jobserver), as you're given a SparkContext as a parameter in the build method. I guess jobserver initialises the SparkConf and SparkContext itself when it first starts, while you're actually creating a new one within your job, which the github example you mentioned doesn't do; it just uses the context given as a parameter: def build(sc: SparkContext): RDD[(Reputation, User)] = { sc.textFile(inputPath). map(User.fromRow). collect { case Some(user) => user.reputation -> user }. sortByKey(ascending = false) } I am not sure either how you upload your job's jar to the server (the curl command you posted does not seem to do so). Maybe you could first try to make it work on its own as a regular Spark app, without using jobserver. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Set-EXTRA-JAR-environment-variable-for-spark-jobserver-tp20989p20998.html
Location of logs in local mode
I'm submitting a script using spark-submit in local mode for testing, and I'm having trouble figuring out where the logs are stored. The documentation indicates that they should be in the work folder in the directory in which Spark lives on my system, but I see no such folder there. I've set the SPARK_LOCAL_DIRS and SPARK_LOG_DIR environment variables in spark-env.sh, but there doesn't seem to be any log output generated in the locations I've specified there either. I'm just using spark-submit with master local; I haven't run any of the standalone cluster scripts, so I'm not sure if there's something I'm missing here as far as a default output location for logging. Thanks, Brett
Re: Set EXTRA_JAR environment variable for spark-jobserver
*@Sasi* You should be able to create a job something like this: package io.radtech.spark.jobserver import java.util.UUID import org.apache.spark.{ SparkConf, SparkContext } import org.apache.spark.rdd.RDD import org.joda.time.DateTime import com.datastax.spark.connector.types.TypeConverter import com.datastax.spark.connector.types.TypeConversionException import com.typesafe.config.Config case class AnalyticReport( deviceId: UUID, reportType: String, timestamp: DateTime, data: Map[String, String]) class ReadWriteCassandraJob { } trait AlycsReportSparkJob extends spark.jobserver.SparkJob with spark.jobserver.NamedRddSupport { val rddName = "report" // Validation is not really needed in this example def validate(sc: SparkContext, config: Config): spark.jobserver.SparkJobValidation = spark.jobserver.SparkJobValid } object ReadWriteCassandraJob extends AlycsReportSparkJob { val cassandraHost = "127.0.0.1" val keyspace = "test" val table = "alycs_reports_by_device" /* * Enable Cassandra-specific functions on the `SparkContext` and `RDD`: */ import com.datastax.spark.connector._ /* * Before creating the `SparkContext`, set the `spark.cassandra.connection.host` * property to the address of one of the Cassandra nodes. */ val conf = new SparkConf(true).set("spark.cassandra.connection.host", cassandraHost) /* * Set the port to connect to. If using an embedded instance set to 9142, else * default to 9042. */ conf.set("spark.cassandra.connection.native.port", "9042") override def runJob(sc: SparkContext, config: Config) = { // Read table test.alycs_reports_by_device and print its contents: val rdd = sc.cassandraTable(keyspace, table).select( "device_id", "report_type", "time", "data") rdd.collect().foreach(println) val rddrows = rdd.map(r => AnalyticReport(UUID.fromString(r.getUUID("device_id").toString()), r.getString("report_type"), new org.joda.time.DateTime(r.getDate("time")), r.getMap[String, String]("data"))) rddrows.collect.foreach(println) } } Then create a custom spark context file, src/main/resources/spark.context-settings.config, for the job; note the versions of the jars are incorrect below, I don't have the latest ones off the top of my head. If you are using the uber / fat jar from spark-cassandra-connector then simply place that here instead; I believe the name is spark-cassandra-connector-assembly-1.1.0.jar. spark.context-settings { spark.cores.max = 4 spark.cassandra.connection.host = "127.0.0.1" dependent-jar-uris = [ "local://sparkshell-lib/spark-cassandra-connector_2.10-1.0.0-rc4.jar", "local://sparkshell-lib/cassandra-clientutil-2.0.9.jar", "local://sparkshell-lib/cassandra-thrift-2.0.9.jar", "local://sparkshell-lib/cassandra-driver-core-2.0.4.jar", "local://sparkshell-lib/guava-15.0.jar", "local://sparkshell-lib/libthrift-0.9.1.jar", "local://sparkshell-lib/joda-convert-1.2.jar", "local://sparkshell-lib/joda-time-2.3.jar" ] } Now post the context to the job server: radtech:spark-jobserver-example$ curl -d src/main/resources/spark.context-settings.config -X POST 'localhost:8090/contexts/cassJob-context' Then execute your job: curl --data-binary @target/scala-2.10/spark-jobserver-example_2.10-0.1.0.jar localhost:8090/jars/cassjob curl -X POST 'localhost:8090/jobs?appName=cassjob&classPath=io.radtech.spark.jobserver.ReadWriteCassandraJob&context=cassJob-context' Worst case, you should be able to set these in your spark-defaults.conf to a location that is common to all your executors: spark.executor.extraClassPath=. HTH. -Todd On Tue, Jan 6, 2015 at 10:00 AM, bchazalet bchaza...@companywatch.net wrote: It does not look like you're supposed to fiddle with the SparkConf and even SparkContext in a 'job' (again, I don't know much about jobserver), as you're given a SparkContext as a parameter in the build method. I guess jobserver initialises the SparkConf and SparkContext itself when it first starts, while you're actually creating a new one within your job, which the github example you mentioned doesn't do; it just uses the context given as a parameter: def build(sc: SparkContext): RDD[(Reputation, User)] = { sc.textFile(inputPath). map(User.fromRow). collect { case Some(user) => user.reputation -> user }. sortByKey(ascending = false) } I am not sure either how you upload your job's jar to the server (the curl command you posted does not seem to do so). Maybe you could first try to make it work on its own as a regular Spark app, without using jobserver. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Set-EXTRA-JAR-environment-variable-for-spark-jobserver-tp20989p20998.html
Is there a way to read a parquet database without generating an RDD
I have an application where a function needs access to the results of a select from a Parquet database. Creating a JavaSQLContext and from it a JavaSchemaRDD as shown below works, but the parallelism is not needed; a simple JDBC call would work. Are there alternative non-parallel ways to achieve the same result? JavaSQLContext sqlContext = ... // application code JavaSchemaRDD parquetFile = sqlContext.parquetFile("MyDatabase"); parquetFile.registerAsTable("peptides"); JavaSchemaRDD binCounts = sqlContext.sql("SELECT * FROM " + "peptides" + " WHERE massBin = " + mzAsInt); Iterator<Row> rowIterator = binCounts.toLocalIterator(); while (rowIterator.hasNext()) { Row rw = rowIterator.next(); ... // application code }
Re: RDD Moving Average
So you want windows covering the same length of time, some of which will be fuller than others? You could, for example, simply bucket the data by minute to get this kind of effect. If you have an RDD[Ticker], where Ticker has a timestamp in ms, you could: tickerRDD.groupBy(ticker => (ticker.timestamp / 60000) * 60000) ... to get an RDD[(Long, Iterable[Ticker])], where the keys are the moment at the start of each minute, and the values are the Tickers within the following minute. You can try variations on this to bucket in different ways. Just be careful, because a minute with a huge number of values might cause you to run out of memory. If you're just doing aggregations of some kind, there are more efficient methods than this most generic method, like the aggregate methods. On Tue, Jan 6, 2015 at 8:34 PM, Asim Jalis asimja...@gmail.com wrote: Thanks. Another question. I have event data with timestamps. I want to create a sliding window using timestamps. Some windows will have a lot of events in them, others won't. Is there a way to get an RDD made of this kind of variable-length window? On Tue, Jan 6, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote: First you'd need to sort the RDD to give it a meaningful order, but I assume you have some kind of timestamp in your data you can sort on. I think you might be after the sliding() function, a developer API in MLlib: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala#L43 On Tue, Jan 6, 2015 at 5:25 PM, Asim Jalis asimja...@gmail.com wrote: Is there an easy way to do a moving average across a single RDD (in a non-streaming app)? Here is the use case. I have an RDD made up of stock prices. I want to calculate a moving average using a window size of N. Thanks. Asim
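The minute-bucketing trick described above — integer-dividing a millisecond timestamp by the window length and multiplying back — works the same outside Spark. A pure-Python sketch of the grouping (records here are plain (timestamp_ms, price) tuples, a stand-in for the Ticker type):

```python
from collections import defaultdict

MINUTE_MS = 60_000

def bucket_by_minute(tickers):
    # Key each (timestamp_ms, price) record by the start of its minute,
    # mirroring tickerRDD.groupBy(ticker => (ticker.timestamp / 60000) * 60000)
    buckets = defaultdict(list)
    for ts, price in tickers:
        buckets[(ts // MINUTE_MS) * MINUTE_MS].append(price)
    return dict(buckets)

data = [(5_000, 10.0), (59_999, 11.0), (60_001, 12.0)]
print(bucket_by_minute(data))  # {0: [10.0, 11.0], 60000: [12.0]}
```

Averaging each bucket's list then gives a per-minute average; buckets with few events simply average fewer values, which is the "fuller than others" behavior the reply describes.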
Re: Multiple Spark Streaming receiver model
Hi Manjul, Each StreamingContext will have its own batch size. If that doesn't work for the different sources you have, then you would have to create different streaming apps. You can only create a new StreamingContext in the same Spark app once you've stopped the previous one. Spark certainly supports multiple streams within the same streaming app. In fact, if the ingest rate of your stream is very high you can create multiple receivers and union the streams. Just remember that each receiver will require a core. The receivers will be distributed appropriately across your executors. Thanks, Silvio On 1/6/15, 7:11 PM, manjuldixit manjul.di...@kronos.com wrote: Hi, We have a requirement of receiving live input messages from RabbitMQ and processing them into micro batches. For this we have selected Spark Streaming, and we have written a connector for a RabbitMQ receiver and Spark Streaming; it is working fine. Now the main requirement is to receive different categories of events from different channels/queues in a Spark streaming context. Q1: How can I create different streaming contexts to receive messages from different sources (which may or may not be at the same frequency) within a Spark context? Q2: Is it advisable to use a single StreamingContext to create different input streams from different sources? Q3: What design considerations do I need to take care of in terms of specifying the number of cores for the Spark master? Q4: Is there a way to distribute different stream-receiving tasks to different workers? Your suggestions will be very valuable in my application design, and if you have some example of a complex application which you can share (maybe some URL reference or whatever), it would be very appreciated. Thanks Manjul -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Spark-Streaming-receiver-model-tp21002.html 
Re: Shuffle Problems in 1.2.0
I still cannot reproduce it with 2 nodes (4 CPUs). Your repro.py could be faster (10 min) than before (22 min): inpdata.map(lambda (pc, x): (x, pc=='p' and 2 or 1)).reduceByKey(lambda x, y: x|y).filter(lambda (x, pc): pc==3).collect() (also, no cache needed anymore) Davies On Tue, Jan 6, 2015 at 9:02 AM, Sven Krasser kras...@gmail.com wrote: The issue has been sensitive to the number of executors and input data size. I'm using 2 executors with 4 cores each, 25GB of memory, 3800MB of memory overhead for YARN. This will fit onto Amazon r3 instance types. -Sven On Tue, Jan 6, 2015 at 12:46 AM, Davies Liu dav...@databricks.com wrote: I ran your scripts in a 5-node (2 CPUs, 8G mem) cluster and could not reproduce your failure. Should I test it with a big-memory node? On Mon, Jan 5, 2015 at 4:00 PM, Sven Krasser kras...@gmail.com wrote: Thanks for the input! I've managed to come up with a repro of the error with test data only (and without any of the custom code in the original script), please see here: https://gist.github.com/skrasser/4bd7b41550988c8f6071#file-gistfile1-md The Gist contains a data generator and the script reproducing the error (plus driver and executor logs). If I run using full cluster capacity (32 executors with 28GB), there are no issues. If I run on only two, the error appears again and the job fails: org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@294b55b7) Any thoughts or any obvious problems you can spot by any chance? Thank you! -Sven On Sun, Jan 4, 2015 at 1:11 PM, Josh Rosen rosenvi...@gmail.com wrote: It doesn't seem like there's a whole lot of clues to go on here without seeing the job code. The original "org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@130dc7ad)" error suggests that maybe there's an issue with PySpark's serialization / tracking of types, but it's hard to say from this error trace alone. 
On December 30, 2014 at 5:17:08 PM, Sven Krasser (kras...@gmail.com) wrote: Hey Josh, I am still trying to prune this to a minimal example, but it has been tricky since scale seems to be a factor. The job runs over ~720GB of data (the cluster's total RAM is around ~900GB, split across 32 executors). I've managed to run it over a vastly smaller data set without issues. Curiously, when I run it over slightly smaller data set of ~230GB (using sort-based shuffle), my job also fails, but I see no shuffle errors in the executor logs. All I see is the error below from the driver (this is also what the driver prints when erroring out on the large data set, but I assumed the executor errors to be the root cause). Any idea on where to look in the interim for more hints? I'll continue to try to get to a minimal repro. 2014-12-30 21:35:34,539 INFO [sparkDriver-akka.actor.default-dispatcher-14] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map output locations for shuffle 0 to sparkexecu...@ip-10-20-80-60.us-west-1.compute.internal:39739 2014-12-30 21:35:39,512 INFO [sparkDriver-akka.actor.default-dispatcher-17] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map output locations for shuffle 0 to sparkexecu...@ip-10-20-80-62.us-west-1.compute.internal:42277 2014-12-30 21:35:58,893 WARN [sparkDriver-akka.actor.default-dispatcher-16] remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkyar...@ip-10-20-80-64.us-west-1.compute.internal:49584] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 2014-12-30 21:35:59,044 ERROR [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Yarn application has already exited with state FINISHED! 
2014-12-30 21:35:59,056 INFO [Yarn application state monitor] handler.ContextHandler (ContextHandler.java:doStop(788)) - stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null} [...] 2014-12-30 21:35:59,111 INFO [Yarn application state monitor] ui.SparkUI (Logging.scala:logInfo(59)) - Stopped Spark web UI at http://ip-10-20-80-37.us-west-1.compute.internal:4040 2014-12-30 21:35:59,130 INFO [Yarn application state monitor] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stopping DAGScheduler 2014-12-30 21:35:59,131 INFO [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Shutting down all executors 2014-12-30 21:35:59,132 INFO [sparkDriver-akka.actor.default-dispatcher-14] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Asking each executor to shut down 2014-12-30 21:35:59,132 INFO [Thread-2] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 1 failed: collect at /home/hadoop/test_scripts/test.py:63, took 980.751936 s
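Davies's suggested one-liner at the top of this thread relies on a small bitwise trick: each record is tagged 2 if it came from the 'p' source and 1 otherwise, the tags are OR-ed together per key with reduceByKey, and only keys whose combined tag is 3 (i.e. seen in both sources) survive the filter. A minimal sketch of the same logic in plain Python, outside Spark:

```python
from collections import defaultdict

def keys_in_both(records):
    """records: iterable of (pc, x) pairs, where pc == 'p' marks one
    source and anything else marks the other. Returns the keys x that
    appear in both sources, mirroring the map/reduceByKey/filter
    pipeline from repro.py with a bitwise-OR flag per key."""
    flags = defaultdict(int)
    for pc, x in records:
        flags[x] |= 2 if pc == 'p' else 1   # tag record by its source
    return [x for x, f in flags.items() if f == 3]  # 3 = seen in both
```

In the PySpark version, `reduceByKey(lambda x, y: x | y)` performs the same OR combine per key across partitions.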
Multiple Spark Streaming receiver model
Hi, We have a requirement to receive live input messages from RabbitMQ and process them into micro-batches. For this we have selected Spark Streaming, and we have written a connector between the RabbitMQ receiver and Spark Streaming; it is working fine. Now the main requirement is to receive different categories of events from different channels/queues in a Spark streaming context. Q1: How can I create different streaming contexts to receive messages from different sources (which may or may not arrive at the same frequency) within one Spark context? Q2: Is it advisable to use a single StreamingContext to create different input streams from different sources? Q3: What design considerations do I need to take care of in terms of specifying the number of cores for the Spark master? Q4: Is there a way to distribute different streaming receiver tasks to different workers? Your suggestions will be very valuable to my application design, and if you have some examples of complex applications that you can share (maybe a URL reference or whatever), it would be much appreciated. Thanks Manjul -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Spark-Streaming-receiver-model-tp21002.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
HDFS_DELEGATION_TOKEN errors after switching Spark Contexts
Hi all. In order to get Spark to properly release memory during batch processing, as a workaround to issue https://issues.apache.org/jira/browse/SPARK-4927, I tear down and re-initialize the spark context with: context.stop() and context = new SparkContext() The problem I run into is that eventually I hit the below error: 15/01/06 13:52:34 INFO BlockManagerMaster: Updated info of block broadcast_5_piece0 15/01/06 13:52:34 WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 214318 for zjb238) can't be found in cache Exception in thread main org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 214318 for zjb238) can't be found in cache This terminates execution but I have no idea why this would be happening. Does anyone know what could be at play here? This error appears as soon as I try to hit HDFS after re-starting a Spark context. When this issue appears is not deterministic and I am able to run several successful iterations before I see it. Any help would be much appreciated. Thank you.
Re: MLLIB and Openblas library in non-default dir
thanks Xiangrui I'll try it. BTW: spark-submit is a standalone program (bin/spark-submit). Therefore, JVM has to be executed after spark-submit script Am I correct? On Mon, Jan 5, 2015 at 10:35 PM, Xiangrui Meng men...@gmail.com wrote: It might be hard to do that with spark-submit, because the executor JVMs may be already up and running before a user runs spark-submit. You can try to use `System.setProperty` to change the property at runtime, though it doesn't seem to be a good solution. -Xiangrui On Fri, Jan 2, 2015 at 6:28 AM, xhudik xhu...@gmail.com wrote: Hi I have compiled OpenBlas library into nonstandard directory and I want to inform Spark app about it via: -Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/local/lib/libopenblas.so which is a standard option in netlib-java (https://github.com/fommil/netlib-java) I tried 2 ways: 1. via *--conf* parameter /bin/spark-submit -v --class org.apache.spark.examples.mllib.LinearRegression *--conf -Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/local/lib/libopenblas.so* examples/target/scala-2.10/spark-examples-1.3.0-SNAPSHOT-hadoop1.0.4.jar data/mllib/sample_libsvm_data.txt/ 2. via *--driver-java-options* parameter /bin/spark-submit -v *--driver-java-options -Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/local/lib/libopenblas.so* --class org.apache.spark.examples.mllib.LinearRegression examples/target/scala-2.10/spark-examples-1.3.0-SNAPSHOT-hadoop1.0.4.jar data/mllib/sample_libsvm_data.txt / How can I force spark-submit to propagate info about non-standard placement of openblas library to netlib-java lib? thanks, Tomas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-and-Openblas-library-in-non-default-dir-tp20943.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
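For propagating a JVM system property like this, one option worth trying (a sketch only, not verified against this exact setup; the property name and library path are taken from the question above) is to pass it through Spark's extra-Java-options settings rather than giving a bare -D flag to --conf, since --conf expects key=value pairs and would not parse attempt 1 as intended:

```shell
# Sketch: set the netlib-java native path on both driver and executor JVMs.
# --driver-java-options only affects the driver; executors need the
# spark.executor.extraJavaOptions property as well.
bin/spark-submit \
  --class org.apache.spark.examples.mllib.LinearRegression \
  --driver-java-options "-Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/local/lib/libopenblas.so" \
  --conf "spark.executor.extraJavaOptions=-Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/local/lib/libopenblas.so" \
  examples/target/scala-2.10/spark-examples-1.3.0-SNAPSHOT-hadoop1.0.4.jar \
  data/mllib/sample_libsvm_data.txt
```

As Xiangrui notes, this only helps if the property reaches the executor JVMs before they load the native library; behavior may differ across Spark versions and deploy modes.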
Re: Reading from a centralized stored
No, most rdds partition input data appropriately. On Tue, Jan 6, 2015 at 1:41 PM, Franc Carter franc.car...@rozettatech.com wrote: One more question, to be clarify. Will every node pull in all the data ? thanks On Tue, Jan 6, 2015 at 12:56 PM, Cody Koeninger c...@koeninger.org wrote: If you are not co-locating spark executor processes on the same machines where the data is stored, and using an rdd that knows about which node to prefer scheduling a task on, yes, the data will be pulled over the network. Of the options you listed, S3 and DynamoDB cannot have spark running on the same machines. Cassandra can be run on the same nodes as spark, and recent versions of the spark cassandra connector implement preferred locations. You can run an rdbms on the same nodes as spark, but JdbcRDD doesn't implement preferred locations. On Mon, Jan 5, 2015 at 6:25 PM, Franc Carter franc.car...@rozettatech.com wrote: Hi, I'm trying to understand how a Spark Cluster behaves when the data it is processing resides on a centralized/remote store (S3, Cassandra, DynamoDB, RDBMS etc). Does every node in the cluster retrieve all the data from the central store ? thanks -- *Franc Carter* | Systems Architect | Rozetta Technology franc.car...@rozettatech.com franc.car...@rozettatech.com| www.rozettatechnology.com Tel: +61 2 8355 2515 Level 4, 55 Harrington St, The Rocks NSW 2000 PO Box H58, Australia Square, Sydney NSW 1215 AUSTRALIA -- *Franc Carter* | Systems Architect | Rozetta Technology franc.car...@rozettatech.com franc.car...@rozettatech.com| www.rozettatechnology.com Tel: +61 2 8355 2515 Level 4, 55 Harrington St, The Rocks NSW 2000 PO Box H58, Australia Square, Sydney NSW 1215 AUSTRALIA
confidence/probability for prediction in MLlib
Hi, A while ago, somebody asked about getting a confidence value of a prediction with MLlib's implementation of Naive Bayes's classification. I was wondering if there is any plan in the near future for the predict function to return both a label and a confidence/probability? Or could the private variables in the various machine learning models be exposed so we could write our own functions which return both? Having a confidence/probability could be very useful in real application. For one thing, you can choose to trust the predicted label only if it has a high confidence level. Also, if you want to combine the results from multiple classifiers, the confidence/probability could be used as some kind of weight for combining. Thanks, Jianguo
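If the model's internals were exposed as the poster suggests (in MLlib 1.x, NaiveBayesModel carries log class priors, pi, and log conditional probabilities, theta), a confidence could be derived by computing the log-posterior score per class and normalizing with a softmax. A sketch of the math in plain Python; the array shapes here are illustrative, not MLlib's API:

```python
import math

def posterior_probabilities(log_pi, log_theta, x):
    """Multinomial Naive Bayes: the log-posterior of class k is
    log_pi[k] + sum_j x[j] * log_theta[k][j] (up to a shared constant).
    A numerically stable softmax over these scores yields a
    probability per class alongside the argmax label."""
    scores = [lp + sum(xj * lt for xj, lt in zip(x, row))
              for lp, row in zip(log_pi, log_theta)]
    m = max(scores)                      # subtract max for stability
    exps = [math.exp(s - m) for s in scores]
    z = sum(exps)
    probs = [e / z for e in exps]
    label = max(range(len(probs)), key=probs.__getitem__)
    return label, probs
```

The returned probabilities can then be thresholded (trust only high-confidence predictions) or used as weights when combining classifiers, as described above.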
Re: Reading from a centralized stored
Ah, so it's rdd specific - that would make sense. For those systems where it is possible to extract sensible subsets, the rdds do so. My use case, which is probably biasing my thinking, is DynamoDB, which I don't think can efficiently extract records from M-to-N cheers On Wed, Jan 7, 2015 at 6:59 AM, Cody Koeninger c...@koeninger.org wrote: No, most rdds partition input data appropriately. On Tue, Jan 6, 2015 at 1:41 PM, Franc Carter franc.car...@rozettatech.com wrote: One more question, to clarify. Will every node pull in all the data? thanks On Tue, Jan 6, 2015 at 12:56 PM, Cody Koeninger c...@koeninger.org wrote: If you are not co-locating spark executor processes on the same machines where the data is stored, and using an rdd that knows about which node to prefer scheduling a task on, yes, the data will be pulled over the network. Of the options you listed, S3 and DynamoDB cannot have spark running on the same machines. Cassandra can be run on the same nodes as spark, and recent versions of the spark cassandra connector implement preferred locations. You can run an rdbms on the same nodes as spark, but JdbcRDD doesn't implement preferred locations. On Mon, Jan 5, 2015 at 6:25 PM, Franc Carter franc.car...@rozettatech.com wrote: Hi, I'm trying to understand how a Spark Cluster behaves when the data it is processing resides on a centralized/remote store (S3, Cassandra, DynamoDB, RDBMS etc). Does every node in the cluster retrieve all the data from the central store ?
thanks -- *Franc Carter* | Systems Architect | Rozetta Technology franc.car...@rozettatech.com | www.rozettatechnology.com Tel: +61 2 8355 2515 Level 4, 55 Harrington St, The Rocks NSW 2000 PO Box H58, Australia Square, Sydney NSW 1215 AUSTRALIA
Re: RDD Moving Average
Thanks. Another question. I have event data with timestamps. I want to create a sliding window using timestamps. Some windows will have a lot of events in them others won’t. Is there a way to get an RDD made of this kind of a variable length window? On Tue, Jan 6, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote: First you'd need to sort the RDD to give it a meaningful order, but I assume you have some kind of timestamp in your data you can sort on. I think you might be after the sliding() function, a developer API in MLlib: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala#L43 On Tue, Jan 6, 2015 at 5:25 PM, Asim Jalis asimja...@gmail.com wrote: Is there an easy way to do a moving average across a single RDD (in a non-streaming app). Here is the use case. I have an RDD made up of stock prices. I want to calculate a moving average using a window size of N. Thanks. Asim
Re: RDD Moving Average
Except I want it to be a sliding window. So the same record could be in multiple buckets. On Tue, Jan 6, 2015 at 3:43 PM, Sean Owen so...@cloudera.com wrote: So you want windows covering the same length of time, some of which will be fuller than others? You could, for example, simply bucket the data by minute to get this kind of effect. If you have an RDD[Ticker], where Ticker has a timestamp in ms, you could: tickerRDD.groupBy(ticker => (ticker.timestamp / 60000) * 60000) ... to get an RDD[(Long,Iterable[Ticker])], where the keys are the moment at the start of each minute, and the values are the Tickers within the following minute. You can try variations on this to bucket in different ways. Just be careful because a minute with a huge number of values might cause you to run out of memory. If you're just doing aggregations of some kind there are more efficient methods than this most generic method, like the aggregate methods. On Tue, Jan 6, 2015 at 8:34 PM, Asim Jalis asimja...@gmail.com wrote: Thanks. Another question. I have event data with timestamps. I want to create a sliding window using timestamps. Some windows will have a lot of events in them, others won't. Is there a way to get an RDD made of this kind of a variable length window? On Tue, Jan 6, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote: First you'd need to sort the RDD to give it a meaningful order, but I assume you have some kind of timestamp in your data you can sort on. I think you might be after the sliding() function, a developer API in MLlib: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala#L43 On Tue, Jan 6, 2015 at 5:25 PM, Asim Jalis asimja...@gmail.com wrote: Is there an easy way to do a moving average across a single RDD (in a non-streaming app). Here is the use case. I have an RDD made up of stock prices. I want to calculate a moving average using a window size of N. Thanks. Asim
Re: RDD Moving Average
One problem with this is that we are creating a lot of iterables containing a lot of repeated data. Is there a way to do this so that we can calculate a moving average incrementally? On Tue, Jan 6, 2015 at 4:44 PM, Sean Owen so...@cloudera.com wrote: Yes, if you break it down to... tickerRDD.map(ticker => (ticker.timestamp, ticker) ).map { case (ts, ticker) => ((ts / 60000) * 60000, ticker) }.groupByKey ... as Michael alluded to, then it more naturally extends to the sliding window, since you can flatMap one Ticker to many (bucket, ticker) pairs, then group. I think this would implement 1 minute buckets, sliding by 10 seconds: tickerRDD.flatMap(ticker = (ticker.timestamp - 60000 to ticker.timestamp by 15000).map(ts => (ts, ticker)) ).map { case (ts, ticker) => ((ts / 60000) * 60000, ticker) }.groupByKey On Tue, Jan 6, 2015 at 8:47 PM, Asim Jalis asimja...@gmail.com wrote: I guess I can use a similar groupBy approach. Map each event to all the windows that it can belong to. Then do a groupBy, etc. I was wondering if there was a more elegant approach. On Tue, Jan 6, 2015 at 3:45 PM, Asim Jalis asimja...@gmail.com wrote: Except I want it to be a sliding window. So the same record could be in multiple buckets.
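The flatMap-then-groupByKey pattern Sean describes — emit each record once for every window it belongs to, then group by window start — can be illustrated outside Spark with plain Python. The window length and slide step below (1-minute windows starting every 10 seconds, in milliseconds) are illustrative choices, not values from the thread:

```python
from collections import defaultdict

def sliding_buckets(events, window_ms=60000, slide_ms=10000):
    """events: iterable of (timestamp_ms, value) pairs. Each event is
    assigned to every window [start, start + window_ms) that contains
    its timestamp, where starts are multiples of slide_ms; this is the
    flatMap-to-many-(bucket, value)-pairs idea followed by grouping."""
    buckets = defaultdict(list)
    for ts, v in events:
        # window starts k*slide_ms with ts - window_ms < k*slide_ms <= ts
        k_min = (ts - window_ms) // slide_ms + 1
        k_max = ts // slide_ms
        for k in range(max(k_min, 0), k_max + 1):
            buckets[k * slide_ms].append(v)
    return dict(buckets)
```

Each record fans out to roughly window_ms / slide_ms keys, which is the "lot of repeated data" Asim points out — the price of the fully generic approach.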
Re: Using ec2 launch script with locally built version of spark?
Hi, As the ec2 launch script provided by spark uses https://github.com/mesos/spark-ec2 to download and configure all the tools in the cluster (spark, hadoop etc). You can create your own git repository to achieve your goal. More precisely: 1. Upload your own version of spark in s3 at address path to your spark 2. Fork https://github.com/mesos/spark-ec2 and make a change in ./spark/init.sh (add wget path to your spark) 3. Change line 638 in ec2 launch script: git clone your repository in github Hope this can be helpful. Cheers Gen On Tue, Jan 6, 2015 at 11:51 PM, Ganon Pierce ganon.pie...@me.com wrote: Is there a way to use the ec2 launch script with a locally built version of spark? I launch and destroy clusters pretty frequently and would like to not have to wait each time for the master instance to compile the source as happens when I set the -v tag with the latest git commit. To be clear, I would like to launch a non-release version of spark compiled locally as quickly as I can launch a release version (e.g. -v 1.2.0) which does not have to be compiled upon launch. Up to this point, I have just used the launch script included with the latest release to set up the cluster and then manually replaced the assembly file on the master and slaves with the version I built locally and then stored on s3. Is there anything wrong with doing it this way? Further, is there a better or more standard way of accomplishing this? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: RDD Moving Average
Interesting, I am not sure the order in which fold() encounters elements is guaranteed, although from reading the code, I imagine in practice it is first-to-last by partition and then folded first-to-last from those results on the driver. I don't know this would lead to a solution though as the result here needs to be an RDD, not one value. On Wed, Jan 7, 2015 at 12:10 AM, Paolo Platter paolo.plat...@agilelab.it wrote: In my opinion you should use fold pattern. Obviously after an sort by trasformation. Paolo Inviata dal mio Windows Phone -- Da: Asim Jalis asimja...@gmail.com Inviato: 06/01/2015 23:11 A: Sean Owen so...@cloudera.com Cc: user@spark.apache.org Oggetto: Re: RDD Moving Average One problem with this is that we are creating a lot of iterables containing a lot of repeated data. Is there a way to do this so that we can calculate a moving average incrementally? On Tue, Jan 6, 2015 at 4:44 PM, Sean Owen so...@cloudera.com wrote: Yes, if you break it down to... tickerRDD.map(ticker = (ticker.timestamp, ticker) ).map { case(ts, ticker) = ((ts / 6) * 6, ticker) }.groupByKey ... as Michael alluded to, then it more naturally extends to the sliding window, since you can flatMap one Ticker to many (bucket, ticker) pairs, then group. I think this would implementing 1 minute buckets, sliding by 10 seconds: tickerRDD.flatMap(ticker = (ticker.timestamp - 6 to ticker.timestamp by 15000).map(ts = (ts, ticker)) ).map { case(ts, ticker) = ((ts / 6) * 6, ticker) }.groupByKey On Tue, Jan 6, 2015 at 8:47 PM, Asim Jalis asimja...@gmail.com wrote: I guess I can use a similar groupBy approach. Map each event to all the windows that it can belong to. Then do a groupBy, etc. I was wondering if there was a more elegant approach. On Tue, Jan 6, 2015 at 3:45 PM, Asim Jalis asimja...@gmail.com wrote: Except I want it to be a sliding window. So the same record could be in multiple buckets.
Re: RDD Moving Average
One approach I was considering was to use mapPartitions. It is straightforward to compute the moving average over a partition, except for near the end point. Does anyone see how to fix that? On Tue, Jan 6, 2015 at 7:20 PM, Sean Owen so...@cloudera.com wrote: Interesting, I am not sure the order in which fold() encounters elements is guaranteed, although from reading the code, I imagine in practice it is first-to-last by partition and then folded first-to-last from those results on the driver. I don't know this would lead to a solution though as the result here needs to be an RDD, not one value. On Wed, Jan 7, 2015 at 12:10 AM, Paolo Platter paolo.plat...@agilelab.it wrote: In my opinion you should use fold pattern. Obviously after an sort by trasformation. Paolo Inviata dal mio Windows Phone -- Da: Asim Jalis asimja...@gmail.com Inviato: 06/01/2015 23:11 A: Sean Owen so...@cloudera.com Cc: user@spark.apache.org Oggetto: Re: RDD Moving Average One problem with this is that we are creating a lot of iterables containing a lot of repeated data. Is there a way to do this so that we can calculate a moving average incrementally? On Tue, Jan 6, 2015 at 4:44 PM, Sean Owen so...@cloudera.com wrote: Yes, if you break it down to... tickerRDD.map(ticker = (ticker.timestamp, ticker) ).map { case(ts, ticker) = ((ts / 6) * 6, ticker) }.groupByKey ... as Michael alluded to, then it more naturally extends to the sliding window, since you can flatMap one Ticker to many (bucket, ticker) pairs, then group. I think this would implementing 1 minute buckets, sliding by 10 seconds: tickerRDD.flatMap(ticker = (ticker.timestamp - 6 to ticker.timestamp by 15000).map(ts = (ts, ticker)) ).map { case(ts, ticker) = ((ts / 6) * 6, ticker) }.groupByKey On Tue, Jan 6, 2015 at 8:47 PM, Asim Jalis asimja...@gmail.com wrote: I guess I can use a similar groupBy approach. Map each event to all the windows that it can belong to. Then do a groupBy, etc. I was wondering if there was a more elegant approach. 
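Within a single partition (or any ordered sequence), the moving average itself can be computed incrementally with a running sum, avoiding the repeated iterables of the groupBy approach. This is a sketch of the per-partition computation only; handling the partition boundary, as Asim notes, would additionally require prepending the last N-1 elements of the preceding partition (for example by collecting partition tails first), which is not shown here:

```python
from collections import deque

def moving_average(xs, n):
    """O(1)-per-element moving average over a window of size n:
    maintain a running sum and subtract the element falling out of
    the window instead of re-summing each window."""
    out, window, total = [], deque(), 0.0
    for x in xs:
        window.append(x)
        total += x
        if len(window) > n:
            total -= window.popleft()
        if len(window) == n:
            out.append(total / n)
    return out
```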
On Tue, Jan 6, 2015 at 3:45 PM, Asim Jalis asimja...@gmail.com wrote: Except I want it to be a sliding window. So the same record could be in multiple buckets.
R: RDD Moving Average
In my opinion you should use the fold pattern, obviously after a sort-by transformation. Paolo Sent from my Windows Phone From: Asim Jalis asimja...@gmail.com Sent: 06/01/2015 23:11 To: Sean Owen so...@cloudera.com Cc: user@spark.apache.org Subject: Re: RDD Moving Average One problem with this is that we are creating a lot of iterables containing a lot of repeated data. Is there a way to do this so that we can calculate a moving average incrementally? On Tue, Jan 6, 2015 at 4:44 PM, Sean Owen so...@cloudera.com wrote: Yes, if you break it down to... tickerRDD.map(ticker => (ticker.timestamp, ticker) ).map { case (ts, ticker) => ((ts / 60000) * 60000, ticker) }.groupByKey ... as Michael alluded to, then it more naturally extends to the sliding window, since you can flatMap one Ticker to many (bucket, ticker) pairs, then group. I think this would implement 1 minute buckets, sliding by 10 seconds: tickerRDD.flatMap(ticker = (ticker.timestamp - 60000 to ticker.timestamp by 15000).map(ts => (ts, ticker)) ).map { case (ts, ticker) => ((ts / 60000) * 60000, ticker) }.groupByKey On Tue, Jan 6, 2015 at 8:47 PM, Asim Jalis asimja...@gmail.com wrote: I guess I can use a similar groupBy approach. Map each event to all the windows that it can belong to. Then do a groupBy, etc. I was wondering if there was a more elegant approach. On Tue, Jan 6, 2015 at 3:45 PM, Asim Jalis asimja...@gmail.com wrote: Except I want it to be a sliding window. So the same record could be in multiple buckets.
How to replace user.id to user.names in a file
I work on a user-to-user recommender for a website using mllib.recommendation. I have created a file (recommends.txt) which contains the top 5 recommendations for each user id. The file looks something like this (user::rec1::rec2::rec3::rec4::rec5): /**file's snapshot**/ 5823::944::10030::27::1047::2891::836 14::2097::10030::2427::1874::2018::5804 2726::6557::2776::2959::6619::2018::4466 6367::6557::9359::2726::2542::10159::3574 5804::27::2891::9359::944::1599::1047 1044::944::6557::2542::4737::1866::1874 /**file's snapshot**/ My question: I already have the users.sql file which contains the (id, names), and I want to reshape recommends.txt to: Bob::Michael::Peter::Steve::Bill::George How can I easily do this? Thank you -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-replace-user-id-to-user-names-in-a-file-tp21006.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
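Assuming the (id, name) pairs from the users table can be loaded into a dictionary (the loading step and the names below are hypothetical, not from the question), each line of recommends.txt just needs to be split on "::" and every id looked up. A minimal sketch in plain Python:

```python
def rename_recommendations(lines, id_to_name):
    """lines: iterable of strings like '5823::944::10030::...'.
    Returns the lines with every user id replaced by its name;
    ids missing from the mapping are kept unchanged."""
    out = []
    for line in lines:
        ids = line.strip().split('::')
        out.append('::'.join(id_to_name.get(i, i) for i in ids))
    return out
```

In Spark the same idea would typically be a broadcast of the id-to-name dict followed by a map over the RDD of lines, rather than a full join, since the user table is presumably small.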
Re: RDD Moving Average
I guess I can use a similar groupBy approach. Map each event to all the windows that it can belong to. Then do a groupBy, etc. I was wondering if there was a more elegant approach. On Tue, Jan 6, 2015 at 3:45 PM, Asim Jalis asimja...@gmail.com wrote: Except I want it to be a sliding window. So the same record could be in multiple buckets. On Tue, Jan 6, 2015 at 3:43 PM, Sean Owen so...@cloudera.com wrote: So you want windows covering the same length of time, some of which will be fuller than others? You could, for example, simply bucket the data by minute to get this kind of effect. If you an RDD[Ticker], where Ticker has a timestamp in ms, you could: tickerRDD.groupBy(ticker = (ticker.timestamp / 6) * 6)) ... to get an RDD[(Long,Iterable[Ticker])], where the keys are the moment at the start of each minute, and the values are the Tickers within the following minute. You can try variations on this to bucket in different ways. Just be careful because a minute with a huge number of values might cause you to run out of memory. If you're just doing aggregations of some kind there are more efficient methods than this most generic method, like the aggregate methods. On Tue, Jan 6, 2015 at 8:34 PM, Asim Jalis asimja...@gmail.com wrote: Thanks. Another question. I have event data with timestamps. I want to create a sliding window using timestamps. Some windows will have a lot of events in them others won’t. Is there a way to get an RDD made of this kind of a variable length window? On Tue, Jan 6, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote: First you'd need to sort the RDD to give it a meaningful order, but I assume you have some kind of timestamp in your data you can sort on. 
I think you might be after the sliding() function, a developer API in MLlib: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala#L43 On Tue, Jan 6, 2015 at 5:25 PM, Asim Jalis asimja...@gmail.com wrote: Is there an easy way to do a moving average across a single RDD (in a non-streaming app). Here is the use case. I have an RDD made up of stock prices. I want to calculate a moving average using a window size of N. Thanks. Asim
Snappy error when driver is running in JBoss
I get this exception(java.lang.UnsatisfiedLinkError) when the driver is running inside JBoss. We are running with DataStax 4.6 version, which is using spark 1.1.0. The driver runs inside a wildfly container. The snappy-java version is 1.0.5. 2015-01-06 20:25:03,771 ERROR [akka.actor.ActorSystemImpl] (sparkDriver-akka.actor.default-dispatcher-22) Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-3] shutting down ActorSystem [sparkDriver]: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:320) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.xerial.snappy.SnappyOutputStream.init(SnappyOutputStream.java:79) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:68) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809) [levski-0.5.0-SNAPSHOT-standalone.jar:] at cenx.prometheus.spark.SparkContext.broadcast(Unknown Source) 
[levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:829) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360) [levski-0.5.0-SNAPSHOT-standalone.jar:] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) [levski-0.5.0-SNAPSHOT-standalone.jar:] at akka.actor.ActorCell.invoke(ActorCell.scala:456) [levski-0.5.0-SNAPSHOT-standalone.jar:] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) [levski-0.5.0-SNAPSHOT-standalone.jar:] at akka.dispatch.Mailbox.run(Mailbox.scala:219) [levski-0.5.0-SNAPSHOT-standalone.jar:] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) [levski-0.5.0-SNAPSHOT-standalone.jar:] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [levski-0.5.0-SNAPSHOT-standalone.jar:] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [levski-0.5.0-SNAPSHOT-standalone.jar:] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [levski-0.5.0-SNAPSHOT-standalone.jar:] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [levski-0.5.0-SNAPSHOT-standalone.jar:] -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Snappy-error-when-driver-is-running-in-JBoss-tp21004.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.1.0 and HBase: Snappy UnsatisfiedLinkError
Hi, I am getting this same error. Did you figure out how to solve the problem? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-0-and-HBase-Snappy-UnsatisfiedLinkError-tp19827p21005.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Using ec2 launch script with locally built version of spark?
Is there a way to use the ec2 launch script with a locally built version of spark? I launch and destroy clusters pretty frequently and would like to not have to wait each time for the master instance to compile the source as happens when I set the -v tag with the latest git commit. To be clear, I would like to launch a non-release version of spark compiled locally as quickly as I can launch a release version (e.g. -v 1.2.0) which does not have to be compiled upon launch. Up to this point, I have just used the launch script included with the latest release to set up the cluster and then manually replaced the assembly file on the master and slaves with the version I built locally and then stored on s3. Is there anything wrong with doing it this way? Further, is there a better or more standard way of accomplishing this? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Driver hangs on running mllib word2vec
Two billion words is a very large vocabulary… You can try solving this issue by setting the number of times words must occur in order to be included in the vocabulary using setMinCount; this will prevent common misspellings, websites, and other things from being included and may improve the quality of your model overall. On Jan 6, 2015, at 12:59 AM, Eric Zhen zhpeng...@gmail.com wrote: Thanks Zhan, I'm also confused about the jstack output: why does the driver get stuck at org.apache.spark.SparkContext.clean? On Tue, Jan 6, 2015 at 2:10 PM, Zhan Zhang zzh...@hortonworks.com wrote: I think it is overflow. The training data is quite big. The algorithm's scalability highly depends on the vocabSize. Even without overflow, there are still other bottlenecks, for example, syn0Global and syn1Global; each of them has vocabSize * vectorSize elements. Thanks. Zhan Zhang On Jan 5, 2015, at 7:47 PM, Eric Zhen zhpeng...@gmail.com wrote: Hi Xiangrui, Our dataset is about 80GB (10B lines). In the driver's log, we found this: INFO Word2Vec: trainWordsCount = -1610413239 it seems that there is an integer overflow? On Tue, Jan 6, 2015 at 5:44 AM, Xiangrui Meng men...@gmail.com wrote: How big is your dataset, and what is the vocabulary size? -Xiangrui On Sun, Jan 4, 2015 at 11:18 PM, Eric Zhen zhpeng...@gmail.com wrote: Hi, When we run mllib word2vec (spark-1.1.0), the driver gets stuck with 100% CPU usage.
Here is the jstack output: main prio=10 tid=0x40112800 nid=0x46f2 runnable [0x4162e000] java.lang.Thread.State: RUNNABLE at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1847) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1778) at java.io.DataOutputStream.writeInt(DataOutputStream.java:182) at java.io.DataOutputStream.writeFloat(DataOutputStream.java:225) at java.io.ObjectOutputStream$BlockDataOutputStream.writeFloats(ObjectOutputStream.java:2064) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1310) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:610) at org.apache.spark.mllib.feature.Word2Vec$$anonfun$fit$1.apply$mcVI$sp(Word2Vec.scala:291) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:290) at com.baidu.inf.WordCount$.main(WordCount.scala:31) at com.baidu.inf.WordCount.main(WordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
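The negative trainWordsCount quoted above is consistent with the 32-bit overflow suspected in the thread: accumulating billions of word occurrences in an Int silently wraps past Int.MaxValue. A minimal Scala sketch of the effect (illustrative only, not Spark's actual counter code):

```scala
// Repeatedly adding to an Int wraps past Int.MaxValue (2147483647) without error.
var trainWordsCount: Int = 0
for (_ <- 1 to 3) trainWordsCount += 1000000000 // 3 billion total
println(trainWordsCount) // negative after wrap-around: -1294967296

// A Long comfortably holds counts of this magnitude.
var safeCount: Long = 0L
for (_ <- 1 to 3) safeCount += 1000000000L
println(safeCount) // 3000000000
```

This matches a ~10B-line dataset producing a negative count in the driver log, as reported above.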
Re: Driver hangs on running mllib word2vec
Oops, just kidding, this method is not in the current release. However, it is included in the latest commit on git if you want to do a build. On Jan 6, 2015, at 2:56 PM, Ganon Pierce ganon.pie...@me.com wrote: Two billion words is a very large vocabulary… You can try solving this issue by setting the minimum number of times a word must occur in order to be included in the vocabulary using setMinCount. This will prevent common misspellings, websites, and other things from being included, and may improve the quality of your model overall. On Jan 6, 2015, at 12:59 AM, Eric Zhen zhpeng...@gmail.com wrote: Thanks Zhan, I'm also confused about the jstack output: why does the driver get stuck at org.apache.spark.SparkContext.clean? On Tue, Jan 6, 2015 at 2:10 PM, Zhan Zhang zzh...@hortonworks.com wrote: I think it is overflow. The training data is quite big. The algorithm's scalability highly depends on the vocabSize. Even without overflow, there are still other bottlenecks; for example, syn0Global and syn1Global each have vocabSize * vectorSize elements. Thanks. Zhan Zhang On Jan 5, 2015, at 7:47 PM, Eric Zhen zhpeng...@gmail.com wrote: Hi Xiangrui, Our dataset is about 80GB (10B lines). In the driver's log, we found this: INFO Word2Vec: trainWordsCount = -1610413239 It seems that there is an integer overflow? On Tue, Jan 6, 2015 at 5:44 AM, Xiangrui Meng men...@gmail.com wrote: How big is your dataset, and what is the vocabulary size? -Xiangrui On Sun, Jan 4, 2015 at 11:18 PM, Eric Zhen zhpeng...@gmail.com wrote: Hi, When we run mllib word2vec (spark-1.1.0), the driver gets stuck with 100% CPU usage. 
Here is the jstack output: main prio=10 tid=0x40112800 nid=0x46f2 runnable [0x4162e000] java.lang.Thread.State: RUNNABLE at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1847) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1778) at java.io.DataOutputStream.writeInt(DataOutputStream.java:182) at java.io.DataOutputStream.writeFloat(DataOutputStream.java:225) at java.io.ObjectOutputStream$BlockDataOutputStream.writeFloats(ObjectOutputStream.java:2064) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1310) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:610) at org.apache.spark.mllib.feature.Word2Vec$$anonfun$fit$1.apply$mcVI$sp(Word2Vec.scala:291) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:290) at com.baidu.inf.WordCount$.main(WordCount.scala:31) at com.baidu.inf.WordCount.main(WordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at
Re: Data Locality
You can also read about locality here in the docs: http://spark.apache.org/docs/latest/tuning.html#data-locality On Tue, Jan 6, 2015 at 8:37 AM, Cody Koeninger c...@koeninger.org wrote: No, not all RDDs have location information, and in any case tasks may be scheduled on non-local nodes if there is idle capacity. See spark.locality.wait: http://spark.apache.org/docs/latest/configuration.html On Tue, Jan 6, 2015 at 10:17 AM, gtinside gtins...@gmail.com wrote: Does Spark guarantee to push the processing to the data? Before creating tasks, does Spark always check for data location? For example, if I have 3 Spark nodes (Node1, Node2, Node3) and the data is local to just 2 nodes (Node1 and Node2), will Spark always schedule tasks on the nodes for which the data is local, i.e. Node1 and Node2 (assuming Node1 and Node2 have enough resources to execute the tasks)? Gaurav -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Data-Locality-tp21000.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
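As the thread notes, locality is a scheduling preference governed by spark.locality.wait rather than a guarantee. A minimal sketch of tuning it (the value and path here are illustrative assumptions, not recommendations; 1.x-era configs take milliseconds):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Increase how long the scheduler waits for a data-local slot before
// falling back to a less-local one (default is 3000 ms).
val conf = new SparkConf()
  .setAppName("locality-demo")
  .set("spark.locality.wait", "6000")      // ms before dropping a locality level
  .set("spark.locality.wait.node", "6000") // per-level override for node locality
val sc = new SparkContext(conf)

// Tasks on this RDD prefer the HDFS block locations (Node1/Node2 in the
// example above) but may still run on Node3 once the wait expires.
val data = sc.textFile("hdfs:///data/input")
println(data.count())
```

Setting the wait to a very large value approximates "always local", at the cost of idle capacity on non-local nodes.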
Current Build Gives HTTP ERROR
I’m attempting to build from the latest commit on git and receive the following error upon attempting to access the application web ui: HTTP ERROR: 500 Problem accessing /jobs/. Reason: Server Error Powered by Jetty:// My driver also prints this error: java.lang.UnsupportedOperationException: empty.max at scala.collection.TraversableOnce$class.max(TraversableOnce.scala:216) at scala.collection.AbstractTraversable.max(Traversable.scala:105) at org.apache.spark.ui.jobs.AllJobsPage.org$apache$spark$ui$jobs$AllJobsPage$$makeRow$1(AllJobsPage.scala:46) at org.apache.spark.ui.jobs.AllJobsPage$$anonfun$jobsTable$1.apply(AllJobsPage.scala:91) at org.apache.spark.ui.jobs.AllJobsPage$$anonfun$jobsTable$1.apply(AllJobsPage.scala:91) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.ui.jobs.AllJobsPage.jobsTable(AllJobsPage.scala:91) at org.apache.spark.ui.jobs.AllJobsPage.render(AllJobsPage.scala:106) at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:68) at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:68) at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:68) at javax.servlet.http.HttpServlet.service(HttpServlet.java:735) at javax.servlet.http.HttpServlet.service(HttpServlet.java:848) at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684) at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501) at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086) at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428) at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020) at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135) at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255) at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116) at org.eclipse.jetty.server.Server.handle(Server.java:370) at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494) at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971) at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033) at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:644) at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235) at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82) at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667) at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608) at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543) at java.lang.Thread.run(Thread.java:745) Has the ui been disabled intentionally for development purposes, have I not set something up correctly, or is this a bug?
Re: Launching Spark app in client mode for standalone cluster
Thanks for the pointers. The issue was due to route caching by Spray, which would always return the same value. Other than that the program is working fine. On Mon, Jan 5, 2015 at 12:44 AM, Simon Chan simonc...@gmail.com wrote: Boromir, You may like to take a look at how we make Spray and Spark working together at the PredictionIO project: https://github.com/PredictionIO/PredictionIO Simon On Sun, Jan 4, 2015 at 8:31 PM, Chester At Work ches...@alpinenow.com wrote: Just a guess here, may not be correct. Spray needs to start akka actor system; spark context also creates an akka actor system, is it possible there are some conflict ? Sent from my iPad On Jan 4, 2015, at 7:42 PM, Boromir Widas vcsub...@gmail.com wrote: Hello, I am trying to launch a Spark app(client mode for standalone cluster) from a Spray server, using the following code. When I run it as $ java -cp class paths SprayServer the SimpleApp.getA() call from SprayService returns -1(which means it sees the logData RDD as null for HTTP requests), but the statements from within SimpleAppLoader.run() get correct values from SimpleApp.getA(). Any idea why the HTTP requests do not see the cached RDD? I have been trying to debug this for some time but not getting anywhere - any pointers will be greatly appreciated. Thanks. BEGIN SPRAY SERVER import akka.actor.{ActorSystem, Props} import akka.io.IO import spray.can.Http import akka.actor._ import spray.routing.HttpService import scala.concurrent.ops object SprayServer { def main(args: Array[String]) { // we need an ActorSystem to host our service implicit val system = ActorSystem() //create our service actor val service = system.actorOf(Props[SprayServiceActor], test-service) //bind our actor to an HTTP port IO(Http) ! 
Http.Bind(service, interface = 0.0.0.0, port = 8085) ops.spawn { *SimpleAppLoader.run() * } } } class SprayServiceActor extends SprayService with Actor { // the HttpService trait (which SprayService will extend) defines // only one abstract member, which connects the services environment // to the enclosing actor or test. def actorRefFactory = context def receive = runRoute(rootRoute) } trait SprayService extends HttpService { def default = path() { println(handling default route) val numAs = *SimpleApp.getA() // DOES NOT WORK * get { complete(snum A: $numAs) } } def pingRoute = path(ping) { get { complete(pong!) } } def pongRoute = path(pong) { get { complete(pong!?) } } def rootRoute = pingRoute ~ pongRoute ~ default } // END SPRAY, BEGIN SPARK import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkSubmit import org.apache.spark.rdd.RDD object SimpleApp { var resultString: String = Data not assigned var logData: RDD[String] = null def main(args: Array[String]) { val logFile = /home/ovik/src/spark/README.md // Should be some file on your system val conf = new SparkConf().setAppName(Simple Application) val sc = new SparkContext(conf) logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line = line.contains(a)).count() val numBs = logData.filter(line = line.contains(b)).count() resultString = Lines with a: %s, Lines with b: %s.format(numAs, numBs) println(resultString) } def getA(): Int = { println(resultString) if(null == logData) { println( logData is null!) 
-1 } else { val numAs = logData.filter(line = line.contains(a)).count().toInt println(s numAs: $numAs) numAs } } } object SimpleAppLoader { def main(args: Array[String]) { run() } def run() { val clArgs = Array( --deploy-mode, client , --total-executor-cores, 2 , --class, SimpleApp , --conf, spark.shuffle.spill=false , --conf, spark.master=spark://troika:7077 , --conf, spark.driver.memory=128m , --conf, spark.executor.memory=128m , --conf, spark.eventLog.enabled=true , --conf, spark.eventLog.dir=/home/ovik/logs , SparkContext.jarOfClass(this.getClass).get) SparkSubmit.main(clArgs) val numAs = *SimpleApp.getA()// WORKS * println(snumAs is $numAs) } }
Re: Current Build Gives HTTP ERROR
FWIW I do not see any such error, after a mvn -DskipTests clean package and ./bin/spark-shell from master. Maybe double-check you have done a full clean build. On Tue, Jan 6, 2015 at 9:09 PM, Ganon Pierce ganon.pie...@me.com wrote: I’m attempting to build from the latest commit on git and receive the following error upon attempting to access the application web ui: HTTP ERROR: 500 Problem accessing /jobs/. Reason: Server Error *Powered by Jetty://* My driver also prints this error: java.lang.UnsupportedOperationException: empty.max at scala.collection.TraversableOnce$class.max(TraversableOnce.scala:216) at scala.collection.AbstractTraversable.max(Traversable.scala:105) at org.apache.spark.ui.jobs.AllJobsPage.org $apache$spark$ui$jobs$AllJobsPage$$makeRow$1(AllJobsPage.scala:46) at org.apache.spark.ui.jobs.AllJobsPage$$anonfun$jobsTable$1.apply(AllJobsPage.scala:91) at org.apache.spark.ui.jobs.AllJobsPage$$anonfun$jobsTable$1.apply(AllJobsPage.scala:91) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.ui.jobs.AllJobsPage.jobsTable(AllJobsPage.scala:91) at org.apache.spark.ui.jobs.AllJobsPage.render(AllJobsPage.scala:106) at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:68) at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:68) at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:68) at javax.servlet.http.HttpServlet.service(HttpServlet.java:735) at javax.servlet.http.HttpServlet.service(HttpServlet.java:848) at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684) at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501) at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086) at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428) at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020) at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135) at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255) at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116) at org.eclipse.jetty.server.Server.handle(Server.java:370) at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494) at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971) at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033) at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:644) at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235) at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82) at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667) at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608) at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543) at java.lang.Thread.run(Thread.java:745) Has the ui been disabled intentionally for development purposes, have I not set something up correctly, or is this a bug?
[MLLib] storageLevel in ALS
Hi, I was doing some tests with ALS and I noticed that if I persist the inner RDDs from a MatrixFactorizationModel, the RDDs are not replicated; it seems the storage level is hardcoded to MEMORY_AND_DISK. Do you think it makes sense to make that configurable?
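In the meantime, one workaround sketch is to unpersist the model's factor RDDs after training and re-persist them at the desired level. This assumes an existing RDD[Rating] named `ratings`; the rank, iteration count, lambda, and replication level are illustrative:

```scala
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

def trainReplicated(ratings: RDD[Rating]) = {
  val model = ALS.train(ratings, 10, 10, 0.01)
  // Training persists userFeatures/productFeatures internally, so drop the
  // existing storage level first, then re-persist with 2x replication.
  model.userFeatures.unpersist()
  model.userFeatures.persist(StorageLevel.MEMORY_AND_DISK_2)
  model.productFeatures.unpersist()
  model.productFeatures.persist(StorageLevel.MEMORY_AND_DISK_2)
  model
}
```

This re-persists rather than avoids the initial caching, so it does not change the storage level used during training itself.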
Re: Snappy error when driver is running in JBoss
Might be due to conflict between multiple snappy jars. Can you check the classpath to see if there are more than one snappy jar ? Cheers On Tue, Jan 6, 2015 at 2:26 PM, Charles charles...@cenx.com wrote: I get this exception(java.lang.UnsatisfiedLinkError) when the driver is running inside JBoss. We are running with DataStax 4.6 version, which is using spark 1.1.0. The driver runs inside a wildfly container. The snappy-java version is 1.0.5. 2015-01-06 20:25:03,771 ERROR [akka.actor.ActorSystemImpl] (sparkDriver-akka.actor.default-dispatcher-22) Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-3] shutting down ActorSystem [sparkDriver]: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:320) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.xerial.snappy.SnappyOutputStream.init(SnappyOutputStream.java:79) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:68) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) 
[levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809) [levski-0.5.0-SNAPSHOT-standalone.jar:] at cenx.prometheus.spark.SparkContext.broadcast(Unknown Source) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:829) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753) [levski-0.5.0-SNAPSHOT-standalone.jar:] at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360) [levski-0.5.0-SNAPSHOT-standalone.jar:] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) [levski-0.5.0-SNAPSHOT-standalone.jar:] at akka.actor.ActorCell.invoke(ActorCell.scala:456) [levski-0.5.0-SNAPSHOT-standalone.jar:] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) [levski-0.5.0-SNAPSHOT-standalone.jar:] at akka.dispatch.Mailbox.run(Mailbox.scala:219) [levski-0.5.0-SNAPSHOT-standalone.jar:] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) [levski-0.5.0-SNAPSHOT-standalone.jar:] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [levski-0.5.0-SNAPSHOT-standalone.jar:] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [levski-0.5.0-SNAPSHOT-standalone.jar:] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [levski-0.5.0-SNAPSHOT-standalone.jar:] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [levski-0.5.0-SNAPSHOT-standalone.jar:] -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Snappy-error-when-driver-is-running-in-JBoss-tp21004.html 
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Parquet schema changes
Anyone got any further thoughts on this? I saw that the _metadata file seems to store the schema of every single part (i.e. file) in the Parquet directory, so in theory it should be possible. Effectively, our use case is that we have a stack of JSON that we receive and we want to encode to Parquet for high performance, but there is the potential for new fields to be added to the JSON structure, so we want to be able to handle that every time we encode to Parquet (we'll be doing it incrementally for performance). On Mon, Jan 5, 2015 at 3:44 PM, Adam Gilmore dragoncu...@gmail.com wrote: I saw that in the source, which is why I was wondering. I was mainly reading: http://blog.cloudera.com/blog/2013/10/parquet-at-salesforce-com/ "A query that tries to parse the organizationId and userId from the 2 logTypes should be able to do so correctly, though they are positioned differently in the schema. With Parquet, it’s not a problem. It will merge ‘A’ and ‘V’ schemas and project columns accordingly. It does so by maintaining a file schema in addition to merged schema and parsing the columns by referencing the 2." I know that each part file can have its own schema, but I saw in the implementation for Spark that, if there is no metadata file, it just picks the first file and uses that schema across the board. I'm not quite sure how other implementations like Impala etc. deal with this, but I was really hoping there'd be a way to version the schema as new records are added and just project it through. Would be a godsend for semi-structured data. On Tue, Dec 23, 2014 at 3:33 PM, Cheng Lian lian.cs@gmail.com wrote: I must have missed something important here; could you please provide more clues on Parquet “schema versioning”? I wasn’t aware of this feature (which sounds really useful). Especially, are you referring to the following scenario: 1. Write some data whose schema is A to “t.parquet”, resulting in a file “t.parquet/parquet-r-1.part” on HDFS 2. 
Append more data whose schema B “contains” A but has more columns to “t.parquet”, resulting in another file “t.parquet/parquet-r-2.part” on HDFS 3. Now read “t.parquet”; schemas A and B are expected to be merged. If this is the case, then current Spark SQL doesn’t support this. We assume the schemas of all data within a single Parquet file (which is an HDFS directory with multiple part-files) are identical. On 12/22/14 1:11 PM, Adam Gilmore wrote: Hi all, I understand that Parquet allows for schema versioning automatically in the format; however, I'm not sure whether Spark supports this. I'm saving a SchemaRDD to a Parquet file, registering it as a table, then doing an insertInto with a SchemaRDD with an extra column. The second SchemaRDD does in fact get inserted, but the extra column isn't present when I try to query it with Spark SQL. Is there anything I can do to get this working how I'm hoping?
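For reference, the scenario being discussed can be sketched with the Spark 1.2-era SchemaRDD API (file paths and the table name are illustrative; `sc` is an existing SparkContext):

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Day 1: write records with schema A.
val day1 = sqlContext.jsonFile("hdfs:///logs/day1.json")
day1.saveAsParquetFile("hdfs:///warehouse/t.parquet")

// Register the Parquet directory and append day-2 records,
// whose schema B adds a column to A.
sqlContext.parquetFile("hdfs:///warehouse/t.parquet").registerTempTable("t")
val day2 = sqlContext.jsonFile("hdfs:///logs/day2.json")
day2.insertInto("t")

// Per the thread above, the extra column from schema B is not visible here,
// because reads assume one schema for the whole directory.
sqlContext.sql("SELECT * FROM t").printSchema()
```

The append succeeds at the file level, which is why the second SchemaRDD "does in fact get inserted" even though the read side projects only the original schema.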
Re: Re: I think I am almost lost in the internals of Spark
I do not understand Chinese, but the diagrams on that page are very helpful. On Tue, Jan 6, 2015 at 9:46 PM, eric wong win19...@gmail.com wrote: A good beginning if you are Chinese. https://github.com/JerryLead/SparkInternals/tree/master/markdown 2015-01-07 10:13 GMT+08:00 bit1...@163.com bit1...@163.com: Thank you, Tobias. I will look into the Spark paper. But it looks like the paper has been moved: http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf. A web page (Resource not found) is returned when I access it. -- bit1...@163.com *From:* Tobias Pfeiffer t...@preferred.jp *Date:* 2015-01-07 09:24 *To:* Todd bit1...@163.com *CC:* user user@spark.apache.org *Subject:* Re: I think I am almost lost in the internals of Spark Hi, On Tue, Jan 6, 2015 at 11:24 PM, Todd bit1...@163.com wrote: I am a bit new to Spark, except that I have tried simple things like word count and the examples given in the Spark SQL programming guide. Now I am investigating the internals of Spark, but I think I am almost lost, because I cannot grasp a whole picture of what Spark does when it executes the word count. I recommend understanding what an RDD is and how it is processed, using http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds and probably also http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf (once the server is back). Understanding how an RDD is processed is probably most helpful for understanding the whole of Spark. Tobias -- 王海华
Re: Cannot see RDDs in Spark UI
Hi Manoj, I've noticed that the storage tab only shows RDDs that have been cached. Did you call .cache() or .persist() on any of the RDDs? Andrew On Tue, Jan 6, 2015 at 6:48 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi, I create a bunch of RDDs, including schema RDDs. When I run the program and go to UI on xxx:4040, the storage tab does not shows any RDDs. Spark version is 1.1.1 (Hadoop 2.3) Any thoughts? Thanks,
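To make this concrete: marking an RDD persistent is lazy, so the Storage tab stays empty until an action materializes the cache. A minimal sketch (the input path is illustrative; `sc` is an existing SparkContext):

```scala
val lines = sc.textFile("hdfs:///data/input.txt")
lines.setName("input-lines") // optional: a readable label in the Storage tab
lines.cache()                // marks the RDD for caching; nothing is stored yet
lines.count()                // first action fills the cache; the RDD now
                             // appears on the Storage tab at xxx:4040
```

SchemaRDDs behave the same way: without an explicit cache()/persist() followed by an action, they will not show up.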
Re: I think I am almost lost in the internals of Spark
Hi, On Tue, Jan 6, 2015 at 11:24 PM, Todd bit1...@163.com wrote: I am a bit new to Spark, except that I have tried simple things like word count and the examples given in the Spark SQL programming guide. Now I am investigating the internals of Spark, but I think I am almost lost, because I cannot grasp a whole picture of what Spark does when it executes the word count. I recommend understanding what an RDD is and how it is processed, using http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds and probably also http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf (once the server is back). Understanding how an RDD is processed is probably most helpful for understanding the whole of Spark. Tobias
Re: Re: I think I am almost lost in the internals of Spark
A good beginning if you are Chinese. https://github.com/JerryLead/SparkInternals/tree/master/markdown 2015-01-07 10:13 GMT+08:00 bit1...@163.com bit1...@163.com: Thank you, Tobias. I will look into the Spark paper. But it looks like the paper has been moved: http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf. A web page (Resource not found) is returned when I access it. -- bit1...@163.com *From:* Tobias Pfeiffer t...@preferred.jp *Date:* 2015-01-07 09:24 *To:* Todd bit1...@163.com *CC:* user user@spark.apache.org *Subject:* Re: I think I am almost lost in the internals of Spark Hi, On Tue, Jan 6, 2015 at 11:24 PM, Todd bit1...@163.com wrote: I am a bit new to Spark, except that I have tried simple things like word count and the examples given in the Spark SQL programming guide. Now I am investigating the internals of Spark, but I think I am almost lost, because I cannot grasp a whole picture of what Spark does when it executes the word count. I recommend understanding what an RDD is and how it is processed, using http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds and probably also http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf (once the server is back). Understanding how an RDD is processed is probably most helpful for understanding the whole of Spark. Tobias -- 王海华
Re: Shuffle Problems in 1.2.0
Hey Davies, Here are some more details on a configuration that causes this error for me. Launch an AWS Spark EMR cluster as follows: aws emr create-cluster --region us-west-1 --no-auto-terminate \ --ec2-attributes KeyName=your-key-here,SubnetId=your-subnet-here \ --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args='[-g]' \ --ami-version 3.3 --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge \ InstanceGroupType=CORE,InstanceCount=10,InstanceType=r3.xlarge --name "Spark Issue Repro" \ --visible-to-all-users --applications Name=Ganglia This is a 10-node cluster (not sure if this makes a difference outside of HDFS block locality). Then use this Gist here as your spark-defaults file (it'll configure 2 executors per job as well): https://gist.github.com/skrasser/9b978d3d572735298d16 With that, I am seeing this again: 2015-01-07 03:43:51,751 ERROR [Executor task launch worker-1] executor.Executor (Logging.scala:logError(96)) - Exception in task 13.0 in stage 0.0 (TID 27) org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@4cfae71c) Thanks for the performance pointers -- the repro script is fairly unpolished (just enough to cause the aforementioned exception). Hope this sheds some light on the error. From what I can tell so far, something in the spark-defaults file triggers it (with other settings it completes just fine). Thanks for your help! -Sven On Tue, Jan 6, 2015 at 12:29 PM, Davies Liu dav...@databricks.com wrote: I still cannot reproduce it with 2 nodes (4 CPUs). Your repro.py could be faster (10 min) than before (22 min): inpdata.map(lambda (pc, x): (x, pc=='p' and 2 or 1)).reduceByKey(lambda x, y: x|y).filter(lambda (x, pc): pc==3).collect() (also, no cache needed anymore) Davies On Tue, Jan 6, 2015 at 9:02 AM, Sven Krasser kras...@gmail.com wrote: The issue has been sensitive to the number of executors and input data size. 
I'm using 2 executors with 4 cores each, 25GB of memory, and 3800MB of memory overhead for YARN. This will fit onto Amazon r3 instance types. -Sven On Tue, Jan 6, 2015 at 12:46 AM, Davies Liu dav...@databricks.com wrote: I ran your scripts on a 5-node (2 CPUs, 8G mem) cluster and cannot reproduce your failure. Should I test it with a big-memory node? On Mon, Jan 5, 2015 at 4:00 PM, Sven Krasser kras...@gmail.com wrote: Thanks for the input! I've managed to come up with a repro of the error with test data only (and without any of the custom code in the original script), please see here: https://gist.github.com/skrasser/4bd7b41550988c8f6071#file-gistfile1-md The Gist contains a data generator and the script reproducing the error (plus driver and executor logs). If I run using full cluster capacity (32 executors with 28GB), there are no issues. If I run on only two, the error appears again and the job fails: org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@294b55b7) Any thoughts or any obvious problems you can spot by any chance? Thank you! -Sven On Sun, Jan 4, 2015 at 1:11 PM, Josh Rosen rosenvi...@gmail.com wrote: It doesn’t seem like there are a whole lot of clues to go on here without seeing the job code. The original “org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@130dc7ad)” error suggests that maybe there’s an issue with PySpark’s serialization / tracking of types, but it’s hard to say from this error trace alone. On December 30, 2014 at 5:17:08 PM, Sven Krasser (kras...@gmail.com) wrote: Hey Josh, I am still trying to prune this to a minimal example, but it has been tricky since scale seems to be a factor. The job runs over ~720GB of data (the cluster's total RAM is around ~900GB, split across 32 executors). I've managed to run it over a vastly smaller data set without issues.
Curiously, when I run it over slightly smaller data set of ~230GB (using sort-based shuffle), my job also fails, but I see no shuffle errors in the executor logs. All I see is the error below from the driver (this is also what the driver prints when erroring out on the large data set, but I assumed the executor errors to be the root cause). Any idea on where to look in the interim for more hints? I'll continue to try to get to a minimal repro. 2014-12-30 21:35:34,539 INFO [sparkDriver-akka.actor.default-dispatcher-14] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map output locations for shuffle 0 to sparkexecu...@ip-10-20-80-60.us-west-1.compute.internal:39739 2014-12-30 21:35:39,512 INFO [sparkDriver-akka.actor.default-dispatcher-17] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map output locations for shuffle 0 to
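A side note on Davies' optimized repro line earlier in this thread: it encodes which marker each key was seen with as a bitmask (`pc == 'p' and 2 or 1` is Python 2 shorthand for "2 if the marker is 'p', else 1"), ORs the bits per key in `reduceByKey`, and keeps keys whose mask is 3, i.e. keys seen with both markers. A plain Python 3 sketch of the same logic without Spark, on made-up sample data:

```python
# Pure-Python sketch of the bitmask pattern from the repro one-liner:
# map each (marker, key) to (key, bit), OR the bits per key, keep keys with both bits set.

def keys_with_both_markers(pairs):
    """pairs: iterable of (marker, key); returns keys seen with 'p' and with a non-'p' marker."""
    masks = {}
    for marker, key in pairs:
        bit = 2 if marker == 'p' else 1
        masks[key] = masks.get(key, 0) | bit   # plays the role of reduceByKey(lambda x, y: x | y)
    return sorted(k for k, m in masks.items() if m == 3)

data = [('p', 'a'), ('q', 'a'), ('p', 'b'), ('q', 'c')]
print(keys_with_both_markers(data))  # ['a'] -- only 'a' appears with both markers
```

On an RDD the same shape survives: map to (key, bit), reduceByKey with `|`, then filter on mask == 3.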
Re: Parquet schema changes
I want to support this, but we don't yet. Here is the JIRA: https://issues.apache.org/jira/browse/SPARK-3851 On Tue, Jan 6, 2015 at 5:23 PM, Adam Gilmore dragoncu...@gmail.com wrote: Anyone got any further thoughts on this? I saw the _metadata file seems to store the schema of every single part (i.e. file) in the parquet directory, so in theory it should be possible. Effectively, our use case is that we have a stack of JSON that we receive and we want to encode to Parquet for high performance, but there is the potential of new fields being added to the JSON structure, so we want to be able to handle that every time we encode to Parquet (we'll be doing it incrementally for performance). On Mon, Jan 5, 2015 at 3:44 PM, Adam Gilmore dragoncu...@gmail.com wrote: I saw that in the source, which is why I was wondering. I was mainly reading: http://blog.cloudera.com/blog/2013/10/parquet-at-salesforce-com/ A query that tries to parse the organizationId and userId from the 2 logTypes should be able to do so correctly, though they are positioned differently in the schema. With Parquet, it’s not a problem. It will merge ‘A’ and ‘V’ schemas and project columns accordingly. It does so by maintaining a file schema in addition to the merged schema and parsing the columns by referencing the 2. I know that each part file can have its own schema, but I saw in the implementation for Spark that if there was no metadata file, it'd just pick the first file and use that schema across the board. I'm not quite sure how other implementations like Impala etc. deal with this, but I was really hoping there'd be a way to version the schema as new records are added and just project it through. It would be a godsend for semi-structured data. On Tue, Dec 23, 2014 at 3:33 PM, Cheng Lian lian.cs@gmail.com wrote: I must have missed something important here. Could you please provide more clues on Parquet “schema versioning”? I wasn’t aware of this feature (which sounds really useful).
Especially, are you referring to the following scenario: 1. Write some data whose schema is A to “t.parquet”, resulting in a file “t.parquet/parquet-r-1.part” on HDFS 2. Append more data whose schema B “contains” A but has more columns to “t.parquet”, resulting in another file “t.parquet/parquet-r-2.part” on HDFS 3. Now read “t.parquet”, and schemas A and B are expected to be merged If this is the case, then current Spark SQL doesn’t support it. We assume the schemas of all data within a single Parquet file (which is an HDFS directory with multiple part-files) are identical. On 12/22/14 1:11 PM, Adam Gilmore wrote: Hi all, I understand that Parquet allows for schema versioning automatically in the format; however, I'm not sure whether Spark supports this. I'm saving a SchemaRDD to a parquet file, registering it as a table, then doing an insertInto with a SchemaRDD with an extra column. The second SchemaRDD does in fact get inserted, but the extra column isn't present when I try to query it with Spark SQL. Is there anything I can do to get this working how I'm hoping?
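For what it's worth, the merge in step 3 of Cheng's scenario amounts to a field-wise union of the two schemas, with schema B's extra columns read back as null from the older part-file. A minimal plain-Python sketch of that merge rule (the field names and types are hypothetical, and this is only the conceptual rule, not Spark's implementation):

```python
def merge_schemas(schema_a, schema_b):
    """Field-wise union of two schemas, each a list of (name, type) pairs.
    Fields from A keep their position; B's extra fields are appended."""
    merged = list(schema_a)
    names = {name for name, _ in schema_a}
    for field in schema_b:
        if field[0] not in names:
            merged.append(field)
    return merged

a = [("organizationId", "string"), ("userId", "string")]
b = [("organizationId", "string"), ("userId", "string"), ("eventType", "string")]
print(merge_schemas(a, b))  # schema B, since B already contains all of A's fields
```

A real implementation would also have to reconcile fields whose types differ, which this sketch ignores.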
Re: Re: I think I am almost lost in the internals of Spark
Thanks Eric. Yes, I am Chinese :-). I will read through the articles, thank you! bit1...@163.com From: eric wong Date: 2015-01-07 10:46 To: bit1...@163.com CC: user Subject: Re: Re: I think I am almost lost in the internals of Spark A good beginning if you are Chinese. https://github.com/JerryLead/SparkInternals/tree/master/markdown
SparkSQL schemaRDD MapPartitions calls - performance issues - columnar formats?
Hi, I’m trying to use a combination of SparkSQL and ‘normal’ Spark/Scala via rdd.mapPartitions(…), using the latest release, 1.2.0. Simple example: load up some sample data from parquet on HDFS (about 380m rows, 10 columns) on a 7-node cluster. val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet") t.registerTempTable("test1") sqlC.cacheTable("test1") Now let's do some operations on it; I want the total sales quantities sold for each hour in the day, so I choose 3 out of the 10 possible columns... sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by Hour").collect().foreach(println) After the table has been 100% cached in memory, this takes around 11 seconds. Let's do the same thing but via a mapPartitions call (this isn’t production-ready code, but it gets the job done). val rddPC = sqlC.sql("select Hour, ItemQty, Sales from test1") rddPC.mapPartitions { case hrs => val qtySum = new Array[Double](24) val salesSum = new Array[Double](24) for (r <- hrs) { val hr = r.getInt(0) qtySum(hr) += r.getDouble(1) salesSum(hr) += r.getDouble(2) } (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println) Now this takes around ~49 seconds, even though the test1 table is 100% cached and the number of partitions remains the same. Now if I create a simple RDD of a case class HourSum(hour: Int, qty: Double, sales: Double) and convert the SchemaRDD: val rdd = sqlC.sql("select * from test1").map { r => HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8)) }.cache() // cache all the data rdd.count() Then run basically the same mapPartitions query: rdd.mapPartitions { case hrs => val qtySum = new Array[Double](24) val salesSum = new Array[Double](24) for (r <- hrs) { val hr = r.hour qtySum(hr) += r.qty salesSum(hr) += r.sales } (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println) This takes around 1.5 seconds! Albeit the memory footprint is much larger.
My thinking is that because SparkSQL stores things in a columnar format, there is some unwrapping to be done out of the column array buffers, which takes time, and for some reason this just takes longer when I switch to mapPartitions (maybe it's unwrapping the entire row even though I'm using just a subset of columns, or maybe there is some object creation/autoboxing going on when calling getInt or getDouble)... I've tried simpler cases too, like just summing sales. Running the sum via SQL is fast (4.7 seconds), and running a mapPartitions sum on a double RDD is even faster (2.6 seconds). But mapPartitions on the SchemaRDD, sqlC.sql("select SalesInclGST from test1").mapPartitions(iter => Iterator(iter.foldLeft(0.0)((t, r) => t + r.getDouble(0)))).sum, takes a long time (33 seconds). In all these examples everything is fully cached in memory. And yes, for these kinds of operations I can use SQL, but for more complex queries I'd much rather be using a combo of SparkSQL to select the data (so I get nice things like Parquet pushdowns etc.) plus functional Scala! I think I'm doing something dumb... Is there something I should be doing to get faster performance on mapPartitions on SchemaRDDs? Is there some unwrapping going on in the background that Catalyst does in a smart way that I'm missing? Cheers, ~N Nathan McCarthy QUANTIUM Level 25, 8 Chifley, 8-12 Chifley Square Sydney NSW 2000 T: +61 2 8224 8922 F: +61 2 9292 6444 W: www.quantium.com.au www.linkedin.com/company/quantium www.facebook.com/QuantiumAustralia www.twitter.com/QuantiumAU The contents of this email, including attachments, may be confidential information. If you are not the intended recipient, any use, disclosure or copying of the information is unauthorised.
If you have received this email in error, we would be grateful if you would notify us immediately by email reply, phone (+ 61 2 9292 6400) or fax (+ 61 2 9292 6444) and delete the message from your system.
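An aside on Nathan's mapPartitions version above: the point of the pattern is that each partition is pre-aggregated into two 24-slot arrays, so at most 24 records per partition cross the shuffle before reduceByKey combines them. A pure-Python sketch of that partial-aggregation idea, with invented sample rows:

```python
def aggregate_partition(rows):
    """rows: iterable of (hour, qty, sales); returns {hour: (sales_sum, qty_sum)}."""
    qty_sum = [0.0] * 24
    sales_sum = [0.0] * 24
    for hour, qty, sales in rows:
        qty_sum[hour] += qty
        sales_sum[hour] += sales
    # emit one record per non-empty hour, like the zipWithIndex/swap in the Scala code
    return {h: (sales_sum[h], qty_sum[h]) for h in range(24) if qty_sum[h] or sales_sum[h]}

def merge(a, b):
    """Combine two partition results, like reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))."""
    out = dict(a)
    for h, (s, q) in b.items():
        s0, q0 = out.get(h, (0.0, 0.0))
        out[h] = (s0 + s, q0 + q)
    return out

p1 = aggregate_partition([(9, 2.0, 10.0), (9, 1.0, 5.0)])
p2 = aggregate_partition([(9, 1.0, 4.0), (17, 3.0, 30.0)])
print(merge(p1, p2))  # {9: (19.0, 4.0), 17: (30.0, 3.0)}
```

The shuffle cost is the same in both of Nathan's mapPartitions variants, so the speed gap he observes really does come down to per-row access cost (columnar unwrapping vs. plain case-class fields), not the aggregation shape.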
Re: Re: I think I am almost lost in the internals of Spark
Interestingly, Google Chrome translates the materials. Cheers k/ On Tue, Jan 6, 2015 at 7:26 PM, Boromir Widas vcsub...@gmail.com wrote: I do not understand Chinese, but the diagrams on that page are very helpful. On Tue, Jan 6, 2015 at 9:46 PM, eric wong win19...@gmail.com wrote: A good beginning if you are Chinese. https://github.com/JerryLead/SparkInternals/tree/master/markdown
Re: How to replace user.id to user.names in a file
Hi, it looks to me as if you need the whole user database on every node, so maybe put the id-to-name information as a Map[Id, String] in a broadcast variable and then do something like recommendations.map(line => { line.map(uid => usernames(uid)) }) or so? Tobias
Re: How to replace user.id to user.names in a file
Hi, On Wed, Jan 7, 2015 at 10:47 AM, Riginos Samaras samarasrigi...@gmail.com wrote: Yes, something like this. Can you please give me an example to create a Map? That depends heavily on the shape of your input file. What about something like: (for (line <- Source.fromFile(filename).getLines()) yield { val items = line.trim.split(" ") (items(0).toInt, items(1)) }).toMap Tobias
Re: How to replace user.id to user.names in a file
Hi, On Wed, Jan 7, 2015 at 11:13 AM, Riginos Samaras samarasrigi...@gmail.com wrote: exactly, that's what I'm looking for. My code is like this: //code val users_map = users_file.map { s => val parts = s.split(",") (parts(0).toInt, parts(1)) }.distinct //code but I get the error: error: value toMap is not a member of org.apache.spark.rdd.RDD[(Int, String)] user_map.toMap If you want to distribute the Map as a broadcast variable, it must not be an RDD but a normal Scala map. Make your users_file a regular List, then it should work. Tobias
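Putting Tobias' two answers together: build the id-to-name map on the driver as a plain collection (not an RDD), broadcast it, and look ids up inside map. A pure-Python sketch of the flow, with an ordinary dict standing in for the broadcast variable and made-up file contents:

```python
# stand-in for reading the users file on the driver (id <space> name per line)
user_lines = ["1 alice", "2 bob", "3 carol"]
usernames = dict((int(p[0]), p[1]) for p in (l.split() for l in user_lines))

# pretend recommendations is an RDD of lists of user ids; here it is just a list
recommendations = [[1, 3], [2]]
named = [[usernames[uid] for uid in rec] for rec in recommendations]
print(named)  # [['alice', 'carol'], ['bob']]
```

In PySpark the dict would be wrapped with sc.broadcast(usernames) and dereferenced as broadcast.value inside the map function, so each executor receives one copy instead of one per task.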
Re: different akka versions and spark
if the classes are in the original location than i think its safe to say that this makes it impossible for us to build one app that can run against spark 1.0.x, 1.1.x and spark 1.2.x. thats no big deal, but it does beg the question of what compatibility can reasonably be expected for spark 1.x series. i have seen a lot of focus on backwards compatibility of the spark 1.x api, but to me thats kind of a moot point if i cannot run apps against all 1.x versions anyhow since dependencies are not compatible. On Mon, Jan 5, 2015 at 5:08 PM, Marcelo Vanzin van...@cloudera.com wrote: Spark doesn't really shade akka; it pulls a different build (kept under the org.spark-project.akka group and, I assume, with some build-time differences from upstream akka?), but all classes are still in the original location. The upgrade is a little more unfortunate than just changing akka, since it also changes some transitive dependencies which also have compatibility issues (e.g. the typesafe config library). But I believe it's needed to support Scala 2.11... On Mon, Jan 5, 2015 at 8:27 AM, Koert Kuipers ko...@tresata.com wrote: since spark shaded akka i wonder if it would work, but i doubt it On Mon, Jan 5, 2015 at 9:56 AM, Cody Koeninger c...@koeninger.org wrote: I haven't tried it with spark specifically, but I've definitely run into problems trying to depend on multiple versions of akka in one project. On Sat, Jan 3, 2015 at 11:22 AM, Koert Kuipers ko...@tresata.com wrote: hey Ted, i am aware of the upgrade efforts for akka. however if spark 1.2 forces me to upgrade all our usage of akka to 2.3.x while spark 1.0 and 1.1 force me to use akka 2.2.x then we cannot build one application that runs on all spark 1.x versions, which i would consider a major incompatibility. 
best, koert On Sat, Jan 3, 2015 at 12:11 AM, Ted Yu yuzhih...@gmail.com wrote: Please see http://akka.io/news/2014/05/22/akka-2.3.3-released.html which points to http://doc.akka.io/docs/akka/2.3.3/project/migration-guide-2.2.x-2.3.x.html?_ga=1.35212129.1385865413.1420220234 Cheers On Fri, Jan 2, 2015 at 9:11 AM, Koert Kuipers ko...@tresata.com wrote: i noticed spark 1.2.0 bumps the akka version. since spark uses it's own akka version, does this mean it can co-exist with another akka version in the same JVM? has anyone tried this? we have some spark apps that also use akka (2.2.3) and spray. if different akka versions causes conflicts then spark 1.2.0 would not be backwards compatible for us... thanks. koert -- Marcelo
Re: Spark Driver behind NAT
From what I can tell, this isn't a firewall issue per se; it's how the Remoting Service binds to an IP given command-line parameters. So, if I have a VM (or OpenStack or EC2 instance) running on a private network, let's say where the IP address is 192.168.X.Y, I can't tell the Workers to reach me on this IP, because the Remoting Service binds to the interface passed in those parameters. So, if my public IP is a routable IP address, but the one the VM sees is the 192.168.X.Y address, it appears I can't do some kind of port forwarding from the external to the internal. Is this correct? If I set the spark.driver.host and spark.driver.port properties at the command line, it tries to actually bind to that IP rather than just telling the worker to reach back to this IP. Is there a way around this? Is there a way to tell the workers which IP address to use WITHOUT binding to it? Maybe allow the Remoting Service to bind to the internal IP but advertise it differently? On Mon, Jan 5, 2015 at 9:02 AM, Aaron aarongm...@gmail.com wrote: Thanks for the link! However, from reviewing the thread, it appears you cannot have a NAT/firewall between the cluster and the spark-driver/shell; is this correct? When the shell starts up, it binds to the internal IP (e.g. 192.168.x.y), not the external floating IP, which is routable from the cluster. When I did set a static port for spark.driver.port and set spark.driver.host to the floating IP address, I got the same exception (Caused by: java.net.BindException: Cannot assign requested address: bind) because of the use of the InetAddress.getHostAddress method call.
Cheers, Aaron On Mon, Jan 5, 2015 at 8:28 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can have a look at this discussion http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-job-on-Unix-cluster-from-dev-environment-Windows-td16989.html Thanks Best Regards On Mon, Jan 5, 2015 at 6:11 PM, Aaron aarongm...@gmail.com wrote: Hello there, I was wondering if there is a way to have the spark-shell (or pyspark) sit behind a NAT when talking to the cluster? Basically, we have OpenStack instances that run with internal IPs, and we assign floating IPs as needed. Since the workers make direct TCP connections back, the spark-shell is binding to the internal IP, not the floating one. Our other use case is running Vagrant VMs on our local machines, but we don't have those VMs' NICs set up in bridged mode, so they too have internal IPs. I tried using SPARK_LOCAL_IP and the various --conf spark.driver.host parameters, but it still gets angry. Any thoughts/suggestions? Currently our workaround is a VPNC connection from inside the Vagrant VMs or OpenStack instances, but that doesn't seem like a long-term plan. Thanks in advance! Cheers, Aaron
Re: Spark Driver behind NAT
Found the issue in JIRA: https://issues.apache.org/jira/browse/SPARK-4389?jql=project%20%3D%20SPARK%20AND%20text%20~%20NAT On Tue, Jan 6, 2015 at 10:45 AM, Aaron aarongm...@gmail.com wrote: From what I can tell, this isn't a firewall issue per se; it's how the Remoting Service binds to an IP given command-line parameters.
Re: MLLIB and Openblas library in non-default dir
spark-submit may not share the same JVM with the Spark master and executors. On Tue, Jan 6, 2015 at 11:40 AM, Tomas Hudik xhu...@gmail.com wrote: thanks Xiangrui, I'll try it. BTW: spark-submit is a standalone program (bin/spark-submit). Therefore, the JVM is started after the spark-submit script runs. Am I correct? On Mon, Jan 5, 2015 at 10:35 PM, Xiangrui Meng men...@gmail.com wrote: It might be hard to do that with spark-submit, because the executor JVMs may already be up and running before a user runs spark-submit. You can try to use `System.setProperty` to change the property at runtime, though it doesn't seem to be a good solution. -Xiangrui On Fri, Jan 2, 2015 at 6:28 AM, xhudik xhu...@gmail.com wrote: Hi, I have compiled the OpenBLAS library into a nonstandard directory and I want to inform the Spark app about it via: -Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/local/lib/libopenblas.so which is a standard option in netlib-java (https://github.com/fommil/netlib-java). I tried 2 ways: 1. via the --conf parameter: bin/spark-submit -v --class org.apache.spark.examples.mllib.LinearRegression --conf -Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/local/lib/libopenblas.so examples/target/scala-2.10/spark-examples-1.3.0-SNAPSHOT-hadoop1.0.4.jar data/mllib/sample_libsvm_data.txt 2. via the --driver-java-options parameter: bin/spark-submit -v --driver-java-options -Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/local/lib/libopenblas.so --class org.apache.spark.examples.mllib.LinearRegression examples/target/scala-2.10/spark-examples-1.3.0-SNAPSHOT-hadoop1.0.4.jar data/mllib/sample_libsvm_data.txt How can I force spark-submit to propagate info about the non-standard placement of the OpenBLAS library to the netlib-java lib? thanks, Tomas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-and-Openblas-library-in-non-default-dir-tp20943.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [MLLib] storageLevel in ALS
Which Spark version are you using? We made this configurable in 1.1: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L202 -Xiangrui On Tue, Jan 6, 2015 at 12:57 PM, Fernando O. fot...@gmail.com wrote: Hi, I was doing a tests with ALS and I noticed that if I persist the inner RDDs from a MatrixFactorizationModel the RDD is not replicated, it seems like the storagelevel is hardcoded to MEMORY_AND_DISK, do you think it makes sense to make that configurable? [image: Inline image 1]
Re: Using graphx to calculate average distance of a big graph
We are going to estimate the average distance using [HyperANF](http://arxiv.org/abs/1011.5599) on a 100-billion-edge graph. 2015-01-07 2:18 GMT+08:00 Ankur Dave ankurd...@gmail.com: [-dev] What size of graph are you hoping to run this on? For small graphs where materializing the all-pairs shortest paths is an option, you could simply find the APSP using https://github.com/apache/spark/pull/3619 and then take the average distance (apsp.map(_._2.toDouble).mean). Ankur http://www.ankurdave.com/ On Sun, Jan 4, 2015 at 6:28 PM, James alcaid1...@gmail.com wrote: Recently we wanted to use Spark to calculate the average shortest-path distance between each reachable pair of nodes in a very big graph. Has anyone ever tried this? We hope to discuss the problem.
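For intuition on what Ankur's APSP-then-mean suggestion computes: the exact version, only feasible for small graphs (which is why James falls back to HyperANF at 100 billion edges), is a BFS from every node followed by averaging over all ordered reachable pairs. A brute-force sketch on a toy graph:

```python
from collections import deque

def average_distance(adj):
    """Mean shortest-path distance over all ordered reachable pairs (BFS from every node)."""
    total, pairs = 0, 0
    for src in adj:
        dist = {src: 0}
        queue = deque([src])
        while queue:
            u = queue.popleft()
            for v in adj[u]:
                if v not in dist:
                    dist[v] = dist[u] + 1
                    queue.append(v)
        total += sum(d for n, d in dist.items() if n != src)
        pairs += len(dist) - 1
    return total / pairs

# a 4-node undirected path graph: 0-1-2-3
graph = {0: [1], 1: [0, 2], 2: [1, 3], 3: [2]}
print(average_distance(graph))  # 5/3, about 1.67
```

HyperANF replaces the per-node BFS frontier sets with HyperLogLog counters, trading exactness for the ability to run at web scale.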
Re: Spark SQL implementation error
As per our telephone call, here is how you can fetch the count: val tweetsCount = sql("SELECT COUNT(*) FROM tweets") println(f"\n\n\nThere are ${tweetsCount.collect.head.getLong(0)} Tweets on this Dataset\n\n") -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-implementation-error-tp20901p21008.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Why Parquet Predicate Pushdown doesn't work?
Quoting Michael: Predicate push down into the input format is turned off by default because there is a bug in the current parquet library that causes null pointer exceptions when there are full row groups that are null. https://issues.apache.org/jira/browse/SPARK-4258 You can turn it on if you want: http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration Daniel On 7 Jan 2015, at 08:18, Xuelin Cao xuelin...@yahoo.com.INVALID wrote: Hi, I'm testing the parquet file format, and predicate pushdown is a very useful feature for us. However, it looks like predicate push down doesn't work after I set sqlContext.sql("SET spark.sql.parquet.filterPushdown=true") Here is my sql: sqlContext.sql("select adId, adTitle from ad where groupId=10113000").collect Then, I checked the amount of input data on the web UI. But the amount of input data is ALWAYS 80.2M regardless of whether I turn the spark.sql.parquet.filterPushdown flag on or off. I'm not sure if there is anything I must do when generating the parquet file to make predicate pushdown available. (With ORC files, when creating the file, I need to explicitly sort the field that will be used for predicate pushdown.) Does anyone have any idea? And does anyone know the internal mechanism for parquet predicate pushdown? Thanks
Re: Problem getting Spark running on a Yarn cluster
Just follow this documentation: http://spark.apache.org/docs/1.1.1/running-on-yarn.html Ensure that *HADOOP_CONF_DIR* or *YARN_CONF_DIR* points to the directory which contains the (client-side) configuration files for the Hadoop cluster. These configs are used to write to the dfs and connect to the YARN ResourceManager. Most likely you have a wrong configuration in the environment, and that's why it's connecting to *localhost* (127.0.1.1). Thanks Best Regards On Tue, Jan 6, 2015 at 8:10 PM, Sharon Rapoport sha...@plaid.com wrote: Hello, We have hadoop 2.6.0 and Yarn set up on ec2. Trying to get spark 1.1.1 running on the Yarn cluster. I have of course googled around and found that this problem is solved for most after removing the line including 127.0.1.1 from /etc/hosts. This hasn’t seemed to solve it for me. Does anyone have an idea where else 127.0.1.1 might be hiding in some conf? I've looked everywhere… or is there a completely different problem? Thanks, Sharon I am getting this error: WARN network.SendingConnection: Error finishing connection to / 127.0.1.1:47020 java.net.ConnectException: Connection refused
Re: confidence/probability for prediction in MLlib
This is addressed in https://issues.apache.org/jira/browse/SPARK-4789. In the new pipeline API, we can simply output two columns, one for the best predicted class and the other for probabilities or confidence scores for each class. -Xiangrui On Tue, Jan 6, 2015 at 11:43 AM, Jianguo Li flyingfromch...@gmail.com wrote: Hi, A while ago, somebody asked about getting a confidence value for a prediction with MLlib's implementation of Naive Bayes classification. I was wondering if there is any plan in the near future for the predict function to return both a label and a confidence/probability? Or could the private variables in the various machine learning models be exposed so we could write our own functions which return both? Having a confidence/probability could be very useful in real applications. For one thing, you can choose to trust the predicted label only if it has a high confidence level. Also, if you want to combine the results from multiple classifiers, the confidence/probability could be used as a kind of weight for combining. Thanks, Jianguo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
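Both uses Jianguo describes (trusting a label only above a confidence threshold, and weighting classifier votes) need nothing more than per-class scores. A pure-Python sketch of the two patterns, with made-up classifier outputs standing in for what a probability-returning predict would provide:

```python
def predict_with_confidence(class_probs, threshold=0.8):
    """Return (label, prob) if the best class clears the threshold, else (None, prob)."""
    label = max(class_probs, key=class_probs.get)
    prob = class_probs[label]
    return (label if prob >= threshold else None, prob)

def weighted_vote(predictions):
    """Combine (label, prob) pairs from several classifiers, using prob as the vote weight."""
    weights = {}
    for label, prob in predictions:
        weights[label] = weights.get(label, 0.0) + prob
    return max(weights, key=weights.get)

print(predict_with_confidence({"spam": 0.9, "ham": 0.1}))  # ('spam', 0.9)
print(predict_with_confidence({"spam": 0.6, "ham": 0.4}))  # (None, 0.6) -- below threshold
print(weighted_vote([("spam", 0.6), ("ham", 0.9), ("spam", 0.2)]))  # 'ham' (0.9 vs 0.8)
```

The thresholds and combination rule here are illustrative choices, not anything MLlib prescribes.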
Why Parquet Predicate Pushdown doesn't work?
Hi, I'm testing the parquet file format, and predicate pushdown is a very useful feature for us. However, it looks like predicate push down doesn't work after I set sqlContext.sql("SET spark.sql.parquet.filterPushdown=true") Here is my sql: sqlContext.sql("select adId, adTitle from ad where groupId=10113000").collect Then, I checked the amount of input data on the web UI. But the amount of input data is ALWAYS 80.2M regardless of whether I turn the spark.sql.parquet.filterPushdown flag on or off. I'm not sure if there is anything I must do when generating the parquet file to make predicate pushdown available. (With ORC files, when creating the file, I need to explicitly sort the field that will be used for predicate pushdown.) Does anyone have any idea? And does anyone know the internal mechanism for parquet predicate pushdown? Thanks
Re: How to merge a RDD of RDDs into one uber RDD
An RDD cannot contain elements of type RDD (i.e., you can't nest RDDs within RDDs; in fact, I don't think that would make sense). I suggest that rather than having an RDD of file names, you collect those file name strings back to the driver as a Scala array of file names, and from there make an array of RDDs which you can fold over to merge them. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-uber-RDD-tp20986p21007.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
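The suggested workflow, sketched in plain Python with lists standing in for the per-file RDDs (in PySpark the fold would be reduce(lambda a, b: a.union(b), rdds), or simply sc.union(rdds)):

```python
from functools import reduce

# lists stand in for the per-file RDDs built on the driver from collected file names
file_rdds = [[1, 2], [3], [4, 5]]

# fold over the collection, merging pairwise, like rdds.reduce(_ union _) in Scala
merged = reduce(lambda a, b: a + b, file_rdds)
print(merged)  # [1, 2, 3, 4, 5]
```

With many files, a single sc.union over the whole list is preferable to a pairwise fold, since it builds one flat UnionRDD instead of a deep chain.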
Re: TF-IDF from spark-1.1.0 not working on cluster mode
Could you attach the executor log? That may help identify the root cause. -Xiangrui On Mon, Jan 5, 2015 at 11:12 PM, Priya Ch learnings.chitt...@gmail.com wrote: Hi All, the Word2Vec and TF-IDF algorithms in spark mllib-1.1.0 are working only in local mode and not in distributed mode; a null pointer exception is thrown. Is this a bug in spark-1.1.0? Following is the code:

def main(args: Array[String]) {
  val conf = new SparkConf
  val sc = new SparkContext(conf)
  val documents = sc.textFile("hdfs://IMPETUS-DSRV02:9000/nlp/sampletext").map(_.split(" ").toSeq)
  val hashingTF = new HashingTF()
  val tf = hashingTF.transform(documents)
  tf.cache()
  val idf = new IDF().fit(tf)
  val tfidf = idf.transform(tf)
  val rdd = tfidf.map { vec =>
    println("vector is " + vec)
    10
  }
  rdd.saveAsTextFile("/home/padma/usecase")
}

Exception thrown: 15/01/06 12:36:09 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 15/01/06 12:36:10 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@impetus-dsrv05.impetus.co.in:33898/user/Executor#-1525890167] with ID 0 15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes) 15/01/06 12:36:10 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, IMPETUS-DSRV05.impetus.co.in, NODE_LOCAL, 1408 bytes) 15/01/06 12:36:10 INFO storage.BlockManagerMasterActor: Registering block manager IMPETUS-DSRV05.impetus.co.in:35130 with 2.1 GB RAM 15/01/06 12:36:12 INFO network.ConnectionManager: Accepted connection from [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:46888] 15/01/06 12:36:12 INFO network.SendingConnection: Initiating connection to [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130] 15/01/06 12:36:12 INFO network.SendingConnection: Connected to [IMPETUS-DSRV05.impetus.co.in/192.168.145.195:35130], 1 messages pending 15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on
IMPETUS-DSRV05.impetus.co.in:35130 (size: 2.1 KB, free: 2.1 GB) 15/01/06 12:36:12 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 10.1 KB, free: 2.1 GB) 15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_1 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 280.0 B, free: 2.1 GB) 15/01/06 12:36:13 INFO storage.BlockManagerInfo: Added rdd_3_0 in memory on IMPETUS-DSRV05.impetus.co.in:35130 (size: 416.0 B, free: 2.1 GB) 15/01/06 12:36:13 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, IMPETUS-DSRV05.impetus.co.in): java.lang.NullPointerException: org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) java.lang.Thread.run(Thread.java:722) Thanks, Padma Ch - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Parquet predicate pushdown
Thanks for that. Strangely enough, I was actually using 1.1.1 where it did seem to be enabled by default. Since upgrading to 1.2.0 and setting that flag, I do get the expected result! Looks good! On Tue, Jan 6, 2015 at 12:17 PM, Michael Armbrust mich...@databricks.com wrote: Predicate push down into the input format is turned off by default because there is a bug in the current parquet library that causes null pointer exceptions when there are row groups that are entirely null. https://issues.apache.org/jira/browse/SPARK-4258 You can turn it on if you want: http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration On Mon, Jan 5, 2015 at 3:38 PM, Adam Gilmore dragoncu...@gmail.com wrote: Hi all, I have a question regarding predicate pushdown for Parquet. My understanding was that this would use the metadata in Parquet's blocks/pages to skip entire chunks that won't match, without needing to decode the values and filter on every value in the table. I was testing a scenario where I had 100M rows in a Parquet file. Summing over a column took about 2-3 seconds. I also have a column (e.g. customer ID) with approximately 100 unique values. My assumption, though not exactly linear, was that filtering on this would reduce the query time significantly due to skipping entire segments based on the metadata. In fact, it took much longer - somewhere in the vicinity of 4-5 seconds, which suggested to me that it's reading all the values for the key column (100M values), then filtering, then reading all the relevant segments/values for the measure column, hence the increase in time. In the logs, I could see it was successfully pushing down a Parquet predicate, so I'm not sure I'm understanding why this is taking longer. Could anyone shed some light on this or point out where I'm going wrong? Thanks!
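For reference, the flag can be set either through SQL or on the SQLContext directly. Note also that Parquet pushdown skips row groups based on their min/max statistics, so it only reduces input when matching values are clustered (e.g. the file is sorted on the filter column) rather than scattered across every row group — which may explain cases where an enabled pushdown doesn't shrink the input size. A sketch, assuming a Spark 1.2 SQLContext named `sqlContext`:

```scala
// Parquet predicate pushdown is off by default in Spark 1.2 (SPARK-4258).
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
// or equivalently:
sqlContext.sql("SET spark.sql.parquet.filterPushdown=true")
```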
Cannot see RDDs in Spark UI
Hi, I create a bunch of RDDs, including schema RDDs. When I run the program and go to the UI on xxx:4040, the storage tab does not show any RDDs. Spark version is 1.1.1 (Hadoop 2.3). Any thoughts? Thanks,
RE: Saving data to Hbase hung in Spark streaming application with Spark 1.2.0
Thanks Ted. You are right, hbase-site.xml is in the classpath. But previously I had it in the classpath too and the app worked fine. I believe I found the problem: I built Spark 1.2.0 myself and forgot to change the dependency hbase version to 0.98.8-hadoop2, which is the version I use. When I use spark-examples-1.1.1-hadoop2.5.2.jar from the Spark 1.1.1 build (built with hbase 0.98.8-hadoop2), the problem went away. I’ll try to run the app again after rebuilding Spark 1.2.0 with 0.98.8-hadoop2. From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Tuesday, January 06, 2015 11:56 AM To: Max Xu Cc: user@spark.apache.org Subject: Re: Saving data to Hbase hung in Spark streaming application with Spark 1.2.0 I assume hbase-site.xml is in the classpath. Can you try the code snippet in a standalone program to see if the problem persists? Cheers
RE: Saving data to Hbase hung in Spark streaming application with Spark 1.2.0
Awesome. Thanks again Ted. I remember there is a block in the pom.xml under the examples folder that defaults the hbase version to hadoop1. I figured this out last time when I built Spark 1.1.1 but forgot this time.

<profile>
  <id>hbase-hadoop1</id>
  <activation>
    <property>
      <name>!hbase.profile</name>
    </property>
  </activation>
  <properties>
    <hbase.version>0.98.7-hadoop1</hbase.version>
  </properties>
</profile>

From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Tuesday, January 06, 2015 12:39 PM To: Max Xu Cc: user@spark.apache.org Subject: Re: Saving data to Hbase hung in Spark streaming application with Spark 1.2.0 Default profile is hbase-hadoop1 so you need to specify -Dhbase.profile=hadoop2 See SPARK-1297 Cheers
RDD Moving Average
Is there an easy way to do a moving average across a single RDD (in a non-streaming app)? Here is the use case: I have an RDD made up of stock prices. I want to calculate a moving average using a window size of N. Thanks. Asim
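One non-streaming sketch of the windowing logic, shown on a plain Scala collection for clarity. On an RDD you could zipWithIndex and join shifted copies, or use MLlib's sliding over RDDs (org.apache.spark.mllib.rdd.RDDFunctions) if your Spark version exposes it:

```scala
// Moving average with window size n over an ordered sequence of prices.
def movingAverage(prices: Seq[Double], n: Int): Seq[Double] =
  prices.sliding(n).map(w => w.sum / n).toList

val avgs = movingAverage(Seq(10.0, 20.0, 30.0, 40.0, 50.0), 3)
// avgs == Seq(20.0, 30.0, 40.0)
```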
RE: Saving data to Hbase hung in Spark streaming application with Spark 1.2.0
Issue resolved after updating the Hbase version to 0.98.8-hadoop2. Thanks Ted for all the help! For future reference: this problem has nothing to do with Spark 1.2.0; it happened simply because I built Spark 1.2.0 with the wrong Hbase version. From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Tuesday, January 06, 2015 1:12 PM To: Max Xu Cc: user@spark.apache.org Subject: Re: Saving data to Hbase hung in Spark streaming application with Spark 1.2.0 I doubt anyone would deploy hbase 0.98.x on hadoop-1. Looks like the hadoop2 profile should be made the default. Cheers
Re: Data Locality
No, not all rdds have location information, and in any case tasks may be scheduled on non-local nodes if there is idle capacity. see spark.locality.wait http://spark.apache.org/docs/latest/configuration.html On Tue, Jan 6, 2015 at 10:17 AM, gtinside gtins...@gmail.com wrote: Does spark guarantee to push the processing to the data ? Before creating tasks does spark always check for data location ? So for example if I have 3 spark nodes (Node1, Node2, Node3) and data is local to just 2 nodes (Node1 and Node2) , will spark always schedule tasks on the node for which the data is local ie Node1 and Node 2(assuming Node1 and Node2 have enough resources to execute the tasks)? Gaurav -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Data-Locality-tp21000.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
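As a concrete example of the knob mentioned above, assuming a Spark 1.x driver where the value is interpreted as milliseconds:

```scala
import org.apache.spark.SparkConf

// spark.locality.wait: how long the scheduler waits for a data-local slot
// before falling back to a less-local level (default 3000 ms in Spark 1.x).
val conf = new SparkConf().set("spark.locality.wait", "3000")
```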
Re: Saving data to Hbase hung in Spark streaming application with Spark 1.2.0
I assume hbase-site.xml is in the classpath. Can you try the code snippet in a standalone program to see if the problem persists? Cheers On Tue, Jan 6, 2015 at 6:42 AM, Max Xu max...@twosigma.com wrote: Hi all, I have a Spark streaming application that ingests data from a Kafka topic and persists received data to Hbase. It works fine with Spark 1.1.1 in YARN cluster mode. Basically, I use the following code to persist each partition of each RDD to Hbase:

@Override
void call(Iterator<Metric> it) throws Exception {
  HConnection hConnection = null;
  HTableInterface htable = null;
  try {
    hConnection = HConnectionManager.createConnection(_conf.value());
    htable = hConnection.getTable(_tablePrefix + "_" + new SimpleDateFormat("_MM_dd").format(new Date()));
    htable.setAutoFlush(false, true);
    while (it.hasNext()) {
      Metric metric = it.next();
      htable.put(_put.call(metric));
    }
    htable.flushCommits();
  } finally {
    try {
      if (htable != null) {
        htable.close();
      }
    } catch (Exception e) {
      System.err.println("error closing htable");
      System.err.println(e.toString());
    }
    try {
      if (hConnection != null) {
        hConnection.close();
      }
    } catch (Exception e) {
      System.err.println("error closing hConnection");
      System.err.println(e.toString());
    }
  }
}

I use a Kafka receiver to create the input stream: KafkaUtils.createStream(jssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER()); With 1.2.0, receiving from Kafka still works normally. I tried both KafkaReceiver and ReliableKafkaReceiver; both can get data from Kafka without a problem. However, the application just didn’t save data to Hbase. The streaming page of the Spark UI showed it stuck at processing the first batch.
The Executor threads stayed in TIMED_WAITING state: Thread 54: Executor task launch worker-0 (TIMED_WAITING) java.lang.Thread.sleep(Native Method) org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1296) org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1090) org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1047) org.apache.hadoop.hbase.client.AsyncProcess.findDestLocation(AsyncProcess.java:365) org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:310) org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits(HTable.java:971) org.apache.hadoop.hbase.client.HTable.doPut(HTable.java:954) org.apache.hadoop.hbase.client.HTable.put(HTable.java:915) com.xxx.spark.streaming.JavaKafkaSparkHbase$WriteFunction.persist(JavaKafkaSparkHbase.java:125) com.xxx.spark.streaming.PersistFunction$1.call(PersistFunction.java:42) com.xxx.spark.streaming.PersistFunction$1.call(PersistFunction.java:35) org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:195) org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:773) org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) org.apache.spark.scheduler.Task.run(Task.scala:56) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) 
KafkaMessageHandler thread is in WAITING state Thread 70: KafkaMessageHandler-0 (WAITING) sun.misc.Unsafe.park(Native Method) java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) kafka.consumer.ConsumerIterator.makeNext(Unknown Source) kafka.consumer.ConsumerIterator.makeNext(Unknown Source) kafka.utils.IteratorTemplate.maybeComputeNext(Unknown Source) kafka.utils.IteratorTemplate.hasNext(Unknown Source)
1.2.0 - java.lang.ClassCastException: scala.Tuple2 cannot be cast to scala.collection.Iterator
I am running into the same problem described here https://www.mail-archive.com/user%40spark.apache.org/msg17788.html which for some reason does not appear in the archives. I have a standalone Scala application, built (using sbt) with Spark jars from Maven: "org.apache.spark" %% "spark-core" % "1.2.0", "org.apache.spark" %% "spark-sql" % "1.2.0", "org.apache.spark" %% "spark-hive" % "1.2.0". The application acts as a driver and connects to a cluster in standalone mode. I use the ec2 scripts to create the cluster in AWS. I see the scripts downloading and installing the following packages among others: http://s3.amazonaws.com/spark-related-packages/scala-2.10.3.tgz http://s3.amazonaws.com/spark-related-packages/spark-1.2.0-bin-hadoop1.tgz http://s3.amazonaws.com/spark-related-packages/hadoop-1.0.4.tar.gz which looks ok, since it's the same Spark version (1.2.0) and I believe the jars in Maven Central are built using hadoop1. But quickly after loading my data, I try to do: sc.saveAsObjectFile("s3m://path/to/my/s3/bucket/") and I get the following exception: java.lang.ClassCastException: scala.Tuple2 cannot be cast to scala.collection.Iterator and I really don't know how to get rid of it. I have read https://issues.apache.org/jira/browse/SPARK-2075 with interest but I still don't understand what I might be doing wrong. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/1-2-0-java-lang-ClassCastException-scala-Tuple2-cannot-be-cast-to-scala-collection-Iterator-tp21001.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
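This class of ClassCastException usually indicates two different Spark builds on the runtime classpath (the Maven jars bundled with the application vs. the cluster's own assembly). One common mitigation, offered here as a hedged sketch rather than a confirmed fix for this report: mark the Spark dependencies "provided" in build.sbt so only the cluster's jars are used at runtime.

```scala
// build.sbt fragment: compile against Spark 1.2.0 but do not bundle it, so
// the cluster's own Spark assembly is the only copy on the runtime classpath.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-hive" % "1.2.0" % "provided"
)
```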
Re: Saving data to Hbase hung in Spark streaming application with Spark 1.2.0
Default profile is hbase-hadoop1 so you need to specify -Dhbase.profile=hadoop2 See SPARK-1297 Cheers On Tue, Jan 6, 2015 at 9:11 AM, Max Xu max...@twosigma.com wrote: Thanks Ted. You are right, hbase-site.xml is in the classpath. But previously I had it in the classpath too and the app worked fine. I believe I found the problem: I built Spark 1.2.0 myself and forgot to change the dependency hbase version to 0.98.8-hadoop2, which is the version I use. When I use spark-examples-1.1.1-hadoop2.5.2.jar from the Spark 1.1.1 build (built with hbase 0.98.8-hadoop2), the problem went away. I’ll try to run the app again after rebuilding Spark 1.2.0 with 0.98.8-hadoop2.
Re: Saving data to Hbase hung in Spark streaming application with Spark 1.2.0
I doubt anyone would deploy hbase 0.98.x on hadoop-1 Looks like hadoop2 profile should be made the default. Cheers On Tue, Jan 6, 2015 at 9:49 AM, Max Xu max...@twosigma.com wrote: Awesome. Thanks again Ted. I remember there is a block in the pom.xml under the example folder that default hbase version to hadoop1. I figured out this last time when I built Spark 1.1.1 but forgot this time. profile idhbase-hadoop1/id activation property name!hbase.profile/name /property /activation properties hbase.version0.98.7-hadoop1/hbase.version /properties /profile *From:* Ted Yu [mailto:yuzhih...@gmail.com] *Sent:* Tuesday, January 06, 2015 12:39 PM *To:* Max Xu *Cc:* user@spark.apache.org *Subject:* Re: Saving data to Hbase hung in Spark streaming application with Spark 1.2.0 Default profile is hbase-hadoop1 so you need to specify -Dhbase.profile=hadoop2 See SPARK-1297 Cheers On Tue, Jan 6, 2015 at 9:11 AM, Max Xu max...@twosigma.com wrote: Thanks Ted. You are right, hbase-site.xml is in the classpath. But previously I have it in the classpath too and the app works fine. I believe I found the problem. I built Spark 1.2.0 myself and forgot to change the dependency hbase version to 0.98.8-hadoop2, which is the version I use. When I use spark-examples-1.1.1-hadoop2.5.2.jar from Spark 1.1.1 build (build with hbase 0.98.8-hadoop2 ), the problem went away. I’ll try to run the app again after rebuild Spark 1.2.0 with 0.98.8-hadoop2. *From:* Ted Yu [mailto:yuzhih...@gmail.com] *Sent:* Tuesday, January 06, 2015 11:56 AM *To:* Max Xu *Cc:* user@spark.apache.org *Subject:* Re: Saving data to Hbase hung in Spark streaming application with Spark 1.2.0 I assume hbase-site.xml is in the classpath. Can you try the code snippet in standalone program to see if the problem persists ? Cheers On Tue, Jan 6, 2015 at 6:42 AM, Max Xu max...@twosigma.com wrote: Hi all, I have a Spark streaming application that ingests data from a Kafka topic and persists received data to Hbase. 
It works fine with Spark 1.1.1 in YARN cluster mode. Basically, I use the following code to persist each partition of each RDD to Hbase:

@Override
public void call(Iterator<Metric> it) throws Exception {
    HConnection hConnection = null;
    HTableInterface htable = null;
    try {
        hConnection = HConnectionManager.createConnection(_conf.value());
        htable = hConnection.getTable(_tablePrefix + "_" + new SimpleDateFormat("_MM_dd").format(new Date()));
        htable.setAutoFlush(false, true);
        while (it.hasNext()) {
            Metric metric = it.next();
            htable.put(_put.call(metric));
        }
        htable.flushCommits();
    } finally {
        try {
            if (htable != null) {
                htable.close();
            }
        } catch (Exception e) {
            System.err.println("error closing htable");
            System.err.println(e.toString());
        }
        try {
            if (hConnection != null) {
                hConnection.close();
            }
        } catch (Exception e) {
            System.err.println("error closing hConnection");
            System.err.println(e.toString());
        }
    }
}

I use a Kafka receiver to create the input stream: KafkaUtils.createStream(jssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER());

With 1.2.0, receiving from Kafka still works normally. I tried both KafkaReceiver and ReliableKafkaReceiver; both can get data from Kafka without a problem. However, the application just didn't save data to Hbase. The streaming page of the Spark UI showed it stuck at processing the first batch.
The executor threads stayed in TIMED_WAITING state:

Thread 54: Executor task launch worker-0 (TIMED_WAITING)
java.lang.Thread.sleep(Native Method)
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1296)
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1090)
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1047)
org.apache.hadoop.hbase.client.AsyncProcess.findDestLocation(AsyncProcess.java:365)
org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:310)
org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits(HTable.java:971)
org.apache.hadoop.hbase.client.HTable.doPut(HTable.java:954)
org.apache.hadoop.hbase.client.HTable.put(HTable.java:915)
Data Locality
Does Spark guarantee to push the processing to the data? Before creating tasks, does Spark always check for the data's location? For example, if I have 3 Spark nodes (Node1, Node2, Node3) and the data is local to just 2 of them (Node1 and Node2), will Spark always schedule tasks on the nodes where the data is local, i.e. Node1 and Node2 (assuming Node1 and Node2 have enough resources to execute the tasks)? Gaurav -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Data-Locality-tp21000.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
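For context on the question above: Spark's scheduler prefers data-local placement (locality levels PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY) but does not guarantee it; after a configurable delay (spark.locality.wait) a task falls back to a less local node. A toy model of that preference follows. This is not Spark's actual scheduler, and the task and node names are illustrative only:

```python
# Toy model of locality-preferring task placement -- NOT Spark's scheduler,
# just an illustration of "prefer local nodes, fall back when they are full".
def schedule(tasks, capacity):
    """tasks: {task: [nodes holding its data]}; capacity: {node: free slots}."""
    placement = {}
    for task, preferred in tasks.items():
        # Try data-local nodes first (NODE_LOCAL in Spark terms).
        node = next((n for n in preferred if capacity.get(n, 0) > 0), None)
        if node is None:
            # Fall back to any node with free capacity (ANY locality level).
            node = next((n for n, c in capacity.items() if c > 0), None)
        if node is not None:
            capacity[node] -= 1
            placement[task] = node
    return placement

# Data lives on Node1 and Node2 only; Node1 has a single free slot,
# so t3 ends up on its second preference, Node2.
placement = schedule(
    {"t1": ["Node1"], "t2": ["Node2"], "t3": ["Node1", "Node2"]},
    {"Node1": 1, "Node2": 2, "Node3": 2},
)
```

In this sketch no task lands on Node3 because the preferred nodes still have capacity, which mirrors the scenario in the question.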
Re: Shuffle Problems in 1.2.0
The issue has been sensitive to the number of executors and the input data size. I'm using 2 executors with 4 cores each, 25GB of memory, and 3800MB of memory overhead for YARN. This will fit onto Amazon r3 instance types. -Sven

On Tue, Jan 6, 2015 at 12:46 AM, Davies Liu dav...@databricks.com wrote: I ran your scripts on a 5-node (2 CPUs, 8G mem) cluster and could not reproduce your failure. Should I test it with a big-memory node?

On Mon, Jan 5, 2015 at 4:00 PM, Sven Krasser kras...@gmail.com wrote: Thanks for the input! I've managed to come up with a repro of the error with test data only (and without any of the custom code in the original script); please see here: https://gist.github.com/skrasser/4bd7b41550988c8f6071#file-gistfile1-md The Gist contains a data generator and the script reproducing the error (plus driver and executor logs). If I run using full cluster capacity (32 executors with 28GB), there are no issues. If I run on only two, the error appears again and the job fails: org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@294b55b7) Any thoughts, or any obvious problems you can spot by any chance? Thank you! -Sven

On Sun, Jan 4, 2015 at 1:11 PM, Josh Rosen rosenvi...@gmail.com wrote: There don't seem to be a whole lot of clues to go on here without seeing the job code. The original "org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@130dc7ad)" error suggests that maybe there's an issue with PySpark's serialization / tracking of types, but it's hard to say from this error trace alone.

On December 30, 2014 at 5:17:08 PM, Sven Krasser (kras...@gmail.com) wrote: Hey Josh, I am still trying to prune this to a minimal example, but it has been tricky since scale seems to be a factor. The job runs over ~720GB of data (the cluster's total RAM is around ~900GB, split across 32 executors). I've managed to run it over a vastly smaller data set without issues.
Curiously, when I run it over a slightly smaller data set of ~230GB (using sort-based shuffle), my job also fails, but I see no shuffle errors in the executor logs. All I see is the error below from the driver (this is also what the driver prints when erroring out on the large data set, but I assumed the executor errors to be the root cause). Any idea on where to look in the interim for more hints? I'll continue to try to get to a minimal repro.

2014-12-30 21:35:34,539 INFO [sparkDriver-akka.actor.default-dispatcher-14] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map output locations for shuffle 0 to sparkexecu...@ip-10-20-80-60.us-west-1.compute.internal:39739
2014-12-30 21:35:39,512 INFO [sparkDriver-akka.actor.default-dispatcher-17] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map output locations for shuffle 0 to sparkexecu...@ip-10-20-80-62.us-west-1.compute.internal:42277
2014-12-30 21:35:58,893 WARN [sparkDriver-akka.actor.default-dispatcher-16] remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkyar...@ip-10-20-80-64.us-west-1.compute.internal:49584] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2014-12-30 21:35:59,044 ERROR [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Yarn application has already exited with state FINISHED!
2014-12-30 21:35:59,056 INFO [Yarn application state monitor] handler.ContextHandler (ContextHandler.java:doStop(788)) - stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
[...]
2014-12-30 21:35:59,111 INFO [Yarn application state monitor] ui.SparkUI (Logging.scala:logInfo(59)) - Stopped Spark web UI at http://ip-10-20-80-37.us-west-1.compute.internal:4040
2014-12-30 21:35:59,130 INFO [Yarn application state monitor] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stopping DAGScheduler
2014-12-30 21:35:59,131 INFO [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Shutting down all executors
2014-12-30 21:35:59,132 INFO [sparkDriver-akka.actor.default-dispatcher-14] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Asking each executor to shut down
2014-12-30 21:35:59,132 INFO [Thread-2] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 1 failed: collect at /home/hadoop/test_scripts/test.py:63, took 980.751936 s

Traceback (most recent call last):
  File "/home/hadoop/test_scripts/test.py", line 63, in <module>
    result = j.collect()
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 676, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File
Re: RDD Moving Average
First you'd need to sort the RDD to give it a meaningful order, but I assume you have some kind of timestamp in your data you can sort on. I think you might be after the sliding() function, a developer API in MLlib: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala#L43

On Tue, Jan 6, 2015 at 5:25 PM, Asim Jalis asimja...@gmail.com wrote: Is there an easy way to do a moving average across a single RDD (in a non-streaming app)? Here is the use case: I have an RDD made up of stock prices. I want to calculate a moving average using a window size of N. Thanks. Asim
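The idea behind sliding() can be sketched without Spark (the helper name and sample prices below are illustrative, not from the thread): on a sorted sequence, every run of N consecutive elements becomes one window, and averaging each window gives the moving average.

```python
def moving_average(prices, n):
    # Mirrors what sliding(n) yields on a sorted RDD: each group of n
    # consecutive elements is one window; average each window.
    windows = [prices[i:i + n] for i in range(len(prices) - n + 1)]
    return [sum(w) / n for w in windows]

# 3-period moving average over a small price series.
print(moving_average([10.0, 11.0, 12.0, 13.0, 14.0], 3))  # [11.0, 12.0, 13.0]
```

With sliding(3) on an RDD of prices you would get the windows as Arrays and could map each to its mean the same way.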
Re: Using graphx to calculate average distance of a big graph
[-dev] What size of graph are you hoping to run this on? For small graphs where materializing the all-pairs shortest paths is an option, you could simply find the APSP using https://github.com/apache/spark/pull/3619 and then take the average distance (apsp.map(_._2.toDouble).mean). Ankur http://www.ankurdave.com/

On Sun, Jan 4, 2015 at 6:28 PM, James alcaid1...@gmail.com wrote: Recently we have wanted to use Spark to calculate the average shortest-path distance between each reachable pair of nodes in a very big graph. Has anyone ever tried this? We hope to discuss the problem.
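The APSP-then-mean approach suggested above can be sketched without GraphX for an unweighted graph (the function name and the tiny example graph are illustrative): run BFS from every vertex, then average the distances over all reachable ordered pairs.

```python
from collections import deque

def average_distance(adj):
    # BFS from every vertex of an unweighted graph; average the shortest-path
    # distances over all reachable ordered pairs (u, v) with u != v.
    # adj maps vertex -> list of neighbors.
    total, pairs = 0, 0
    for src in adj:
        dist = {src: 0}
        queue = deque([src])
        while queue:
            u = queue.popleft()
            for v in adj[u]:
                if v not in dist:
                    dist[v] = dist[u] + 1
                    queue.append(v)
        for v, d in dist.items():
            if v != src:
                total += d
                pairs += 1
    return total / pairs if pairs else 0.0

# Path graph 1-2-3: six reachable ordered pairs with total distance 8.
print(average_distance({1: [2], 2: [1, 3], 3: [2]}))  # 8/6 ~= 1.333
```

This is O(V * (V + E)) and only feasible for small graphs, which is exactly why the reply asks about graph size before recommending materializing the APSP.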