Re: OOM for HiveFromSpark example
Not sure, but you can create that path in all workers and put that file in it. Thanks Best Regards On Thu, Mar 26, 2015 at 1:56 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: The Hive command LOAD DATA LOCAL INPATH '/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt' INTO TABLE src_spark 1. LOCAL INPATH. if i push to HDFS then how will it work ? 2. I cant use sc.addFile, cause i want to run Hive (Spark SQL) queries. On Thu, Mar 26, 2015 at 1:41 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Now its clear that the workers are not having the file kv1.txt in their local filesystem. You can try putting that in hdfs and use the URI to that file or try adding the file with sc.addFile Thanks Best Regards On Thu, Mar 26, 2015 at 1:38 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Does not work 15/03/26 01:07:05 INFO HiveMetaStore.audit: ugi=dvasthimal ip=unknown-ip-addr cmd=get_table : db=default tbl=src_spark 15/03/26 01:07:06 ERROR ql.Driver: FAILED: SemanticException Line 1:23 Invalid path ''/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt'': No files matching path file:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:23 Invalid path ''/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt'': No files matching path file:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt at org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer.applyConstraints(LoadSemanticAnalyzer.java:142) at org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer.analyzeInternal(LoadSemanticAnalyzer.java:233) at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327) at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:422) Does the input file needs to be passed to executor via -- jars ? On Thu, Mar 26, 2015 at 12:15 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Try to give the complete path to the file kv1.txt. On 26 Mar 2015 11:48, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I am now seeing this error. 15/03/25 19:44:03 ERROR yarn.ApplicationMaster: User class threw exception: FAILED: SemanticException Line 1:23 Invalid path ''examples/src/main/resources/kv1.txt'': No files matching path file:/hadoop/10/scratch/local/usercache/dvasthimal/appcache/application_1426715280024_89893/container_1426715280024_89893_01_02/examples/src/main/resources/kv1.txt org.apache.spark.sql.execution.QueryExecutionException: FAILED: SemanticException Line 1:23 Invalid path ''examples/src/main/resources/kv1.txt'': No files matching path file:/hadoop/10/scratch/local/usercache/dvasthimal/appcache/application_1426715280024_89893/container_1426715280024_89893_01_02/examples/src/main/resources/kv1.txt at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:312) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:280) -sh-4.1$ pwd /home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4 -sh-4.1$ ls examples/src/main/resources/kv1.txt examples/src/main/resources/kv1.txt -sh-4.1$ On Thu, Mar 26, 2015 at 8:08 AM, Zhan Zhang zzh...@hortonworks.com wrote: You can do it in $SPARK_HOME/conf/spark-defaults.con spark.driver.extraJavaOptions -XX:MaxPermSize=512m Thanks. Zhan Zhang On Mar 25, 2015, at 7:25 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Where and how do i pass this or other JVM argument ? 
-XX:MaxPermSize=512m On Wed, Mar 25, 2015 at 11:36 PM, Zhan Zhang zzh...@hortonworks.com wrote: I solve this by increase the PermGen memory size in driver. -XX:MaxPermSize=512m Thanks. Zhan Zhang On Mar 25, 2015, at 10:54 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I am facing same issue, posted a new thread. Please respond. On Wed, Jan 14, 2015 at 4:38 AM, Zhan Zhang zzh...@hortonworks.com wrote: Hi Folks, I am trying to run hive context in yarn-cluster mode, but met some error. Does anybody know what cause the issue. I use following cmd to build the distribution: ./make-distribution.sh -Phive -Phive-thriftserver -Pyarn -Phadoop-2.4 15/01/13 17:59:42 INFO cluster.YarnClusterScheduler: YarnClusterScheduler.postStartHook done 15/01/13 17:59:42 INFO storage.BlockManagerMasterActor: Registering block manager cn122-10.l42scl.hortonworks.com:56157 with 1589.8 MB RAM, BlockManagerId(2, cn122-10.l42scl.hortonworks.com, 56157) 15/01/13 17:59:43 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 15/01/13 17:59:43 INFO parse.ParseDriver: Parse Completed 15/01/13 17:59:44 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/01/13 17:59:44 INFO metastore.ObjectStore: ObjectStore, initialize called 15/01/13
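A minimal sketch of the PermGen setting discussed in this thread, assuming the job is launched with spark-submit; the 512m value is simply the figure quoted above, not a tuned recommendation:

    # $SPARK_HOME/conf/spark-defaults.conf
    spark.driver.extraJavaOptions    -XX:MaxPermSize=512m
    spark.executor.extraJavaOptions  -XX:MaxPermSize=512m

    # or, equivalently, per job on the command line
    spark-submit --conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=512m" ...

In yarn-cluster mode the driver runs inside the application master, so the option has to be supplied at submit time (spark-defaults.conf or --conf) rather than after the JVM has started.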
Column not found in schema when querying partitioned table
Spark 1.3.0, Parquet I'm having trouble referencing partition columns in my queries. In the following example, 'probeTypeId' is a partition column. For example, the directory structure looks like this: /mydata /probeTypeId=1 ...files... /probeTypeId=2 ...files... I see the column when I reference load a DF using the /mydata directory and call df.printSchema(): ... |-- probeTypeId: integer (nullable = true) ... Parquet is also aware of the column: optional int32 probeTypeId; And this works fine: sqlContext.sql(select probeTypeId from df limit 1); ...as does df.show() - it shows the correct values for the partition column. However, when I try to use a partition column in a where clause, I get an exception stating that the column was not found in the schema: sqlContext.sql(select probeTypeId from df where probeTypeId = 1 limit 1); ... ... org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalArgumentException: Column [probeTypeId] was not found in schema! at parquet.Preconditions.checkArgument(Preconditions.java:47) at parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:172) at parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:160) at parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:142) at parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:76) at parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:41) at parquet.filter2.predicate.Operators$Eq.accept(Operators.java:162) at parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:46) at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:41) at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:22) at parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:108) at parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:28) at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:158) ... ... What am I doing wrong? Here's the full stack trace: using local[*] for master 06:05:55,675 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - debug attribute not set 06:05:55,683 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender] 06:05:55,694 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT] 06:05:55,721 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property 06:05:55,768 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO 06:05:55,768 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT] 06:05:55,769 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration. 
06:05:55,770 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@6aaceffd - Registering current configuration as safe fallback point INFO org.apache.spark.SparkContext Running Spark version 1.3.0 WARN o.a.hadoop.util.NativeCodeLoader Unable to load native-hadoop library for your platform... using builtin-java classes where applicable INFO org.apache.spark.SecurityManager Changing view acls to: jon INFO org.apache.spark.SecurityManager Changing modify acls to: jon INFO org.apache.spark.SecurityManager SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jon); users with modify permissions: Set(jon) INFO akka.event.slf4j.Slf4jLogger Slf4jLogger started INFO Remoting Starting remoting INFO Remoting Remoting started; listening on addresses :[akka.tcp:// sparkDriver@192.168.1.134:62493] INFO org.apache.spark.util.Utils Successfully started service 'sparkDriver' on port 62493. INFO org.apache.spark.SparkEnv Registering MapOutputTracker INFO org.apache.spark.SparkEnv Registering BlockManagerMaster INFO o.a.spark.storage.DiskBlockManager Created local directory at /var/folders/x7/9hdp8kw9569864088tsl4jmmgn/T/spark-150e23b2-ff19-4a51-8cfc-25fb8e1b3f2b/blockmgr-6eea286c-7473-4bda-8886-7250156b68f4 INFO org.apache.spark.storage.MemoryStore MemoryStore started with capacity 1966.1 MB INFO org.apache.spark.HttpFileServer HTTP File server directory is /var/folders/x7/9hdp8kw9569864088tsl4jmmgn/T/spark-cf4687bd-1563-4ddf-b697-21c96fd95561/httpd-6343b9c9-bb66-43ac-ac43-6da80c7a1f95 INFO org.apache.spark.HttpServer Starting HTTP Server INFO o.spark-project.jetty.server.Server
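A hedged sketch of one thing to try: if the failure comes from the partition-column predicate being pushed down into Parquet (which has no physical column with that name), keeping the filter on the Spark side may avoid it, assuming spark.sql.parquet.filterPushdown is the setting that controls the pushdown seen in the stack trace:

    // keep the predicate on the Spark side instead of handing it to Parquet
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")

    val df = sqlContext.parquetFile("/mydata")   // partition discovery picks up probeTypeId
    df.registerTempTable("df")
    sqlContext.sql("select probeTypeId from df where probeTypeId = 1 limit 1").show()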
Re: OOM for HiveFromSpark example
Could you try putting that file in hdfs and try like: LOAD DATA INPATH 'hdfs://sigmoid/test/kv1.txt' INTO TABLE src_spark Thanks Best Regards On Thu, Mar 26, 2015 at 2:07 PM, Akhil Das ak...@sigmoidanalytics.com wrote: When you run it in local mode ^^ Thanks Best Regards On Thu, Mar 26, 2015 at 2:06 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I don;t think thats correct. load data local should pick input from local directory. On Thu, Mar 26, 2015 at 1:59 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Not sure, but you can create that path in all workers and put that file in it. Thanks Best Regards On Thu, Mar 26, 2015 at 1:56 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: The Hive command LOAD DATA LOCAL INPATH '/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt' INTO TABLE src_spark 1. LOCAL INPATH. if i push to HDFS then how will it work ? 2. I cant use sc.addFile, cause i want to run Hive (Spark SQL) queries. On Thu, Mar 26, 2015 at 1:41 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Now its clear that the workers are not having the file kv1.txt in their local filesystem. You can try putting that in hdfs and use the URI to that file or try adding the file with sc.addFile Thanks Best Regards On Thu, Mar 26, 2015 at 1:38 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Does not work 15/03/26 01:07:05 INFO HiveMetaStore.audit: ugi=dvasthimal ip=unknown-ip-addr cmd=get_table : db=default tbl=src_spark 15/03/26 01:07:06 ERROR ql.Driver: FAILED: SemanticException Line 1:23 Invalid path ''/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt'': No files matching path file:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:23 Invalid path ''/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt'': No files matching path file:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt at org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer.applyConstraints(LoadSemanticAnalyzer.java:142) at org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer.analyzeInternal(LoadSemanticAnalyzer.java:233) at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327) at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:422) Does the input file needs to be passed to executor via -- jars ? On Thu, Mar 26, 2015 at 12:15 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Try to give the complete path to the file kv1.txt. On 26 Mar 2015 11:48, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I am now seeing this error. 
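A sketch of the HDFS route suggested at the top of this thread; the /tmp/spark-example path is only a placeholder, and note that LOAD DATA INPATH (without LOCAL) moves the file within HDFS rather than copying it:

    hadoop fs -mkdir -p /tmp/spark-example
    hadoop fs -put examples/src/main/resources/kv1.txt /tmp/spark-example/

Then, from the Spark SQL (HiveContext) program:

    hiveContext.sql("LOAD DATA INPATH '/tmp/spark-example/kv1.txt' INTO TABLE src_spark")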
Missing an output location for shuffle. : (
Again,when I do larger file Spark-sql query, error occured.Anyone have got fix it .Please help me. Here is the track. org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382) at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:120) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:242) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203) )
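"Missing an output location for shuffle" usually surfaces after the executors that wrote the map output have died, very often because they were killed for exceeding their memory limits, so the usual first steps are to give executors more headroom and to use more, smaller shuffle partitions. A hedged sketch for a YARN deployment; every value below is illustrative:

    # extra executor heap and off-heap overhead (MB), plus more shuffle
    # partitions so each reduce task handles less data
    spark-submit \
      --executor-memory 4g \
      --conf spark.yarn.executor.memoryOverhead=1024 \
      --conf spark.sql.shuffle.partitions=400 \
      your_sql_job.py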
Handling Big data for interactive BI tools
Hi, I need to store terabytes of data which will be used by BI tools like QlikView. The queries can filter on any column. Currently we are using Redshift for this purpose, and I am trying to explore alternatives to Redshift. Is it possible to get better performance from Spark than from Redshift? If yes, please suggest the best way to achieve this. Thanks!! Kundan
Re: OOM for HiveFromSpark example
I don't think that's correct. LOAD DATA LOCAL should pick up input from the local directory. On Thu, Mar 26, 2015 at 1:59 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Not sure, but you can create that path in all workers and put that file in it. Thanks Best Regards
Re: upgrade from spark 1.2.1 to 1.3 on EC2 cluster and problems
On 25 Mar 2015, at 21:54, roni roni.epi...@gmail.com wrote: Is there any way that I can install the new one and remove the previous version? I installed Spark 1.3 on my EC2 master and set the Spark home to the new one. But when I start the spark-shell I get - java.lang.UnsatisfiedLinkError: org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative()V at org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative(Native Method) Is there no way to upgrade without creating a new cluster? Thanks Roni This isn't a Spark version problem itself, more one of Hadoop versions. If you see this it means that the Hadoop JARs shipping with Spark 1.3 are trying to bind to a native method implemented in hadoop.so, but that method isn't there. Possible fixes: 1. Simplest: find out which version of Hadoop is running in the EC2 cluster, and get a version of Spark 1.3 built against that version. If you can't find one, it's easy enough to just check out the 1.3.0 release off github/ASF git and build it yourself. 2. Upgrade the underlying Hadoop cluster. 3. Find the location of hadoop.so in your VMs, overwrite it with the version of hadoop.so from the version of Hadoop used in the build of Spark 1.3, and rely on the intent of the Hadoop team to make updated native binaries backwards compatible across branch-2 releases (i.e. they only add functions, not remove or rename them). #3 is an ugly hack which may work immediately, but once you get in the game of mixing artifacts from different Hadoop releases, it is a slippery slope towards an unmaintainable Hadoop cluster. I'd go for tactic #1 first.
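A sketch of tactic #1, assuming the cluster reports a Hadoop 2.4.x version (substitute whatever 'hadoop version' actually prints):

    # check what the cluster is running
    hadoop version

    # build a matching Spark 1.3.0 distribution from the release tag
    git clone https://github.com/apache/spark.git && cd spark
    git checkout v1.3.0
    ./make-distribution.sh --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0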
Write Parquet File with spark-streaming with Spark 1.3
Hi, I've succeeded in writing a Kafka stream to a Parquet file in Spark 1.2, but I can't make it work with Spark 1.3. In streaming I can't use saveAsParquetFile() because I can't add data to an existing Parquet file. I know that it's possible to stream data directly into Parquet. Could you help me by providing a little sample of what API I need to use? Thanks
Re: OOM for HiveFromSpark example
The Hive command: LOAD DATA LOCAL INPATH '/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt' INTO TABLE src_spark 1. LOCAL INPATH: if I push the file to HDFS, how will it work? 2. I can't use sc.addFile, because I want to run Hive (Spark SQL) queries. On Thu, Mar 26, 2015 at 1:41 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Now it's clear that the workers do not have the file kv1.txt on their local filesystem. You can try putting it in HDFS and using the URI to that file, or try adding the file with sc.addFile Thanks Best Regards
Re: OOM for HiveFromSpark example
When you run it in local mode ^^ Thanks Best Regards On Thu, Mar 26, 2015 at 2:06 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I don;t think thats correct. load data local should pick input from local directory. On Thu, Mar 26, 2015 at 1:59 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Not sure, but you can create that path in all workers and put that file in it. Thanks Best Regards On Thu, Mar 26, 2015 at 1:56 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: The Hive command LOAD DATA LOCAL INPATH '/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt' INTO TABLE src_spark 1. LOCAL INPATH. if i push to HDFS then how will it work ? 2. I cant use sc.addFile, cause i want to run Hive (Spark SQL) queries. On Thu, Mar 26, 2015 at 1:41 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Now its clear that the workers are not having the file kv1.txt in their local filesystem. You can try putting that in hdfs and use the URI to that file or try adding the file with sc.addFile Thanks Best Regards On Thu, Mar 26, 2015 at 1:38 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Does not work 15/03/26 01:07:05 INFO HiveMetaStore.audit: ugi=dvasthimal ip=unknown-ip-addr cmd=get_table : db=default tbl=src_spark 15/03/26 01:07:06 ERROR ql.Driver: FAILED: SemanticException Line 1:23 Invalid path ''/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt'': No files matching path file:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:23 Invalid path ''/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt'': No files matching path file:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/examples/src/main/resources/kv1.txt at org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer.applyConstraints(LoadSemanticAnalyzer.java:142) at org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer.analyzeInternal(LoadSemanticAnalyzer.java:233) at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327) at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:422) Does the input file needs to be passed to executor via -- jars ? On Thu, Mar 26, 2015 at 12:15 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Try to give the complete path to the file kv1.txt. On 26 Mar 2015 11:48, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I am now seeing this error. 
Re: Registering custom UDAFs with HiveConetxt in SparkSQL, how?
Hello Shahab, Are you able to read tables created in Hive from Spark SQL ? If yes, how are you referring them ? On Thu, Mar 26, 2015 at 1:11 PM, Takeshi Yamamuro linguin@gmail.com wrote: I think it is not `sqlContext` but hiveContext because `create temporary function` is not supported in SQLContext. On Wed, Mar 25, 2015 at 5:58 AM, Jon Chase jon.ch...@gmail.com wrote: Shahab - This should do the trick until Hao's changes are out: sqlContext.sql(create temporary function foobar as 'com.myco.FoobarUDAF'); sqlContext.sql(select foobar(some_column) from some_table); This works without requiring to 'deploy' a JAR with the UDAF in it - just make sure the UDAF is in your project's classpath. On Tue, Mar 10, 2015 at 8:21 PM, Cheng, Hao hao.ch...@intel.com wrote: Oh, sorry, my bad, currently Spark SQL doesn’t provide the user interface for UDAF, but it can work seamlessly with Hive UDAF (via HiveContext). I am also working on the UDAF interface refactoring, after that we can provide the custom interface for extension. https://github.com/apache/spark/pull/3247 *From:* shahab [mailto:shahab.mok...@gmail.com] *Sent:* Wednesday, March 11, 2015 1:44 AM *To:* Cheng, Hao *Cc:* user@spark.apache.org *Subject:* Re: Registering custom UDAFs with HiveConetxt in SparkSQL, how? Thanks Hao, But my question concerns UDAF (user defined aggregation function ) not UDTF( user defined type function ). I appreciate if you could point me to some starting point on UDAF development in Spark. Thanks Shahab On Tuesday, March 10, 2015, Cheng, Hao hao.ch...@intel.com wrote: Currently, Spark SQL doesn’t provide interface for developing the custom UDTF, but it can work seamless with Hive UDTF. I am working on the UDTF refactoring for Spark SQL, hopefully will provide an Hive independent UDTF soon after that. *From:* shahab [mailto:shahab.mok...@gmail.com] *Sent:* Tuesday, March 10, 2015 5:44 PM *To:* user@spark.apache.org *Subject:* Registering custom UDAFs with HiveConetxt in SparkSQL, how? Hi, I need o develop couple of UDAFs and use them in the SparkSQL. While UDFs can be registered as a function in HiveContext, I could not find any documentation of how UDAFs can be registered in the HiveContext?? so far what I have found is to make a JAR file, out of developed UDAF class, and then deploy the JAR file to SparkSQL . But is there any way to avoid deploying the jar file and register it programmatically? best, /Shahab -- --- Takeshi Yamamuro -- Deepak
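Pulling the suggestions in this thread together, a minimal sketch of the HiveContext route; com.myco.FoobarUDAF, foobar, some_column and some_table are the placeholder names from the thread, not a real library:

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    // the UDAF class only has to be on the application classpath;
    // CREATE TEMPORARY FUNCTION registers it for this session
    hiveContext.sql("CREATE TEMPORARY FUNCTION foobar AS 'com.myco.FoobarUDAF'")
    hiveContext.sql("SELECT foobar(some_column) FROM some_table").show()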
Re: Column not found in schema when querying partitioned table
I've filed this as https://issues.apache.org/jira/browse/SPARK-6554 On Thu, Mar 26, 2015 at 6:29 AM, Jon Chase jon.ch...@gmail.com wrote: Spark 1.3.0, Parquet I'm having trouble referencing partition columns in my queries. In the following example, 'probeTypeId' is a partition column.
Re: Hive Table not found from Spark SQL
I have tables dw_bid that is created in Hive and has nothing to do with Spark. I have data in avro that i want to join with dw_bid table, this join needs to be done using Spark SQL. However for some reason Spark says dw_bid table does not exist. How do i say spark that dw_bid is a table created in Hive and read it. Query that is run from Spark SQL == insert overwrite table sojsuccessevents2_spark select guid,sessionKey,sessionStartDate,sojDataDate,seqNum,eventTimestamp,siteId,successEventType,sourceType,itemId, shopCartId,b.transaction_Id as transactionId,offerId,b.bdr_id as userId,priorPage1SeqNum,priorPage1PageId,exclWMSearchAttemptSeqNum,exclPriorSearchPageId, exclPriorSearchSeqNum,exclPriorSearchCategory,exclPriorSearchL1,exclPriorSearchL2,currentImpressionId,sourceImpressionId,exclPriorSearchSqr,exclPriorSearchSort, isDuplicate,b.bid_date as transactionDate,auctionTypeCode,isBin,leafCategoryId,itemSiteId,b.qty_bid as bidQuantity, b.bid_amt_unit_lstg_curncy * b.bid_exchng_rate as bidAmtUsd,offerQuantity,offerAmountUsd,offerCreateDate,buyerSegment,buyerCountryId,sellerId,sellerCountryId, sellerStdLevel,cssSellerLevel,a.experimentChannel from sojsuccessevents1 a join dw_bid b on a.itemId = b.item_id and a.transactionId = b.transaction_id where b.auct_end_dt = '2015-02-16' AND b.bid_dt = '2015-02-16' AND b.bid_type_code IN (1,9) AND b.bdr_id 0 AND ( b.bid_flags 32) = 0 and lower(a.successEventType) IN ('bid','bin') If i create sojsuccessevents2_spark from hive command line and run above command form Spark SQL program then i get error sojsuccessevents2_spark table not found. Hence i dropped the command from Hive and run create table sojsuccessevents2_spark from Spark SQL before running above command and it works until it hits next road block dw_bid table not found This makes me belive that Spark for some reason is not able to read/understand the tables created outside Spark. I did copy /apache/hive/conf/hive-site.xml into Spark conf directory. Please suggest. 
Logs ——— 15/03/26 03:50:40 INFO HiveMetaStore.audit: ugi=dvasthimal ip=unknown-ip-addr cmd=get_table : db=default tbl=dw_bid 15/03/26 03:50:40 ERROR metadata.Hive: NoSuchObjectException(message:default.dw_bid table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560) 15/03/26 03:50:40 ERROR yarn.ApplicationMaster: User class threw exception: no such table List(dw_bid); line 1 pos 843 org.apache.spark.sql.AnalysisException: no such table List(dw_bid); line 1 pos 843 at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:178) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:187) Regards, Deepak On Thu, Mar 26, 2015 at 4:27 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have this query insert overwrite table sojsuccessevents2_spark select guid,sessionKey,sessionStartDate,sojDataDate,seqNum,eventTimestamp,siteId,successEventType,sourceType,itemId, shopCartId,b.transaction_Id as transactionId,offerId,b.bdr_id as userId,priorPage1SeqNum,priorPage1PageId,exclWMSearchAttemptSeqNum,exclPriorSearchPageId, exclPriorSearchSeqNum,exclPriorSearchCategory,exclPriorSearchL1,exclPriorSearchL2,currentImpressionId,sourceImpressionId,exclPriorSearchSqr,exclPriorSearchSort, isDuplicate,b.bid_date as transactionDate,auctionTypeCode,isBin,leafCategoryId,itemSiteId,b.qty_bid as bidQuantity, b.bid_amt_unit_lstg_curncy * b.bid_exchng_rate as bidAmtUsd,offerQuantity,offerAmountUsd,offerCreateDate,buyerSegment,buyerCountryId,sellerId,sellerCountryId, sellerStdLevel,cssSellerLevel,a.experimentChannel from sojsuccessevents1 a *join dw_bid b* on a.itemId = b.item_id and a.transactionId = b.transaction_id where b.auct_end_dt = '2015-02-16' AND b.bid_dt = '2015-02-16' AND b.bid_type_code IN (1,9) AND b.bdr_id 0 AND ( b.bid_flags 32) = 0 and lower(a.successEventType) IN ('bid','bin') If i create sojsuccessevents2_spark from hive command line and run above command form Spark SQL program then i get error sojsuccessevents2_spark table not found. Hence i dropped the command from Hive and run create table sojsuccessevents2_spark from Spark SQL before running above command and it works until it hits next road block *dw_bid table not found* This makes me belive that Spark for some reason is not able to read/understand the tables created outside Spark. I did copy /apache/hive/conf/hive-site.xml into Spark conf directory. Please suggest. Regards, Deepak On Thu, Mar 26, 2015 at 1:26 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have a hive table named dw_bid, when i run hive from command prompt and run describe dw_bid, it works. I want to join a avro file (table) in HDFS with this hive dw_bid table and i refer it as dw_bid from
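One thing worth double-checking here: tables that live in the Hive metastore are only visible through a HiveContext (which reads hive-site.xml), while a plain SQLContext only sees tables registered in the current program. A hedged sketch of keeping both the Avro temp table and the Hive table in one context; the spark-avro import is an assumption about how the Avro file is being read:

    import org.apache.spark.sql.hive.HiveContext
    import com.databricks.spark.avro._        // assumption: spark-avro provides avroFile()

    val hiveContext = new HiveContext(sc)
    hiveContext.avroFile(input).registerTempTable("sojsuccessevents1")
    // dw_bid resolves against the Hive metastore, so both names are visible to the same context
    hiveContext.sql("SELECT count(*) FROM dw_bid").show()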
Re: How to deploy binary dependencies to workers?
OK, after various testing, I found the native library can be loaded if running in yarn-cluster mode. But I still cannot find out why it won't load when running in yarn-client mode... Thanks, David On Thu, Mar 26, 2015 at 4:21 PM Xi Shen davidshe...@gmail.com wrote: Not of course...all machines in HDInsight are Windows 64bit server. And I have made sure all my DLLs are for 64bit machines. I have managed to get those DLLs loade on my local machine which is also Windows 64bit. [image: --] Xi Shen [image: http://]about.me/davidshen http://about.me/davidshen?promo=email_sig http://about.me/davidshen On Thu, Mar 26, 2015 at 11:11 AM, DB Tsai dbt...@dbtsai.com wrote: Are you deploying the windows dll to linux machine? Sincerely, DB Tsai --- Blog: https://www.dbtsai.com On Wed, Mar 25, 2015 at 3:57 AM, Xi Shen davidshe...@gmail.com wrote: I think you meant to use the --files to deploy the DLLs. I gave a try, but it did not work. From the Spark UI, Environment tab, I can see spark.yarn.dist.files file:/c:/openblas/libgcc_s_seh-1.dll,file:/c:/openblas/libblas3.dll,file:/c:/openblas/libgfortran-3.dll,file:/c:/openblas/liblapack3.dll,file:/c:/openblas/libquadmath-0.dll I think my DLLs are all deployed. But I still got the warn message that native BLAS library cannot be load. And idea? Thanks, David On Wed, Mar 25, 2015 at 5:40 AM DB Tsai dbt...@dbtsai.com wrote: I would recommend to upload those jars to HDFS, and use add jars option in spark-submit with URI from HDFS instead of URI from local filesystem. Thus, it can avoid the problem of fetching jars from driver which can be a bottleneck. Sincerely, DB Tsai --- Blog: https://www.dbtsai.com On Tue, Mar 24, 2015 at 4:13 AM, Xi Shen davidshe...@gmail.com wrote: Hi, I am doing ML using Spark mllib. However, I do not have full control to the cluster. I am using Microsoft Azure HDInsight I want to deploy the BLAS or whatever required dependencies to accelerate the computation. But I don't know how to deploy those DLLs when I submit my JAR to the cluster. I know how to pack those DLLs into a jar. The real challenge is how to let the system find them... Thanks, David
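For what it's worth, one pattern that sometimes works for shipping native libraries on YARN is to distribute them with --files (they are localized into each container's working directory) and point java.library.path at that directory. The file names below are the ones from this thread; the class and jar names are placeholders, and the whole thing is a sketch, since whether netlib-java then picks the libraries up depends on the platform:

    spark-submit \
      --master yarn-client \
      --files /c:/openblas/libblas3.dll,/c:/openblas/liblapack3.dll,/c:/openblas/libgfortran-3.dll,/c:/openblas/libgcc_s_seh-1.dll,/c:/openblas/libquadmath-0.dll \
      --conf "spark.executor.extraJavaOptions=-Djava.library.path=." \
      --class com.example.MyMLJob \
      my-ml-job.jar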
Re: Write Parquet File with spark-streaming with Spark 1.3
You may resort to the generic save API introduced in 1.3, which supports appending as long as the target data source supports it. And in 1.3, Parquet does support appending. Cheng On 3/26/15 4:13 PM, Richard Grossman wrote: Hi I've succeed to write kafka stream to parquet file in Spark 1.2 but I can't make it with spark 1.3 As in streaming I can't use saveAsParquetFile() because I can't add data to an existing parquet File I know that it's possible to stream data directly into parquet could you help me by providing a little sample what API I need to use ?? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
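A minimal sketch of what that can look like inside the streaming job; the Event case class, output path and the shape of the Kafka stream are placeholders, and the generic save API with SaveMode.Append is the 1.3 mechanism referred to above:

    import org.apache.spark.sql.SaveMode

    case class Event(key: String, value: String)       // placeholder schema

    kafkaStream.foreachRDD { rdd =>
      import sqlContext.implicits._
      val df = rdd.map { case (k, v) => Event(k, v) }.toDF()
      // appends new part-files under the same directory instead of overwriting it
      df.save("hdfs:///data/events.parquet", "parquet", SaveMode.Append)
    }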
Re: Windowing and Analytics Functions in Spark SQL
Ok, Thanks. Some web resource where I could check the functionality supported by Spark SQL? Thanks!!! Regards. Miguel Ángel. On Thu, Mar 26, 2015 at 12:31 PM, Cheng Lian lian.cs@gmail.com wrote: We're working together with AsiaInfo on this. Possibly will deliver an initial version of window function support in 1.4.0. But it's not a promise yet. Cheng On 3/26/15 7:27 PM, Arush Kharbanda wrote: Its not yet implemented. https://issues.apache.org/jira/browse/SPARK-1442 On Thu, Mar 26, 2015 at 4:39 PM, Masf masfwo...@gmail.com wrote: Hi. Are the Windowing and Analytics functions supported in Spark SQL (with HiveContext or not)? For example in Hive is supported https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics Some tutorial or documentation where I can see all features supported by Spark SQL? Thanks!!! -- Regards. Miguel Ángel -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- Saludos. Miguel Ángel
Re: Windowing and Analytics Functions in Spark SQL
You can look at the Spark SQL programming guide. http://spark.apache.org/docs/1.3.0/sql-programming-guide.html and the Spark API. http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.package On Thu, Mar 26, 2015 at 5:21 PM, Masf masfwo...@gmail.com wrote: Ok, Thanks. Some web resource where I could check the functionality supported by Spark SQL? Thanks!!! Regards. Miguel Ángel. On Thu, Mar 26, 2015 at 12:31 PM, Cheng Lian lian.cs@gmail.com wrote: We're working together with AsiaInfo on this. Possibly will deliver an initial version of window function support in 1.4.0. But it's not a promise yet. Cheng On 3/26/15 7:27 PM, Arush Kharbanda wrote: Its not yet implemented. https://issues.apache.org/jira/browse/SPARK-1442 On Thu, Mar 26, 2015 at 4:39 PM, Masf masfwo...@gmail.com wrote: Hi. Are the Windowing and Analytics functions supported in Spark SQL (with HiveContext or not)? For example in Hive is supported https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics Some tutorial or documentation where I can see all features supported by Spark SQL? Thanks!!! -- Regards. Miguel Ángel -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- Saludos. Miguel Ángel -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Why does the executor encounter OutOfMemoryError: Java heap space?
Hi all, sometimes you can see an executor fail with OutOfMemoryError: Java heap space in Spark. There are many ideas about workarounds. My question is: how does an executor execute tasks from the point of view of memory usage and parallelism? The picture in my mind is: an executor is a JVM instance. The number of tasks that can run in parallel threads inside a single executor is controlled by the --executor-cores parameter of spark-submit in the case of YARN. Each executor owns --executor-memory of memory, which is divided into memory for the RDD cache and memory for task execution. I am not considering caching for now. What interests me is how the memory for task execution is used while the executor is working. Let's consider an example where you have only map operations, no joins/group/reduce and no caching: sc.textFile('test.txt') \ .flatMap(lambda line: line.split()) \ .map(lambda item: int(item) + 10) \ .saveAsTextFile('out.txt') How will the input RDD be processed in this case? I know RDDs are divided into P partitions by some rule (for example by the HDFS block size). So we will have P partitions, P tasks and 1 stage (am I right?). Let --executor-cores be 2. In this case the executor will process two partitions in parallel. Will it try to load entire partitions into memory, or will it just apply the map chain to each element of a partition? What can cause OutOfMemoryError: Java heap space in this case - a large partition, or a large amount of memory consumed by processing a single element of the RDD? Please correct me and advise. Serg. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-executor-encourage-OutOfMemoryException-Java-heap-space-tp22238.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
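Broadly, for a pure map pipeline like the one above, records flow through the partition iterator one at a time rather than the whole partition being materialised, so heap pressure tends to come from large individual records, per-task buffers and serialization rather than partition size alone; even so, asking for more, smaller partitions is a common first mitigation. A Scala sketch of the same pipeline with an explicit partition count (the number is illustrative):

    // more input partitions means each task holds less data at any one time
    val result = sc.textFile("test.txt", 200)
      .flatMap(_.split("\\s+"))
      .map(item => item.toInt + 10)
    result.saveAsTextFile("out.txt")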
Re: Hive Table not found from Spark SQL
I have this query insert overwrite table sojsuccessevents2_spark select guid,sessionKey,sessionStartDate,sojDataDate,seqNum,eventTimestamp,siteId,successEventType,sourceType,itemId, shopCartId,b.transaction_Id as transactionId,offerId,b.bdr_id as userId,priorPage1SeqNum,priorPage1PageId,exclWMSearchAttemptSeqNum,exclPriorSearchPageId, exclPriorSearchSeqNum,exclPriorSearchCategory,exclPriorSearchL1,exclPriorSearchL2,currentImpressionId,sourceImpressionId,exclPriorSearchSqr,exclPriorSearchSort, isDuplicate,b.bid_date as transactionDate,auctionTypeCode,isBin,leafCategoryId,itemSiteId,b.qty_bid as bidQuantity, b.bid_amt_unit_lstg_curncy * b.bid_exchng_rate as bidAmtUsd,offerQuantity,offerAmountUsd,offerCreateDate,buyerSegment,buyerCountryId,sellerId,sellerCountryId, sellerStdLevel,cssSellerLevel,a.experimentChannel from sojsuccessevents1 a *join dw_bid b* on a.itemId = b.item_id and a.transactionId = b.transaction_id where b.auct_end_dt = '2015-02-16' AND b.bid_dt = '2015-02-16' AND b.bid_type_code IN (1,9) AND b.bdr_id 0 AND ( b.bid_flags 32) = 0 and lower(a.successEventType) IN ('bid','bin') If i create sojsuccessevents2_spark from hive command line and run above command form Spark SQL program then i get error sojsuccessevents2_spark table not found. Hence i dropped the command from Hive and run create table sojsuccessevents2_spark from Spark SQL before running above command and it works until it hits next road block *dw_bid table not found* This makes me belive that Spark for some reason is not able to read/understand the tables created outside Spark. I did copy /apache/hive/conf/hive-site.xml into Spark conf directory. Please suggest. Regards, Deepak On Thu, Mar 26, 2015 at 1:26 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have a hive table named dw_bid, when i run hive from command prompt and run describe dw_bid, it works. 
I want to join a avro file (table) in HDFS with this hive dw_bid table and i refer it as dw_bid from Spark SQL program, however i see 15/03/26 00:31:01 INFO HiveMetaStore.audit: ugi=dvasthimal ip=unknown-ip-addr cmd=get_table : db=default tbl=dw_bid 15/03/26 00:31:01 ERROR metadata.Hive: NoSuchObjectException(message:default.dw_bid table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) Code: val successDetail_S1 = sqlContext.avroFile(input) successDetail_S1.registerTempTable(sojsuccessevents1) val countS1 = sqlContext.sql(select guid,sessionKey,sessionStartDate,sojDataDate,seqNum,eventTimestamp,siteId,successEventType,sourceType,itemId, + shopCartId,b.transaction_Id as transactionId,offerId,b.bdr_id as userId,priorPage1SeqNum,priorPage1PageId,exclWMSearchAttemptSeqNum,exclPriorSearchPageId, + exclPriorSearchSeqNum,exclPriorSearchCategory,exclPriorSearchL1,exclPriorSearchL2,currentImpressionId,sourceImpressionId,exclPriorSearchSqr,exclPriorSearchSort, + isDuplicate,b.bid_date as transactionDate,auctionTypeCode,isBin,leafCategoryId,itemSiteId,b.qty_bid as bidQuantity, + b.bid_amt_unit_lstg_curncy * b.bid_exchng_rate as bidAmtUsd,offerQuantity,offerAmountUsd,offerCreateDate,buyerSegment,buyerCountryId,sellerId,sellerCountryId, + sellerStdLevel,cssSellerLevel,a.experimentChannel + from sojsuccessevents1 a join dw_bid b + on a.itemId = b.item_id and a.transactionId = b.transaction_id + where b.bid_type_code IN (1,9) AND b.bdr_id 0 AND ( b.bid_flags 32) = 0 and lower(a.successEventType) IN ('bid','bin')) println(countS1.first: + countS1.first) Any suggestions on how to refer a hive table form Spark SQL? -- Deepak -- Deepak
Re: Windowing and Analytics Functions in Spark SQL
It's not yet implemented. https://issues.apache.org/jira/browse/SPARK-1442 On Thu, Mar 26, 2015 at 4:39 PM, Masf masfwo...@gmail.com wrote: Hi. Are the Windowing and Analytics functions supported in Spark SQL (with HiveContext or not)? For example in Hive is supported https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics Some tutorial or documentation where I can see all features supported by Spark SQL? Thanks!!! -- Regards. Miguel Ángel -- Sigmoid Analytics http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: SparkSQL overwrite parquet file does not generate _common_metadata
I couldn’t reproduce this with the following spark-shell snippet:

scala> import sqlContext.implicits._
scala> Seq((1, 2)).toDF("a", "b")
scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)
scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)

The _common_metadata file is typically much smaller than _metadata, because it doesn’t contain row group information, and thus can be faster to read than _metadata. Cheng On 3/26/15 12:48 PM, Pei-Lun Lee wrote: Hi, When I save parquet file with SaveMode.Overwrite, it never generate _common_metadata. Whether it overwrites an existing dir or not. Is this expected behavior? And what is the benefit of _common_metadata? Will reading performs better when it is present? Thanks, -- Pei-Lun
Port configuration for BlockManagerId
Hi, I am running spark-shell and connecting to a YARN cluster with deploy mode as client. In our environment, there are security policies that don't allow us to open all TCP ports. The issue I am facing is: the Spark Shell driver is using a random port for the BlockManagerId - BlockManagerId(driver, host-name, 52131). Is there any configuration I can use to fix this random port behavior? I am running Spark 1.2.0 on CDH 5.3.0. Thanks, Manish
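Not an authoritative answer, but Spark 1.2 does expose fixed-port settings for most of its services; the one behind BlockManagerId is spark.blockManager.port. A sketch of what could go into spark-defaults.conf or onto the spark-shell command line (the port numbers are placeholders to be matched to the firewall rules):

# spark-defaults.conf (example values)
spark.driver.port        40000
spark.blockManager.port  40001

# or equivalently on the command line
./bin/spark-shell --master yarn-client \
  --conf spark.driver.port=40000 \
  --conf spark.blockManager.port=40001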
Windowing and Analytics Functions in Spark SQL
Hi. Are the Windowing and Analytics functions supported in Spark SQL (with HiveContext or not)? For example in Hive is supported https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics Some tutorial or documentation where I can see all features supported by Spark SQL? Thanks!!! -- Regards. Miguel Ángel
Re: Handling Big data for interactive BI tools
Yes, you can easily configure Spark Thrift server and connect BI Tools. Here's an example https://hadoopi.wordpress.com/2014/12/31/spark-connect-tableau-desktop-to-sparksql/ showing how to integrate SparkSQL with Tableau dashboards. Thanks Best Regards On Thu, Mar 26, 2015 at 3:56 PM, kundan kumar iitr.kun...@gmail.com wrote: Hi, I need to store terabytes of data which will be used for BI tools like qlikview. The queries can be on the basis of filter on any column. Currently, we are using redshift for this purpose. I am trying to explore things other than the redshift . Is it possible to gain better performance in spark as compared to redshift ? If yes, please suggest what is the best way to achieve this. Thanks!! Kundan
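For completeness, a sketch of the pieces usually involved (host, port, and master are placeholders): start the JDBC/ODBC server that ships with Spark, check it with beeline, then point the BI tool's Spark SQL / Hive connector at the same host and port.

# start the Spark SQL Thrift (JDBC/ODBC) server; it reads conf/hive-site.xml
./sbin/start-thriftserver.sh \
  --master yarn-client \
  --hiveconf hive.server2.thrift.port=10000 \
  --hiveconf hive.server2.thrift.bind.host=0.0.0.0

# sanity-check the endpoint before wiring up Tableau/QlikView
./bin/beeline -u jdbc:hive2://localhost:10000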
Why k-means cluster hang for a long time?
Hi, When I run k-means clustering with Spark, I get this in the last two lines of the log: 15/03/26 11:42:42 INFO spark.ContextCleaner: Cleaned broadcast 26 15/03/26 11:42:42 INFO spark.ContextCleaner: Cleaned shuffle 5 Then it hangs for a long time. There's no active job. The driver machine is idle. I cannot access the worker nodes, so I am not sure if they are busy. I understand k-means may take a long time to finish. But why is there no active job and no log output? Thanks, David
Re: Windowing and Analytics Functions in Spark SQL
We're working together with AsiaInfo on this. Possibly will deliver an initial version of window function support in 1.4.0. But it's not a promise yet. Cheng On 3/26/15 7:27 PM, Arush Kharbanda wrote: Its not yet implemented. https://issues.apache.org/jira/browse/SPARK-1442 On Thu, Mar 26, 2015 at 4:39 PM, Masf masfwo...@gmail.com mailto:masfwo...@gmail.com wrote: Hi. Are the Windowing and Analytics functions supported in Spark SQL (with HiveContext or not)? For example in Hive is supported https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics Some tutorial or documentation where I can see all features supported by Spark SQL? Thanks!!! -- Regards. Miguel Ángel -- Sigmoid Analytics http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com mailto:ar...@sigmoidanalytics.com || www.sigmoidanalytics.com http://www.sigmoidanalytics.com/
Re: Handling Big data for interactive BI tools
You can also preaggregate results for the queries by the user - depending on what queries they use this might be necessary for any underlying technology Le 26 mars 2015 11:27, kundan kumar iitr.kun...@gmail.com a écrit : Hi, I need to store terabytes of data which will be used for BI tools like qlikview. The queries can be on the basis of filter on any column. Currently, we are using redshift for this purpose. I am trying to explore things other than the redshift . Is it possible to gain better performance in spark as compared to redshift ? If yes, please suggest what is the best way to achieve this. Thanks!! Kundan
Populating a HashMap from a GraphX connectedComponents graph
The Scala code below was based on https://www.sics.se/~amir/files/download/dic/answers6.pdf. I extended it by adding a HashMap called componentLists that I populated with each component's starting node as the key and then a ListBuffer of the component's members. As the output below the code shows, it seems to do all that just fine, but then the HashMap size is back down to 0 when it's done (flag3), so I assume there's some scoping issue related to the use(s) of the case keyword. (I'm new to Scala and still don't completely understand that.) Can anyone tell me how to modify this so that I still have a populated componentLists when it's all done, i.e. when it reaches flag3? Thanks, Bob

///
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.HashMap

object problemDemo {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "ProblemDemo", "127.0.0.1")
    val vertexArray = Array(
      (1L, "Alice"), (2L, "Bob"), (3L, "Charlie"),
      (4L, "David"), (5L, "Ed"), (6L, "Fran")
    )
    val edgeArray = Array(
      Edge(2L, 1L, "knows"), Edge(2L, 3L, "knows"), Edge(3L, 1L, "knows"),
      Edge(4L, 5L, "knows"), Edge(5L, 6L, "knows")
    )
    val vertexRDD: RDD[(Long, String)] = sc.parallelize(vertexArray)
    val edgeRDD: RDD[Edge[String]] = sc.parallelize(edgeArray)
    val graph: Graph[String, String] = Graph(vertexRDD, edgeRDD)

    var componentLists = HashMap[VertexId, ListBuffer[VertexId]]()
    val cc = graph.connectedComponents
    graph.vertices.leftJoin(cc.vertices) {
      case (id, u, comp) => (id, u, comp)
    }.foreach {
      case (id, u) => {
        // Add id to the list of components with a key
        // of u._3.get (the starting node)
        if (!(componentLists.contains(u._3.get))) {
          componentLists(u._3.get) = new ListBuffer[VertexId]
        }
        componentLists(u._3.get) += id
        println(s"just added ${id} to ${u._3.get}")
        println(s"flag1 length of componentLists ${componentLists.size}")
      }
      println(s"flag2 length of componentLists ${componentLists.size}")
    }
    println(s"flag3 length of componentLists ${componentLists.size}")
  }
}

// output /
just added 4 to 4
flag1 length of componentLists 1
flag2 length of componentLists 1
just added 2 to 1
flag1 length of componentLists 2
flag2 length of componentLists 2
just added 6 to 4
flag1 length of componentLists 2
flag2 length of componentLists 2
just added 5 to 4
flag1 length of componentLists 2
flag2 length of componentLists 2
just added 3 to 1
flag1 length of componentLists 2
flag2 length of componentLists 2
just added 1 to 1
flag1 length of componentLists 2
flag2 length of componentLists 2
flag3 length of componentLists 0
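Not a definitive diagnosis, but the usual explanation for this pattern: the function passed to foreach runs inside Spark tasks, so each task gets its own serialized copy of componentLists and mutates that copy; the driver's HashMap is never updated, which is why it is empty again at flag3 (the case keyword is not the culprit). A hedged sketch of one way to get a populated map back, by doing the grouping as an RDD operation and collecting only the result, in place of the mutable HashMap:

// Build the per-component member lists on the cluster, then collect them.
// Assumes the same `graph` and `cc` as in the code above.
val componentLists: Map[VertexId, Seq[VertexId]] =
  cc.vertices                                   // (vertexId, componentId)
    .map { case (id, comp) => (comp, id) }
    .groupByKey()
    .mapValues(_.toSeq)
    .collect()
    .toMap

componentLists.foreach { case (comp, members) =>
  println(s"component $comp has members $members")
}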
Re: Spark-1.3.0 UI shows 0 cores in completed applications tab
https://issues.apache.org/jira/browse/SPARK-5771 ? On Thu, Mar 26, 2015 at 12:58 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi all, I started spark-shell in spark-1.3.0 and did some actions. The UI was showing 8 cores under the running applications tab. But when I exited the spark-shell using exit, the application is moved to completed applications tab and the number of cores is 0. Again when I exited the spark-shell using sc.stop() ,it is showing correctly 8 cores under completed applications tab. Why it is showing 0 cores when I didnt use sc.stop()?Does anyone face this issue? Thanks Regards, Meethu M
Spark-1.3.0 UI shows 0 cores in completed applications tab
Hi all, I started spark-shell in spark-1.3.0 and did some actions. The UI was showing 8 cores under the running applications tab. But when I exited the spark-shell using exit, the application was moved to the completed applications tab and the number of cores was 0. When I instead exited the spark-shell using sc.stop(), it correctly showed 8 cores under the completed applications tab. Why does it show 0 cores when I didn't use sc.stop()? Does anyone else face this issue? Thanks Regards, Meethu M
Re: Spark-core and guava
This is a long and complicated story. In short, Spark shades Guava 14 except for a few classes that were accidentally used in a public API (Optional and a few more it depends on). So provided is more of a Maven workaround to achieve a desired effect. It's not provided in the usual sense. On Thu, Mar 26, 2015 at 12:24 PM, Stevo Slavić ssla...@gmail.com wrote: Hello Apache Spark community, spark-core 1.3.0 has guava 14.0.1 as provided dependency (see http://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.3.0/spark-core_2.10-1.3.0.pom ) What is supposed to provide guava, and that specific version? Kind regards, Stevo Slavic.
Which RDD operations preserve ordering?
Hi guys, I don't have a clear picture of which operations preserve the ordering of the elements of an RDD after they are executed. Which operations preserve it? 1) map (Yes?) 2) zipWithIndex (Yes, or sometimes yes?) Serg.
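For what it's worth, a small illustration of the two cases (general rule, hedged: narrow transformations such as map and filter keep the per-partition order, and zipWithIndex assigns indices by partition and position within the partition, so it follows that order; anything that shuffles, such as groupByKey or repartition, gives no ordering guarantee):

val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 2)

// map is a narrow, per-partition transformation: order is unchanged
rdd.map(_.toUpperCase).collect()   // Array(A, B, C, D)

// zipWithIndex numbers elements by partition and position within it
rdd.zipWithIndex().collect()       // Array((a,0), (b,1), (c,2), (d,3))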
Spark-core and guava
Hello Apache Spark community, spark-core 1.3.0 has guava 14.0.1 as provided dependency (see http://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.3.0/spark-core_2.10-1.3.0.pom ) What is supposed to provide guava, and that specific version? Kind regards, Stevo Slavic.
Re: Handling Big data for interactive BI tools
I looking for some options and came across http://www.jethrodata.com/ On Thu, Mar 26, 2015 at 5:47 PM, Jörn Franke jornfra...@gmail.com wrote: You can also preaggregate results for the queries by the user - depending on what queries they use this might be necessary for any underlying technology Le 26 mars 2015 11:27, kundan kumar iitr.kun...@gmail.com a écrit : Hi, I need to store terabytes of data which will be used for BI tools like qlikview. The queries can be on the basis of filter on any column. Currently, we are using redshift for this purpose. I am trying to explore things other than the redshift . Is it possible to gain better performance in spark as compared to redshift ? If yes, please suggest what is the best way to achieve this. Thanks!! Kundan
Re: Handling Big data for interactive BI tools
I was looking for some options and came across JethroData. http://www.jethrodata.com/ This stores the data maintaining indexes over all the columns seems good and claims to have better performance than Impala. Earlier I had tried Apache Phoenix because of its secondary indexing feature. But the major challenge I faced there was, secondary indexing was not supported for bulk loading process. Only the sequential loading process supported the secondary indexes, which took longer time. Any comments on this ? On Thu, Mar 26, 2015 at 5:59 PM, kundan kumar iitr.kun...@gmail.com wrote: I looking for some options and came across http://www.jethrodata.com/ On Thu, Mar 26, 2015 at 5:47 PM, Jörn Franke jornfra...@gmail.com wrote: You can also preaggregate results for the queries by the user - depending on what queries they use this might be necessary for any underlying technology Le 26 mars 2015 11:27, kundan kumar iitr.kun...@gmail.com a écrit : Hi, I need to store terabytes of data which will be used for BI tools like qlikview. The queries can be on the basis of filter on any column. Currently, we are using redshift for this purpose. I am trying to explore things other than the redshift . Is it possible to gain better performance in spark as compared to redshift ? If yes, please suggest what is the best way to achieve this. Thanks!! Kundan
Re: How to get a top X percent of a distribution represented as RDD
You can do it in-memory as well... get 10% topK elements from each partition and use merge from any sort algorithm like timsort... basically aggregateBy. Your version uses shuffle but this version is 0 shuffle... assuming your data set is cached you will be using in-memory allReduce through treeAggregate... But this is only good for top 10% or bottom 10%... if you need to do it for top 30% then maybe the shuffle version will work better... On Thu, Mar 26, 2015 at 8:31 PM, Aung Htet aung@gmail.com wrote: Hi all, I have a distribution represented as an RDD of tuples, in rows of (segment, score) For each segment, I want to discard tuples with top X percent scores. This seems hard to do in Spark RDD. A naive algorithm would be - 1) Sort RDD by segment score (descending) 2) Within each segment, number the rows from top to bottom. 3) For each segment, calculate the cut off index. i.e. 90 for 10% cut off out of a segment with 100 rows. 4) For the entire RDD, filter rows with row num = cut off index This does not look like a good algorithm. I would really appreciate if someone can suggest a better way to implement this in Spark. Regards, Aung
Re: spark-sql throws org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException
Hey Deepak, It seems that your hive-site.xml says your Hive metastore setup is using MySQL. If that's not the case, you need to adjust your hive-site.xml configurations. As for the version of MySQL driver, it should match the MySQL server. Cheng On 3/27/15 11:07 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote: I do not use MySQL, i want to read Hive tables from Spark SQL and transform them in Spark SQL. Why do i need a MySQL driver ? If i still need it which version should i use. Assuming i need it, i downloaded the latest version of it from http://mvnrepository.com/artifact/mysql/mysql-connector-java/5.1.34 and ran the following commands, i do not see above exception , however i see a new one. export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4 export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar export SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:*/home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar* export HADOOP_CONF_DIR=/apache/hadoop/conf cd $SPARK_HOME ./bin/spark-sql Spark assembly has been built with Hive, including Datanucleus jars on classpath ... ... spark-sql spark-sql spark-sql show tables; 15/03/26 20:03:57 INFO metastore.HiveMetaStore: 0: get_tables: db=default pat=.* 15/03/26 20:03:57 INFO HiveMetaStore.audit: ugi=dvasthi...@corp.ebay.com mailto:dvasthi...@corp.ebay.comip=unknown-ip-addrcmd=get_tables: db=default pat=.* 15/03/26 20:03:58 INFO spark.SparkContext: Starting job: collect at SparkPlan.scala:83 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Got job 1 (collect at SparkPlan.scala:83) with 1 output partitions (allowLocal=false) 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Final stage: Stage 1(collect at SparkPlan.scala:83) 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Parents of final stage: List() 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Missing parents: List() 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[3] at map at SparkPlan.scala:83), which has no missing parents 15/03/26 20:03:58 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1 15/03/26 20:03:58 INFO scheduler.StatsReportListener: Finished stage: org.apache.spark.scheduler.StageInfo@2bfd9c4d 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Job 1 failed: collect at SparkPlan.scala:83, took 0.005163 s 15/03/26 20:03:58 ERROR thriftserver.SparkSQLDriver: Failed in [show tables] org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) java.lang.reflect.Constructor.newInstance(Constructor.java:526) org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68) org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60) org.apache.spark.broadcast.TorrentBroadcast.org 
http://org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73) org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:79) org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051) org.apache.spark.scheduler.DAGScheduler.org http://org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:839) org.apache.spark.scheduler.DAGScheduler.org http://org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778) org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762) org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362) org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.org http://org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
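Following up on the hive-site.xml point at the top of this reply: the properties Spark SQL reads are the standard Hive metastore ones. A hedged example (host names, ports, and values are placeholders); note that when hive.metastore.uris points at a running remote metastore service, Spark only speaks thrift to it and no JDBC driver is needed on the Spark side:

<!-- hive-site.xml: preferred setup, a remote metastore service -->
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://metastore-host:9083</value>
</property>

<!-- only needed if Spark must talk to the metastore database directly -->
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://db-host:3306/hive</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>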
spark-sql throws org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException
I am unable to run spark-sql form command line. I attempted the following 1) export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4 export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar export SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar cd $SPARK_HOME ./bin/spark-sql ./bin/spark-sql 2) export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4 export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar export SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar cd $SPARK_HOME ./bin/spark-sql --jars /home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar 3) export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4 export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar export SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar export HADOOP_CONF_DIR=/apache/hadoop/conf cd $SPARK_HOME ./bin/spark-sql *Each time i get the below exception* Spark assembly has been built with Hive, including Datanucleus jars on classpath 15/03/26 19:43:49 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore. 15/03/26 19:43:49 WARN conf.HiveConf: DEPRECATED: hive.metastore.ds.retry.* no longer has any effect. 
Use hive.hmshandler.retry.* instead 15/03/26 19:43:49 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/03/26 19:43:49 INFO metastore.ObjectStore: ObjectStore, initialize called 15/03/26 19:43:50 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/03/26 19:43:50 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored Exception in thread main java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:101) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340) ... 11 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at
SparkContext.wholeTextFiles throws not serializable exception
Hi, I want to load my data in this way: sc.wholeTextFiles(opt.input) map { x => (x._1, x._2.lines.filter(!_.isEmpty).toSeq) } But I got java.io.NotSerializableException: scala.collection.Iterator$$anon$13 But if I use x._2.split('\n'), I can get the expected result. I want to know what's wrong with using the lines() function. Thanks, Xi Shen -- about.me/davidshen
Re: What is best way to run spark job in yarn-cluster mode from java program(servlet container) and NOT using spark-submit command.
Sandy Ryza sandy.r...@cloudera.com writes: Creating a SparkContext and setting master as yarn-cluster unfortunately will not work. SPARK-4924 added APIs for doing this in Spark, but won't be included until 1.4. -Sandy Did you look into something like [1]? With that you can make rest API call from your java code. Thanks and Regards Noorul [1] https://github.com/spark-jobserver/spark-jobserver? On Tue, Mar 17, 2015 at 3:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Create SparkContext set master as yarn-cluster then run it as a standalone program? Thanks Best Regards On Tue, Mar 17, 2015 at 1:27 AM, rrussell25 rrussel...@gmail.com wrote: Hi, were you ever able to determine a satisfactory approach for this problem? I have a similar situation and would prefer to execute the job directly from java code within my jms listener and/or servlet container. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-is-best-way-to-run-spark-job-in-yarn-cluster-mode-from-java-program-servlet-container-and-NOT-u-tp21817p22086.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to get a top X percent of a distribution represented as RDD
Idea is to use a heap and get topK elements from every partition...then use aggregateBy and for combOp do a merge routine from mergeSort...basically get 100 items from partition 1, 100 items from partition 2, merge them so that you get sorted 200 items and take 100...for merge you can use heap as well...Matei had a BPQ inside Spark which we use all the time...Passing arrays over wire is better than passing full heap objects and merge routine on array should run faster but needs experiment... On Thu, Mar 26, 2015 at 9:26 PM, Aung Htet aung@gmail.com wrote: Hi Debasish, Thanks for your suggestions. In-memory version is quite useful. I do not quite understand how you can use aggregateBy to get 10% top K elements. Can you please give an example? Thanks, Aung On Fri, Mar 27, 2015 at 2:40 PM, Debasish Das debasish.da...@gmail.com wrote: You can do it in-memory as wellget 10% topK elements from each partition and use merge from any sort algorithm like timsortbasically aggregateBy Your version uses shuffle but this version is 0 shuffle..assuming your data set is cached you will be using in-memory allReduce through treeAggregate... But this is only good for top 10% or bottom 10%...if you need to do it for top 30% then may be the shuffle version will work better... On Thu, Mar 26, 2015 at 8:31 PM, Aung Htet aung@gmail.com wrote: Hi all, I have a distribution represented as an RDD of tuples, in rows of (segment, score) For each segment, I want to discard tuples with top X percent scores. This seems hard to do in Spark RDD. A naive algorithm would be - 1) Sort RDD by segment score (descending) 2) Within each segment, number the rows from top to bottom. 3) For each segment, calculate the cut off index. i.e. 90 for 10% cut off out of a segment with 100 rows. 4) For the entire RDD, filter rows with row num = cut off index This does not look like a good algorithm. I would really appreciate if someone can suggest a better way to implement this in Spark. Regards, Aung
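A rough sketch of the per-partition top-k idea described above (not the exact code being referred to: the bounded heap is replaced by a periodically trimmed list to keep it short, and the per-segment k, i.e. 10% of each segment's row count, is assumed to be computed beforehand, e.g. via countByKey):

import org.apache.spark.rdd.RDD

// Top-k scores per segment via aggregateByKey.
def topKPerSegment(data: RDD[(String, Double)], k: Int): RDD[(String, List[Double])] = {
  // keep the buffer bounded: only re-sort once it grows past 2*k
  def trim(xs: List[Double]): List[Double] =
    if (xs.length <= 2 * k) xs else xs.sortBy(-_).take(k)

  data.aggregateByKey(List.empty[Double])(
    (acc, score) => trim(score :: acc),  // fold scores within each partition
    (a, b) => trim(a ::: b)              // merge partial results across partitions
  ).mapValues(_.sortBy(-_).take(k))      // final top-k per segment, highest first
}

The smallest of the k values kept for a segment is that segment's cutoff score; broadcasting those cutoffs and filtering the original RDD against them removes the top-X-percent rows without a global sort.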
Spark SQL configurations
Hello, Can someone share with me the list of commands (including export statements) that you use to run Spark SQL over a YARN cluster? I am unable to get it running on my YARN cluster and keep running into exceptions. I understand I need to share the specific exception; this is more that I want to know whether I have missed anything before running Spark SQL. Regards, Deepak
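Not an authoritative recipe, but a minimal sequence that generally works for the spark-sql shell against YARN in 1.3 (all paths are placeholders; the metastore JDBC driver jar is only needed if hive-site.xml points Spark directly at the metastore database):

export SPARK_HOME=/path/to/spark-1.3.0-bin-hadoop2.4
export HADOOP_CONF_DIR=/etc/hadoop/conf            # YARN and HDFS client configs
cp /etc/hive/conf/hive-site.xml $SPARK_HOME/conf/  # so Spark SQL sees the Hive metastore

cd $SPARK_HOME
./bin/spark-sql --master yarn-client \
  --driver-class-path /path/to/metastore-jdbc-driver.jar \
  --jars /path/to/any-extra-dependency.jar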
k-means can only run on one executor with one thread?
Hi, I have a large data set, and I expect to get 5000 clusters. I load the raw data, convert it into DenseVectors, then repartition and cache, and finally give the RDD[Vector] to KMeans.train(). Now the job is running and the data is loaded. But according to the Spark UI, all data is loaded onto one executor. I checked that executor, and its CPU workload is very low; I think it is using only 1 of the 8 cores. The other 3 executors are idle. Did I miss something? Is it possible to distribute the workload to all 4 executors? Thanks, David
Re: Cross-compatibility of YARN shuffle service
Hi Matt, I'm not sure whether we have documented compatibility guidelines here. However, a strong goal is to keep the external shuffle service compatible so that many versions of Spark can run against the same shuffle service. -Sandy On Wed, Mar 25, 2015 at 6:44 PM, Matt Cheah mch...@palantir.com wrote: Hi everyone, I am considering moving from Spark-Standalone to YARN. The context is that there are multiple Spark applications that are using different versions of Spark that all want to use the same YARN cluster. My question is: if I use a single Spark YARN shuffle service jar on the Node Manager, will the service work properly with all of the Spark applications, regardless of the specific versions of the applications? Or, is it it the case that, if I want to use the external shuffle service, I need to have all of my applications using the same version of Spark? Thanks, -Matt Cheah
Re: spark-sql throws org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException
I do not use MySQL, i want to read Hive tables from Spark SQL and transform them in Spark SQL. Why do i need a MySQL driver ? If i still need it which version should i use. Assuming i need it, i downloaded the latest version of it from http://mvnrepository.com/artifact/mysql/mysql-connector-java/5.1.34 and ran the following commands, i do not see above exception , however i see a new one. export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4 export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar export SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar: */home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar* export HADOOP_CONF_DIR=/apache/hadoop/conf cd $SPARK_HOME ./bin/spark-sql Spark assembly has been built with Hive, including Datanucleus jars on classpath ... ... spark-sql spark-sql spark-sql show tables; 15/03/26 20:03:57 INFO metastore.HiveMetaStore: 0: get_tables: db=default pat=.* 15/03/26 20:03:57 INFO HiveMetaStore.audit: ugi=dvasthi...@corp.ebay.com ip=unknown-ip-addr cmd=get_tables: db=default pat=.* 15/03/26 20:03:58 INFO spark.SparkContext: Starting job: collect at SparkPlan.scala:83 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Got job 1 (collect at SparkPlan.scala:83) with 1 output partitions (allowLocal=false) 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Final stage: Stage 1(collect at SparkPlan.scala:83) 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Parents of final stage: List() 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Missing parents: List() 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[3] at map at SparkPlan.scala:83), which has no missing parents 15/03/26 20:03:58 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1 15/03/26 20:03:58 INFO scheduler.StatsReportListener: Finished stage: org.apache.spark.scheduler.StageInfo@2bfd9c4d 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Job 1 failed: collect at SparkPlan.scala:83, took 0.005163 s 15/03/26 20:03:58 ERROR thriftserver.SparkSQLDriver: Failed in [show tables] org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) java.lang.reflect.Constructor.newInstance(Constructor.java:526) org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68) org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60) org.apache.spark.broadcast.TorrentBroadcast.org $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73) org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:79) org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) 
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051) org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:839) org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778) org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762) org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362) org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:847) at
Re: spark-sql throws org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException
If you're not using MySQL as your metastore for Hive, out of curiosity what are you using? The error you are seeing is common when there isn't the correct driver to allow Spark to connect to the Hive metastore because the correct driver isn't there. As well, I noticed that you're using SPARK_CLASSPATH which has been deprecated. Depending on your scenario, you may want to use --jars, --driver-class-path, or extraClassPath. A good thread on this topic can be found at http://mail-archives.us.apache.org/mod_mbox/spark-user/201503.mbox/%3C01a901d0547c$a23ba480$e6b2ed80$@innowireless.com%3E . For example, when I connect to my own Hive metastore via Spark 1.3, I reference the --driver-class-path where in my case I am using MySQL as my Hive metastore: ./bin/spark-sql --master spark://$standalone$:7077 --driver-class-path mysql-connector-$version$.jar HTH! On Thu, Mar 26, 2015 at 8:09 PM ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I do not use MySQL, i want to read Hive tables from Spark SQL and transform them in Spark SQL. Why do i need a MySQL driver ? If i still need it which version should i use. Assuming i need it, i downloaded the latest version of it from http://mvnrepository.com/artifact/mysql/mysql-connector-java/5.1.34 and ran the following commands, i do not see above exception , however i see a new one. export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4 export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar export SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar: */home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar* export HADOOP_CONF_DIR=/apache/hadoop/conf cd $SPARK_HOME ./bin/spark-sql Spark assembly has been built with Hive, including Datanucleus jars on classpath ... ... 
spark-sql spark-sql spark-sql show tables; 15/03/26 20:03:57 INFO metastore.HiveMetaStore: 0: get_tables: db=default pat=.* 15/03/26 20:03:57 INFO HiveMetaStore.audit: ugi=dvasthi...@corp.ebay.com ip=unknown-ip-addr cmd=get_tables: db=default pat=.* 15/03/26 20:03:58 INFO spark.SparkContext: Starting job: collect at SparkPlan.scala:83 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Got job 1 (collect at SparkPlan.scala:83) with 1 output partitions (allowLocal=false) 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Final stage: Stage 1(collect at SparkPlan.scala:83) 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Parents of final stage: List() 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Missing parents: List() 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[3] at map at SparkPlan.scala:83), which has no missing parents 15/03/26 20:03:58 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1 15/03/26 20:03:58 INFO scheduler.StatsReportListener: Finished stage: org.apache.spark.scheduler.StageInfo@2bfd9c4d 15/03/26 20:03:58 INFO scheduler.DAGScheduler: Job 1 failed: collect at SparkPlan.scala:83, took 0.005163 s 15/03/26 20:03:58 ERROR thriftserver.SparkSQLDriver: Failed in [show tables] org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) java.lang.reflect.Constructor.newInstance(Constructor.java:526) org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68) org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60) org.apache.spark.broadcast.TorrentBroadcast.org $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73) org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:79) org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051) org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:839) org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
Re: spark-sql throws org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException
As the exception suggests, you don't have MySQL JDBC driver on your classpath. On 3/27/15 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote: I am unable to run spark-sql form command line. I attempted the following 1) export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4 export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar export SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar cd $SPARK_HOME ./bin/spark-sql ./bin/spark-sql 2) export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4 export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar export SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar cd $SPARK_HOME ./bin/spark-sql --jars /home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar 3) export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4 export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar export SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar export HADOOP_CONF_DIR=/apache/hadoop/conf cd $SPARK_HOME ./bin/spark-sql _Each time i get the below exception_ Spark assembly has been built with Hive, including Datanucleus jars on classpath 15/03/26 19:43:49 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore. 15/03/26 19:43:49 WARN conf.HiveConf: DEPRECATED: hive.metastore.ds.retry.* no longer has any effect. 
Use hive.hmshandler.retry.* instead 15/03/26 19:43:49 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/03/26 19:43:49 INFO metastore.ObjectStore: ObjectStore, initialize called 15/03/26 19:43:50 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/03/26 19:43:50 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored Exception in thread main java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:101) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340) ... 11 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at
Re: SparkSQL overwrite parquet file does not generate _common_metadata
Hi Cheng, on my computer, execute res0.save(xxx, org.apache.spark.sql.SaveMode. Overwrite) produces: peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx total 32 -rwxrwxrwx 1 peilunlee staff0 Mar 27 11:29 _SUCCESS* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-1.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-2.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-3.parquet* -rwxrwxrwx 1 peilunlee staff 488 Mar 27 11:29 part-r-4.parquet* while res0.save(xxx) produces: peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx total 40 -rwxrwxrwx 1 peilunlee staff0 Mar 27 11:29 _SUCCESS* -rwxrwxrwx 1 peilunlee staff 250 Mar 27 11:29 _common_metadata* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-1.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-2.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-3.parquet* -rwxrwxrwx 1 peilunlee staff 488 Mar 27 11:29 part-r-4.parquet* On Thu, Mar 26, 2015 at 7:26 PM, Cheng Lian lian.cs@gmail.com wrote: I couldn’t reproduce this with the following spark-shell snippet: scala import sqlContext.implicits._ scala Seq((1, 2)).toDF(a, b) scala res0.save(xxx, org.apache.spark.sql.SaveMode.Overwrite) scala res0.save(xxx, org.apache.spark.sql.SaveMode.Overwrite) The _common_metadata file is typically much smaller than _metadata, because it doesn’t contain row group information, and thus can be faster to read than _metadata. Cheng On 3/26/15 12:48 PM, Pei-Lun Lee wrote: Hi, When I save parquet file with SaveMode.Overwrite, it never generate _common_metadata. Whether it overwrites an existing dir or not. Is this expected behavior? And what is the benefit of _common_metadata? Will reading performs better when it is present? Thanks, -- Pei-Lun
Re: SparkContext.wholeTextFiles throws not serializable exception
I have to use .lines.toArray.toSeq. A little tricky. Xi Shen -- about.me/davidshen On Fri, Mar 27, 2015 at 4:41 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I want to load my data in this way: sc.wholeTextFiles(opt.input) map { x => (x._1, x._2.lines.filter(!_.isEmpty).toSeq) } But I got java.io.NotSerializableException: scala.collection.Iterator$$anon$13 But if I use x._2.split('\n'), I can get the expected result. I want to know what's wrong with using the lines() function. Thanks, Xi Shen -- about.me/davidshen
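A plausible explanation, for anyone hitting the same exception: String.lines returns an Iterator[String], and Iterator.toSeq wraps it in a lazy Stream that still holds a reference to the non-serializable iterator, which then fails when the record is serialized. Forcing a strict collection avoids it; a small sketch:

val data = sc.wholeTextFiles(opt.input).map { case (path, content) =>
  // .toList (or .toVector / .toArray) consumes the iterator eagerly,
  // so the resulting record is a plain serializable collection.
  (path, content.lines.filter(!_.isEmpty).toList)
}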
Re: Combining Many RDDs
Yang Chen y...@yang-cs.com writes: Hi Noorul, Thank you for your suggestion. I tried that, but ran out of memory. I did some search and found some suggestions that we should try to avoid rdd.union( http://stackoverflow.com/questions/28343181/memory-efficient-way-of-union-a-sequence-of-rdds-from-files-in-apache-spark ). I will try to come up with some other ways. I think you are using rdd.union(), but I was referring to SparkContext.union(). I am not sure about the number of RDDs that you have but I had no issues with memory when I used it to combine 2000 RDDs. Having said that I had other performance issues with spark cassandra connector. Thanks and Regards Noorul On Thu, Mar 26, 2015 at 1:13 PM, Noorul Islam K M noo...@noorul.com wrote: sparkx y...@yang-cs.com writes: Hi, I have a Spark job and a dataset of 0.5 Million items. Each item performs some sort of computation (joining a shared external dataset, if that does matter) and produces an RDD containing 20-500 result items. Now I would like to combine all these RDDs and perform a next job. What I have found out is that the computation itself is quite fast, but combining these RDDs takes much longer time. val result = data// 0.5M data items .map(compute(_)) // Produces an RDD - fast .reduce(_ ++ _) // Combining RDDs - slow I have also tried to collect results from compute(_) and use a flatMap, but that is also slow. Is there a way to efficiently do this? I'm thinking about writing this result to HDFS and reading from disk for the next job, but am not sure if that's a preferred way in Spark. Are you looking for SparkContext.union() [1] ? This is not performing well with spark cassandra connector. I am not sure whether this will help you. Thanks and Regards Noorul [1] http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.SparkContext - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
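For reference, a small sketch of the distinction being drawn here (names are made up; it assumes the 0.5M items live in a local collection so that compute returns one RDD per item):

import org.apache.spark.rdd.RDD

val rdds: Seq[RDD[Result]] = items.map(compute)   // hypothetical items / compute

// Pairwise reduce builds a nested UnionRDD, one level per step:
val nested = rdds.reduce(_ union _)

// SparkContext.union takes the whole sequence and builds one flat UnionRDD,
// which keeps the lineage (and the driver-side object graph) shallow:
val flat = sc.union(rdds)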
How to get a top X percent of a distribution represented as RDD
Hi all, I have a distribution represented as an RDD of tuples, in rows of (segment, score) For each segment, I want to discard tuples with top X percent scores. This seems hard to do in Spark RDD. A naive algorithm would be - 1) Sort RDD by segment score (descending) 2) Within each segment, number the rows from top to bottom. 3) For each segment, calculate the cut off index. i.e. 90 for 10% cut off out of a segment with 100 rows. 4) For the entire RDD, filter rows with row num = cut off index This does not look like a good algorithm. I would really appreciate if someone can suggest a better way to implement this in Spark. Regards, Aung
Re: How to get a top X percent of a distribution represented as RDD
Hi Debasish, Thanks for your suggestions. In-memory version is quite useful. I do not quite understand how you can use aggregateBy to get 10% top K elements. Can you please give an example? Thanks, Aung On Fri, Mar 27, 2015 at 2:40 PM, Debasish Das debasish.da...@gmail.com wrote: You can do it in-memory as wellget 10% topK elements from each partition and use merge from any sort algorithm like timsortbasically aggregateBy Your version uses shuffle but this version is 0 shuffle..assuming your data set is cached you will be using in-memory allReduce through treeAggregate... But this is only good for top 10% or bottom 10%...if you need to do it for top 30% then may be the shuffle version will work better... On Thu, Mar 26, 2015 at 8:31 PM, Aung Htet aung@gmail.com wrote: Hi all, I have a distribution represented as an RDD of tuples, in rows of (segment, score) For each segment, I want to discard tuples with top X percent scores. This seems hard to do in Spark RDD. A naive algorithm would be - 1) Sort RDD by segment score (descending) 2) Within each segment, number the rows from top to bottom. 3) For each segment, calculate the cut off index. i.e. 90 for 10% cut off out of a segment with 100 rows. 4) For the entire RDD, filter rows with row num = cut off index This does not look like a good algorithm. I would really appreciate if someone can suggest a better way to implement this in Spark. Regards, Aung
Re: How to get a top X percent of a distribution represented as RDD
?You could also consider using a count-min data structure such as in https://github.com/laserson/dsq? to get approximate quantiles, then use whatever values you want to filter the original sequence. From: Debasish Das debasish.da...@gmail.com Sent: Thursday, March 26, 2015 9:45 PM To: Aung Htet Cc: user Subject: Re: How to get a top X percent of a distribution represented as RDD Idea is to use a heap and get topK elements from every partition...then use aggregateBy and for combOp do a merge routine from mergeSort...basically get 100 items from partition 1, 100 items from partition 2, merge them so that you get sorted 200 items and take 100...for merge you can use heap as well...Matei had a BPQ inside Spark which we use all the time...Passing arrays over wire is better than passing full heap objects and merge routine on array should run faster but needs experiment... On Thu, Mar 26, 2015 at 9:26 PM, Aung Htet aung@gmail.commailto:aung@gmail.com wrote: Hi Debasish, Thanks for your suggestions. In-memory version is quite useful. I do not quite understand how you can use aggregateBy to get 10% top K elements. Can you please give an example? Thanks, Aung On Fri, Mar 27, 2015 at 2:40 PM, Debasish Das debasish.da...@gmail.commailto:debasish.da...@gmail.com wrote: You can do it in-memory as wellget 10% topK elements from each partition and use merge from any sort algorithm like timsortbasically aggregateBy Your version uses shuffle but this version is 0 shuffle..assuming your data set is cached you will be using in-memory allReduce through treeAggregate... But this is only good for top 10% or bottom 10%...if you need to do it for top 30% then may be the shuffle version will work better... On Thu, Mar 26, 2015 at 8:31 PM, Aung Htet aung@gmail.commailto:aung@gmail.com wrote: Hi all, I have a distribution represented as an RDD of tuples, in rows of (segment, score) For each segment, I want to discard tuples with top X percent scores. This seems hard to do in Spark RDD. A naive algorithm would be - 1) Sort RDD by segment score (descending) 2) Within each segment, number the rows from top to bottom. 3) For each segment, calculate the cut off index. i.e. 90 for 10% cut off out of a segment with 100 rows. 4) For the entire RDD, filter rows with row num = cut off index This does not look like a good algorithm. I would really appreciate if someone can suggest a better way to implement this in Spark. Regards, Aung
DataFrame GroupBy
Hello everybody, I am trying to do a simple groupBy : *Code:* val df = hiveContext.sql(SELECT * FROM table1) df .printSchema() df .groupBy(customer_id).count().show(5) *Stacktrace* : root |-- customer_id: string (nullable = true) |-- rank: string (nullable = true) |-- reco_material_id: string (nullable = true) |-- score: string (nullable = true) |-- category: string (nullable = true) |-- is_achat: string (nullable = true) 15/03/26 17:19:29 INFO HiveMetaStore: 0: get_table : db=default tbl=table1 15/03/26 17:19:29 INFO audit: ugi=spark ip=unknown-ip-addr cmd=get_table : db=default tbl=table1 Exception in thread main java.util.NoSuchElementException: key not found: customer_id#0 at scala.collection.MapLike$class.default(MapLike.scala:228) at org.apache.spark.sql.catalyst.expressions.AttributeMap.default(AttributeMap.scala:29) at scala.collection.MapLike$class.apply(MapLike.scala:141) at org.apache.spark.sql.catalyst.expressions.AttributeMap.apply(AttributeMap.scala:29) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.hive.execution.HiveTableScan.init(HiveTableScan.scala:53) at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$15.apply(HiveStrategies.scala:216) at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$15.apply(HiveStrategies.scala:216) at org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:1034) at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$.apply(HiveStrategies.scala:212) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54) at org.apache.spark.sql.execution.SparkStrategies$HashAggregation$.apply(SparkStrategies.scala:152) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54) at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:290) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:1081) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:1079) at 
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:1085) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:1085) at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:815) at org.apache.spark.sql.DataFrame.head(DataFrame.scala:758) at org.apache.spark.sql.DataFrame.take(DataFrame.scala:809) at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:314) Does anyone have an idea? Regards, Germain Tanguy. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-GroupBy-tp22242.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
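For comparison, a minimal sketch of the two ways to express this aggregation in Spark 1.3 (table and column names as in the message above). If the pure SQL form succeeds where the DataFrame form fails, that narrows the problem down to the DataFrame planning path rather than the Hive table itself:

// Pure SQL: goes through the Hive parser end to end
val counts = hiveContext.sql(
  "SELECT customer_id, COUNT(*) AS cnt FROM table1 GROUP BY customer_id")
counts.show(5)

// DataFrame API: column name passed as a string
val df = hiveContext.sql("SELECT * FROM table1")
df.groupBy("customer_id").count().show(5)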
Re: HQL function Rollup and Cube
Clarification on how the HQL was invoked: hiveContext.sql(select a, b, count(*) from t group by a, b with rollup) Thanks, Chang -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HQL-function-Rollup-and-Cube-tp22241p22244.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Combining Many RDDs
sparkx y...@yang-cs.com writes: Hi, I have a Spark job and a dataset of 0.5 Million items. Each item performs some sort of computation (joining a shared external dataset, if that does matter) and produces an RDD containing 20-500 result items. Now I would like to combine all these RDDs and perform a next job. What I have found out is that the computation itself is quite fast, but combining these RDDs takes much longer time. val result = data// 0.5M data items .map(compute(_)) // Produces an RDD - fast .reduce(_ ++ _) // Combining RDDs - slow I have also tried to collect results from compute(_) and use a flatMap, but that is also slow. Is there a way to efficiently do this? I'm thinking about writing this result to HDFS and reading from disk for the next job, but am not sure if that's a preferred way in Spark. Are you looking for SparkContext.union() [1] ? This is not performing well with spark cassandra connector. I am not sure whether this will help you. Thanks and Regards Noorul [1] http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.SparkContext - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
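A minimal sketch of the SparkContext.union suggestion (the element type Result and the compute function are placeholders from the original post). The point is that reduce(_ ++ _) builds half a million nested UnionRDDs on the driver, whereas sc.union builds a single flat one:

// data is a local collection of 0.5M items, each producing a small RDD
val perItemRdds: Seq[RDD[Result]] = data.map(item => compute(item))
val combined: RDD[Result] = sc.union(perItemRdds)   // one flat UnionRDD over all of them
combined.count()                                     // or whatever the next job needs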
Re: Handling Big data for interactive BI tools
BTW, a tool that I have been using to help do the preaggregation of data using HyperLogLog in combination with Spark is atscale (http://atscale.com/). It builds the aggregations and makes use of the speed of SparkSQL - all within the context of a model that is accessible by Tableau or Qlik. On Thu, Mar 26, 2015 at 8:55 AM Jörn Franke jornfra...@gmail.com wrote: As I wrote previously - indexing is not your only choice; you can preaggregate data during load or, depending on your needs, think about other data structures, such as graphs, HyperLogLog, Bloom filters etc. (a challenge to integrate into standard BI tools) On 26 Mar 2015 13:34, kundan kumar iitr.kun...@gmail.com wrote: I was looking for some options and came across JethroData. http://www.jethrodata.com/ It stores the data maintaining indexes over all the columns, seems good, and claims better performance than Impala. Earlier I had tried Apache Phoenix because of its secondary indexing feature. But the major challenge I faced there was that secondary indexing was not supported for the bulk loading process. Only the sequential loading process supported secondary indexes, which took a longer time. Any comments on this? On Thu, Mar 26, 2015 at 5:59 PM, kundan kumar iitr.kun...@gmail.com wrote: I was looking for some options and came across http://www.jethrodata.com/ On Thu, Mar 26, 2015 at 5:47 PM, Jörn Franke jornfra...@gmail.com wrote: You can also preaggregate results for the queries by the user - depending on what queries they use, this might be necessary for any underlying technology On 26 Mar 2015 11:27, kundan kumar iitr.kun...@gmail.com wrote: Hi, I need to store terabytes of data which will be used for BI tools like QlikView. The queries can filter on any column. Currently, we are using Redshift for this purpose. I am trying to explore things other than Redshift. Is it possible to get better performance in Spark compared to Redshift? If yes, please suggest what is the best way to achieve this. Thanks!! Kundan
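As a concrete illustration of the pre-aggregation/HyperLogLog idea (the Event case class and column names are made up for the sketch), Spark's countApproxDistinctByKey gives HLL-based distinct counts per group without keeping the raw values around:

import org.apache.spark.SparkContext._

case class Event(day: String, country: String, userId: String, revenue: Double)

val events = sc.parallelize(Seq(
  Event("2015-03-26", "US", "u1", 9.99),
  Event("2015-03-26", "US", "u2", 4.50),
  Event("2015-03-26", "FR", "u1", 7.00)))

// approximate distinct users per (day, country), ~5% relative error
val distinctUsers = events
  .map(e => ((e.day, e.country), e.userId))
  .countApproxDistinctByKey(0.05)

// exact revenue roll-up per (day, country)
val revenue = events
  .map(e => ((e.day, e.country), e.revenue))
  .reduceByKey(_ + _)

The BI tool then queries the much smaller pre-aggregated tables instead of the raw terabytes.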
Re: Spark-core and guava
Thanks for heads up Sean! On Mar 26, 2015 1:30 PM, Sean Owen so...@cloudera.com wrote: This is a long and complicated story. In short, Spark shades Guava 14 except for a few classes that were accidentally used in a public API (Optional and a few more it depends on). So provided is more of a Maven workaround to achieve a desired effect. It's not provided in the usual sense. On Thu, Mar 26, 2015 at 12:24 PM, Stevo Slavić ssla...@gmail.com wrote: Hello Apache Spark community, spark-core 1.3.0 has guava 14.0.1 as provided dependency (see http://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.3.0/spark-core_2.10-1.3.0.pom ) What is supposed to provide guava, and that specific version? Kind regards, Stevo Slavic.
Re: How to get rdd count() without double evaluation of the RDD?
You can also always take the more extreme approach of using SparkContext#runJob (or submitJob) to write a custom Action that does what you want in one pass. Usually that's not worth the extra effort. On Thu, Mar 26, 2015 at 9:27 AM, Sean Owen so...@cloudera.com wrote: To avoid computing twice you need to persist the RDD but that need not be in memory. You can persist to disk with persist(). On Mar 26, 2015 4:11 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have a rdd that is expensive to compute. I want to save it as object file and also print the count. How can I avoid double computation of the RDD? val rdd = sc.textFile(someFile).map(line = expensiveCalculation(line)) val count = rdd.count() // this force computation of the rdd println(count) rdd.saveAsObjectFile(file2) // this compute the RDD again I can avoid double computation by using cache val rdd = sc.textFile(someFile).map(line = expensiveCalculation(line)) rdd.cache() val count = rdd.count() println(count) rdd.saveAsObjectFile(file2) // this compute the RDD again This only compute rdd once. However the rdd has millions of items and will cause out of memory. Question: how can I avoid double computation without using cache? Ningjun
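Besides the custom-action route, a simpler one-pass sketch (this is not the runJob approach, and it carries the caveat that task retries or speculative execution can inflate the number) is to count with an accumulator while the save action materializes the RDD:

val counter = sc.accumulator(0L)
val rdd = sc.textFile("someFile").map { line =>
  counter += 1L
  expensiveCalculation(line)   // placeholder from the original question
}
rdd.saveAsObjectFile("file2")  // single action: computes, counts, and saves
println(counter.value)         // read only after the action has completed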
How to get rdd count() without double evaluation of the RDD?
I have an RDD that is expensive to compute. I want to save it as an object file and also print the count. How can I avoid computing the RDD twice? val rdd = sc.textFile("someFile").map(line => expensiveCalculation(line)) val count = rdd.count() // this forces computation of the rdd println(count) rdd.saveAsObjectFile("file2") // this computes the RDD again I can avoid the double computation by using cache: val rdd = sc.textFile("someFile").map(line => expensiveCalculation(line)) rdd.cache() val count = rdd.count() println(count) rdd.saveAsObjectFile("file2") // the cached RDD is reused here This computes the RDD only once. However, the RDD has millions of items and caching it will cause an out-of-memory error. Question: how can I avoid the double computation without using cache? Ningjun
Re: python : Out of memory: Kill process
the last try was without log2.cache() and still getting out of memory I using the following conf, maybe help: conf = (SparkConf() .setAppName(LoadS3) .set(spark.executor.memory, 13g) .set(spark.driver.memory, 13g) .set(spark.driver.maxResultSize,2g) .set(spark.default.parallelism,200) .set(spark.kryoserializer.buffer.mb,512)) sc = SparkContext(conf=conf ) sqlContext = SQLContext(sc) On Thu, Mar 26, 2015 at 2:29 PM, Davies Liu dav...@databricks.com wrote: Could you try to remove the line `log2.cache()` ? On Thu, Mar 26, 2015 at 10:02 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: I running on ec2 : 1 Master : 4 CPU 15 GB RAM (2 GB swap) 2 Slaves 4 CPU 15 GB RAM the uncompressed dataset size is 15 GB On Thu, Mar 26, 2015 at 10:41 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi Davies, I upgrade to 1.3.0 and still getting Out of Memory. I ran the same code as before, I need to make any changes? On Wed, Mar 25, 2015 at 4:00 PM, Davies Liu dav...@databricks.com wrote: With batchSize = 1, I think it will become even worse. I'd suggest to go with 1.3, have a taste for the new DataFrame API. On Wed, Mar 25, 2015 at 11:49 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi Davies, I running 1.1.0. Now I'm following this thread that recommend use batchsize parameter = 1 http://apache-spark-user-list.1001560.n3.nabble.com/pySpark-memory-usage-td3022.html if this does not work I will install 1.2.1 or 1.3 Regards On Wed, Mar 25, 2015 at 3:39 PM, Davies Liu dav...@databricks.com wrote: What's the version of Spark you are running? There is a bug in SQL Python API [1], it's fixed in 1.2.1 and 1.3, [1] https://issues.apache.org/jira/browse/SPARK-6055 On Wed, Mar 25, 2015 at 10:33 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi Guys, I running the following function with spark-submmit and de SO is killing my process : def getRdd(self,date,provider): path='s3n://'+AWS_BUCKET+'/'+date+'/*.log.gz' log2= self.sqlContext.jsonFile(path) log2.registerTempTable('log_test') log2.cache() You only visit the table once, cache does not help here. out=self.sqlContext.sql(SELECT user, tax from log_test where provider = '+provider+'and country '').map(lambda row: (row.user, row.tax)) print out1 return map((lambda (x,y): (x, list(y))), sorted(out.groupByKey(2000).collect())) 100 partitions (or less) will be enough for 2G dataset. The input dataset has 57 zip files (2 GB) The same process with a smaller dataset completed successfully Any ideas to debug is welcome. Regards Eduardo
Re: How to get rdd count() without double evaluation of the RDD?
To avoid computing twice you need to persist the RDD but that need not be in memory. You can persist to disk with persist(). On Mar 26, 2015 4:11 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have a rdd that is expensive to compute. I want to save it as object file and also print the count. How can I avoid double computation of the RDD? val rdd = sc.textFile(someFile).map(line = expensiveCalculation(line)) val count = rdd.count() // this force computation of the rdd println(count) rdd.saveAsObjectFile(file2) // this compute the RDD again I can avoid double computation by using cache val rdd = sc.textFile(someFile).map(line = expensiveCalculation(line)) rdd.cache() val count = rdd.count() println(count) rdd.saveAsObjectFile(file2) // this compute the RDD again This only compute rdd once. However the rdd has millions of items and will cause out of memory. Question: how can I avoid double computation without using cache? Ningjun
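Spelled out, the persist-to-disk variant looks like this (the path and expensiveCalculation are placeholders from the question); the expensive map runs once, and the save re-reads the spilled blocks instead of recomputing:

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("someFile")
  .map(line => expensiveCalculation(line))
  .persist(StorageLevel.DISK_ONLY)   // no memory pressure from millions of items

println(rdd.count())
rdd.saveAsObjectFile("file2")
rdd.unpersist()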
Combining Many RDDs
Hi, I have a Spark job and a dataset of 0.5 Million items. Each item performs some sort of computation (joining a shared external dataset, if that does matter) and produces an RDD containing 20-500 result items. Now I would like to combine all these RDDs and perform a next job. What I have found out is that the computation itself is quite fast, but combining these RDDs takes much longer time. val result = data// 0.5M data items .map(compute(_)) // Produces an RDD - fast .reduce(_ ++ _) // Combining RDDs - slow I have also tried to collect results from compute(_) and use a flatMap, but that is also slow. Is there a way to efficiently do this? I'm thinking about writing this result to HDFS and reading from disk for the next job, but am not sure if that's a preferred way in Spark. Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Combining-Many-RDDs-tp22243.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
HQL function Rollup and Cube
Has anyone been able to use Hive 0.13 ROLLUP and CUBE functions in Spark 1.3's Hive Context? According to https://issues.apache.org/jira/browse/SPARK-2663, this has been resolved in Spark 1.3. I created an in-memory temp table (t) and tried to execute a ROLLUP(and CUBE) function: select a, b, count(*) from t group by a, b with rollup Got the error that with rollup is an invalid function. Am I missing something? Thanks, Chang -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HQL-function-Rollup-and-Cube-tp22241.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
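For reference, a minimal sketch of the shape being attempted (table and column names as in the message; whether it runs depends on the Hive support compiled into your build and the Hive/metastore jars on the classpath, as the follow-ups in this thread discuss):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("CREATE TABLE IF NOT EXISTS t (a STRING, b STRING)")
hiveContext.sql("SELECT a, b, count(*) FROM t GROUP BY a, b WITH ROLLUP").show()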
Re: What his the ideal method to interact with Spark Cluster from a Cloud App?
Today I found one answer from a this thread [1] which seems to be worth exploring. Michael, if you are reading this, it will be helpful if you could share more about your spark deployment in production. Thanks and Regards Noorul [1] http://apache-spark-user-list.1001560.n3.nabble.com/How-do-you-run-your-spark-app-tp7935p7958.html Noorul Islam K M noo...@noorul.com writes: Hi all, We have a cloud application, to which we are adding a reporting service. For this we have narrowed down to use Cassandra + Spark for data store and processing respectively. Since cloud application is separate from Cassandra + Spark deployment, what is ideal method to interact with Spark Master from the application? We have been evaluating spark-job-server [1], which is an RESTful layer on top of Spark. Are there any other such tools? Or are there any other better approach which can be explored? We are evaluating following requirements against spark-job-server, 1. Provide a platform for applications to submit jobs 2. Provide RESTful APIs using which applications will interact with the server - Upload jar for running jobs - Submit job - Get job list - Get job status - Get job result 3. Provide support for kill/restart job - Kill job - Restart job 4. Support job priority 5. Queue up job submissions if resources not available 6. Troubleshoot job execution - Failure – job logs - Measure performance 7. Manage cluster deployment - Bootstrap, scale up/down (add, remove, replace nodes) 8. Monitor cluster deployment - Health report: Report metrics – CPU, Memory, - of jobs, spark processes - Alert DevOps about threshold limit of these metrics - Alert DevOps about job failures - Self healing? 9. Security - AAA job submissions 10. High availability/Redundancy - This is for the spark-jobserver component itself Any help is appreciated! Thanks and Regards Noorul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: python : Out of memory: Kill process
I running on ec2 : 1 Master : 4 CPU 15 GB RAM (2 GB swap) 2 Slaves 4 CPU 15 GB RAM the uncompressed dataset size is 15 GB On Thu, Mar 26, 2015 at 10:41 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi Davies, I upgrade to 1.3.0 and still getting Out of Memory. I ran the same code as before, I need to make any changes? On Wed, Mar 25, 2015 at 4:00 PM, Davies Liu dav...@databricks.com wrote: With batchSize = 1, I think it will become even worse. I'd suggest to go with 1.3, have a taste for the new DataFrame API. On Wed, Mar 25, 2015 at 11:49 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi Davies, I running 1.1.0. Now I'm following this thread that recommend use batchsize parameter = 1 http://apache-spark-user-list.1001560.n3.nabble.com/pySpark-memory-usage-td3022.html if this does not work I will install 1.2.1 or 1.3 Regards On Wed, Mar 25, 2015 at 3:39 PM, Davies Liu dav...@databricks.com wrote: What's the version of Spark you are running? There is a bug in SQL Python API [1], it's fixed in 1.2.1 and 1.3, [1] https://issues.apache.org/jira/browse/SPARK-6055 On Wed, Mar 25, 2015 at 10:33 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi Guys, I running the following function with spark-submmit and de SO is killing my process : def getRdd(self,date,provider): path='s3n://'+AWS_BUCKET+'/'+date+'/*.log.gz' log2= self.sqlContext.jsonFile(path) log2.registerTempTable('log_test') log2.cache() You only visit the table once, cache does not help here. out=self.sqlContext.sql(SELECT user, tax from log_test where provider = '+provider+'and country '').map(lambda row: (row.user, row.tax)) print out1 return map((lambda (x,y): (x, list(y))), sorted(out.groupByKey(2000).collect())) 100 partitions (or less) will be enough for 2G dataset. The input dataset has 57 zip files (2 GB) The same process with a smaller dataset completed successfully Any ideas to debug is welcome. Regards Eduardo
Re: python : Out of memory: Kill process
Could you try to remove the line `log2.cache()` ? On Thu, Mar 26, 2015 at 10:02 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: I running on ec2 : 1 Master : 4 CPU 15 GB RAM (2 GB swap) 2 Slaves 4 CPU 15 GB RAM the uncompressed dataset size is 15 GB On Thu, Mar 26, 2015 at 10:41 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi Davies, I upgrade to 1.3.0 and still getting Out of Memory. I ran the same code as before, I need to make any changes? On Wed, Mar 25, 2015 at 4:00 PM, Davies Liu dav...@databricks.com wrote: With batchSize = 1, I think it will become even worse. I'd suggest to go with 1.3, have a taste for the new DataFrame API. On Wed, Mar 25, 2015 at 11:49 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi Davies, I running 1.1.0. Now I'm following this thread that recommend use batchsize parameter = 1 http://apache-spark-user-list.1001560.n3.nabble.com/pySpark-memory-usage-td3022.html if this does not work I will install 1.2.1 or 1.3 Regards On Wed, Mar 25, 2015 at 3:39 PM, Davies Liu dav...@databricks.com wrote: What's the version of Spark you are running? There is a bug in SQL Python API [1], it's fixed in 1.2.1 and 1.3, [1] https://issues.apache.org/jira/browse/SPARK-6055 On Wed, Mar 25, 2015 at 10:33 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi Guys, I running the following function with spark-submmit and de SO is killing my process : def getRdd(self,date,provider): path='s3n://'+AWS_BUCKET+'/'+date+'/*.log.gz' log2= self.sqlContext.jsonFile(path) log2.registerTempTable('log_test') log2.cache() You only visit the table once, cache does not help here. out=self.sqlContext.sql(SELECT user, tax from log_test where provider = '+provider+'and country '').map(lambda row: (row.user, row.tax)) print out1 return map((lambda (x,y): (x, list(y))), sorted(out.groupByKey(2000).collect())) 100 partitions (or less) will be enough for 2G dataset. The input dataset has 57 zip files (2 GB) The same process with a smaller dataset completed successfully Any ideas to debug is welcome. Regards Eduardo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: iPython Notebook + Spark + Accumulo -- best practice?
I'm guessing the Accumulo Key and Value classes are not serializable, so you would need to do something like val rdd = sc.newAPIHadoopRDD(...).map { case (key, value) = (extractScalaType(key), extractScalaType(value)) } Where 'extractScalaType converts the key or Value to a standard Scala type or case class or whatever - basically extracts the data from the Key or Value in a form usable in Scala — Sent from Mailbox On Thu, Mar 26, 2015 at 8:59 PM, Russ Weeks rwe...@newbrightidea.com wrote: Hi, David, This is the code that I use to create a JavaPairRDD from an Accumulo table: JavaSparkContext sc = new JavaSparkContext(conf); Job hadoopJob = Job.getInstance(conf,TestSparkJob); job.setInputFormatClass(AccumuloInputFormat.class); AccumuloInputFormat.setZooKeeperInstance(job, conf.get(ZOOKEEPER_INSTANCE_NAME, conf.get(ZOOKEEPER_HOSTS) ); AccumuloInputFormat.setConnectorInfo(job, conf.get(ACCUMULO_AGILE_USERNAME), new PasswordToken(conf.get(ACCUMULO_AGILE_PASSWORD)) ); AccumuloInputFormat.setInputTableName(job, conf.get(ACCUMULO_TABLE_NAME)); AccumuloInputFormat.setScanAuthorizations(job, auths); JavaPairRDDKey, Value values = sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class, Key.class, Value.class); Key.class and Value.class are from org.apache.accumulo.core.data. I use a WholeRowIterator so that the Value is actually an encoded representation of an entire logical row; it's a useful convenience if you can be sure that your rows always fit in memory. I haven't tested it since Spark 1.0.1 but I doubt anything important has changed. Regards, -Russ On Thu, Mar 26, 2015 at 11:41 AM, David Holiday dav...@annaisystems.com wrote: * progress!* i was able to figure out why the 'input INFO not set' error was occurring. the eagle-eyed among you will no doubt see the following code is missing a closing '(' AbstractInputFormat.setConnectorInfo(jobConf, root, new PasswordToken(password) as I'm doing this in spark-notebook, I'd been clicking the execute button and moving on because I wasn't seeing an error. what I forgot was that notebook is going to do what spark-shell will do when you leave off a closing ')' -- *it will wait forever for you to add it*. so the error was the result of the 'setConnectorInfo' method never getting executed. unfortunately, I'm still unable to shove the accumulo table data into an RDD that's useable to me. when I execute rddX.count I get back res15: Long = 1 which is the correct response - there are 10,000 rows of data in the table I pointed to. however, when I try to grab the first element of data thusly: rddX.first I get the following error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.accumulo.core.data.Key any thoughts on where to go from here? DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com broo...@annaisystems.com www.AnnaiSystems.com On Mar 26, 2015, at 8:35 AM, David Holiday dav...@annaisystems.com wrote: hi Nick Unfortunately the Accumulo docs are woefully inadequate, and in some places, flat wrong. I'm not sure if this is a case where the docs are 'flat wrong', or if there's some wrinke with spark-notebook in the mix that's messing everything up. 
I've been working with some people on stack overflow on this same issue (including one of the people from the spark-notebook team): http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook?noredirect=1#comment46755938_29244530 if you click the link you can see the entire thread of code, responses from notebook, etc. I'm going to try invoking the same techniques both from within a stand-alone scala problem and from the shell itself to see if I can get some traction. I'll report back when I have more data. cheers (and thx!) DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com broo...@annaisystems.com GetFileAttachment.jpg www.AnnaiSystems.com http://www.annaisystems.com/ On Mar 25, 2015, at 11:43 PM, Nick Pentreath nick.pentre...@gmail.com wrote: From a quick look at this link - http://accumulo.apache.org/1.6/accumulo_user_manual.html#_mapreduce - it seems you need to call some static methods on AccumuloInputFormat in order to set the auth, table, and range settings. Try setting these config options first and then call newAPIHadoopRDD? On Thu, Mar 26, 2015 at 2:34 AM, David Holiday dav...@annaisystems.com wrote: hi Irfan, thanks for getting back to me - i'll try the accumulo list to be sure. what is the normal use case for spark though? I'm surprised that hooking it into something as common and popular as accumulo isn't more of an every-day task. DAVID HOLIDAY Software Engineer 760 607 3300 |
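A minimal sketch of the Key/Value-to-Scala-types conversion Nick suggests above (jobConf is assumed to be the configuration already set up earlier in the thread); mapping Key and Value to plain Strings right after loading keeps the non-serializable Accumulo classes out of anything that has to travel back to the driver:

import org.apache.spark.rdd.RDD
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
import org.apache.accumulo.core.data.{Key, Value}

val raw = sc.newAPIHadoopRDD(jobConf, classOf[AccumuloInputFormat], classOf[Key], classOf[Value])

val plain: RDD[(String, String)] = raw.map { case (key, value) =>
  (key.getRow.toString, new String(value.get(), "UTF-8"))  // plain, serializable Scala types
}
plain.first()   // no longer trips over the non-serializable Key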
Re: iPython Notebook + Spark + Accumulo -- best practice?
progress! i was able to figure out why the 'input INFO not set' error was occurring. the eagle-eyed among you will no doubt see the following code is missing a closing '(' AbstractInputFormat.setConnectorInfo(jobConf, root, new PasswordToken(password) as I'm doing this in spark-notebook, I'd been clicking the execute button and moving on because I wasn't seeing an error. what I forgot was that notebook is going to do what spark-shell will do when you leave off a closing ')' -- it will wait forever for you to add it. so the error was the result of the 'setConnectorInfo' method never getting executed. unfortunately, I'm still unable to shove the accumulo table data into an RDD that's useable to me. when I execute rddX.count I get back res15: Long = 1 which is the correct response - there are 10,000 rows of data in the table I pointed to. however, when I try to grab the first element of data thusly: rddX.first I get the following error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.accumulo.core.data.Key any thoughts on where to go from here? DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.commailto:broo...@annaisystems.com [cid:AE39C43E-3FF7-4C90-BCE4-9711C84C4CB8@cld.annailabs.com] www.AnnaiSystems.comhttp://www.AnnaiSystems.com On Mar 26, 2015, at 8:35 AM, David Holiday dav...@annaisystems.commailto:dav...@annaisystems.com wrote: hi Nick Unfortunately the Accumulo docs are woefully inadequate, and in some places, flat wrong. I'm not sure if this is a case where the docs are 'flat wrong', or if there's some wrinke with spark-notebook in the mix that's messing everything up. I've been working with some people on stack overflow on this same issue (including one of the people from the spark-notebook team): http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook?noredirect=1#comment46755938_29244530 if you click the link you can see the entire thread of code, responses from notebook, etc. I'm going to try invoking the same techniques both from within a stand-alone scala problem and from the shell itself to see if I can get some traction. I'll report back when I have more data. cheers (and thx!) DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.commailto:broo...@annaisystems.com GetFileAttachment.jpg www.AnnaiSystems.comhttp://www.annaisystems.com/ On Mar 25, 2015, at 11:43 PM, Nick Pentreath nick.pentre...@gmail.commailto:nick.pentre...@gmail.com wrote: From a quick look at this link - http://accumulo.apache.org/1.6/accumulo_user_manual.html#_mapreduce - it seems you need to call some static methods on AccumuloInputFormat in order to set the auth, table, and range settings. Try setting these config options first and then call newAPIHadoopRDD? On Thu, Mar 26, 2015 at 2:34 AM, David Holiday dav...@annaisystems.commailto:dav...@annaisystems.com wrote: hi Irfan, thanks for getting back to me - i'll try the accumulo list to be sure. what is the normal use case for spark though? I'm surprised that hooking it into something as common and popular as accumulo isn't more of an every-day task. 
DAVID HOLIDAY Software Engineer 760 607 3300tel:760%20607%203300 | Office 312 758 8385tel:312%20758%208385 | Mobile dav...@annaisystems.commailto:broo...@annaisystems.com GetFileAttachment.jpg www.AnnaiSystems.comhttp://www.annaisystems.com/ On Mar 25, 2015, at 5:27 PM, Irfan Ahmad ir...@cloudphysics.commailto:ir...@cloudphysics.com wrote: Hmmm this seems very accumulo-specific, doesn't it? Not sure how to help with that. Irfan Ahmad CTO | Co-Founder | CloudPhysicshttp://www.cloudphysics.com/ Best of VMworld Finalist Best Cloud Management Award NetworkWorld 10 Startups to Watch EMA Most Notable Vendor On Tue, Mar 24, 2015 at 4:09 PM, David Holiday dav...@annaisystems.commailto:dav...@annaisystems.com wrote: hi all, got a vagrant image with spark notebook, spark, accumulo, and hadoop all running. from notebook I can manually create a scanner and pull test data from a table I created using one of the accumulo examples: val instanceNameS = accumulo val zooServersS = localhost:2181 val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS) val connector: Connector = instance.getConnector( root, new PasswordToken(password)) val auths = new Authorizations(exampleVis) val scanner = connector.createScanner(batchtest1, auths) scanner.setRange(new Range(row_00, row_10)) for(entry: Entry[Key, Value] - scanner) { println(entry.getKey + is + entry.getValue) } will give the first ten rows of table data. when I try to create the RDD thusly: val rdd2 = sparkContext.newAPIHadoopRDD ( new Configuration(), classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
Re: HQL function Rollup and Cube
Solved. In the IDE, the project settings were missing the dependent lib jars (the jar files under spark-xx/lib). When these jars are not set, I get a class-not-found error about datanucleus classes (compared to an out-of-memory error in the Spark shell). In the context of the Spark shell, these dependent jars need to be passed in at the spark-shell command line. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HQL-function-Rollup-and-Cube-tp22241p22246.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
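For anyone hitting the same thing, a sketch of what "passed in at the spark-shell command line" looks like (the jar names and versions below are only illustrative; use whatever actually sits in your distribution's lib directory):

./bin/spark-shell --jars \
  lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-core-3.2.10.jar,lib/datanucleus-rdbms-3.2.9.jar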
Re: Parallel actions from driver
You can do this much more simply, I think, with Scala's parallel collections (try .par). There's nothing wrong with doing this, no. Here, something is getting caught in your closure, maybe unintentionally, that's not serializable. It's not directly related to the parallelism. On Thu, Mar 26, 2015 at 3:54 PM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com wrote: Hi. I'm trying to trigger DataFrame's save method in parallel from my driver. For that purpose I use an ExecutorService and Futures, here's my code: val futures = Seq(1, 2, 3).map( t => pool.submit( new Runnable { override def run(): Unit = { val commons = events.filter(_._1 == t).map(_._2.common) saveAsParquetFile(sqlContext, commons, s"$t/common") EventTypes.all.foreach { et => val eventData = events.filter(ev => ev._1 == t && ev._2.eventType == et).map(_._2.data) saveAsParquetFile(sqlContext, eventData, s"$t/$et") } } })) futures.foreach(_.get) It throws a Task not serializable exception. Is it legal to use threads in the driver to trigger actions?
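A sketch of the parallel-collections version, reusing the names from the snippet above (events, saveAsParquetFile and EventTypes come from the original code). If the Task not serializable error persists even here, the culprit is something captured by the closure, for example a field of the enclosing class, rather than the threading itself:

Seq(1, 2, 3).par.foreach { t =>
  val commons = events.filter(_._1 == t).map(_._2.common)
  saveAsParquetFile(sqlContext, commons, s"$t/common")
  EventTypes.all.foreach { et =>
    val eventData = events.filter(ev => ev._1 == t && ev._2.eventType == et).map(_._2.data)
    saveAsParquetFile(sqlContext, eventData, s"$t/$et")
  }
}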
EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out
Hi I need help fixing a time out exception thrown from ElasticSearch Hadoop. The ES cluster is up all the time. I use ElasticSearch Hadoop to read data from ES into RDDs. I get a collection of these RDD which I traverse (with foreachRDD) and create more RDDs from each one RDD in the collection. The resulting RDDs I put in a Queue from which I create a DStream. After about 10 minutes of running, the program's debug output hangs for a bit then throws: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out This is the output: [data from elastic search like the next line (in green)] 13:55:26.620 [Executor task launch worker-0] DEBUG httpclient.wire.content - toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista],timestamp:[1373976139000],links.entity.rank:[0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9]}}]}} 13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpMethodBase - Resorting to protocol version default close connection policy 13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpMethodBase - Should NOT close connection, using HTTP/1.1 13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpConnection - Releasing connection back to connection manager. 13:55:26.631 [Executor task launch worker-0] ERROR org.apache.spark.executor.Executor - Exception in task 1.0 in stage 4.0 (TID 10) org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:245) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:203) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:277) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:200) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:156) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library.jar:na] at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.Task.run(Task.scala:54) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) ~[spark-core_2.10-1.1.0.jar:1.1.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_75]
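If the cluster really is up and the reads are simply slow, the connector's timeout and scroll settings are the usual knobs. A sketch with keys taken from the elasticsearch-hadoop 2.1.x configuration docs (verify the names against the version you run; the host and values here are only examples):

val conf = new org.apache.spark.SparkConf()
  .set("es.nodes", "es-host:9200")        // placeholder host
  .set("es.http.timeout", "5m")           // HTTP read timeout, default 1m
  .set("es.scroll.keepalive", "10m")      // how long ES keeps the scroll context alive
  .set("es.scroll.size", "50")            // fewer hits per scroll request means smaller responses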
Re: iPython Notebook + Spark + Accumulo -- best practice?
Hi, David, This is the code that I use to create a JavaPairRDD from an Accumulo table: JavaSparkContext sc = new JavaSparkContext(conf); Job hadoopJob = Job.getInstance(conf,TestSparkJob); job.setInputFormatClass(AccumuloInputFormat.class); AccumuloInputFormat.setZooKeeperInstance(job, conf.get(ZOOKEEPER_INSTANCE_NAME, conf.get(ZOOKEEPER_HOSTS) ); AccumuloInputFormat.setConnectorInfo(job, conf.get(ACCUMULO_AGILE_USERNAME), new PasswordToken(conf.get(ACCUMULO_AGILE_PASSWORD)) ); AccumuloInputFormat.setInputTableName(job, conf.get(ACCUMULO_TABLE_NAME)); AccumuloInputFormat.setScanAuthorizations(job, auths); JavaPairRDDKey, Value values = sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class, Key.class, Value.class); Key.class and Value.class are from org.apache.accumulo.core.data. I use a WholeRowIterator so that the Value is actually an encoded representation of an entire logical row; it's a useful convenience if you can be sure that your rows always fit in memory. I haven't tested it since Spark 1.0.1 but I doubt anything important has changed. Regards, -Russ On Thu, Mar 26, 2015 at 11:41 AM, David Holiday dav...@annaisystems.com wrote: * progress!* i was able to figure out why the 'input INFO not set' error was occurring. the eagle-eyed among you will no doubt see the following code is missing a closing '(' AbstractInputFormat.setConnectorInfo(jobConf, root, new PasswordToken(password) as I'm doing this in spark-notebook, I'd been clicking the execute button and moving on because I wasn't seeing an error. what I forgot was that notebook is going to do what spark-shell will do when you leave off a closing ')' -- *it will wait forever for you to add it*. so the error was the result of the 'setConnectorInfo' method never getting executed. unfortunately, I'm still unable to shove the accumulo table data into an RDD that's useable to me. when I execute rddX.count I get back res15: Long = 1 which is the correct response - there are 10,000 rows of data in the table I pointed to. however, when I try to grab the first element of data thusly: rddX.first I get the following error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.accumulo.core.data.Key any thoughts on where to go from here? DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com broo...@annaisystems.com www.AnnaiSystems.com On Mar 26, 2015, at 8:35 AM, David Holiday dav...@annaisystems.com wrote: hi Nick Unfortunately the Accumulo docs are woefully inadequate, and in some places, flat wrong. I'm not sure if this is a case where the docs are 'flat wrong', or if there's some wrinke with spark-notebook in the mix that's messing everything up. I've been working with some people on stack overflow on this same issue (including one of the people from the spark-notebook team): http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook?noredirect=1#comment46755938_29244530 if you click the link you can see the entire thread of code, responses from notebook, etc. I'm going to try invoking the same techniques both from within a stand-alone scala problem and from the shell itself to see if I can get some traction. I'll report back when I have more data. cheers (and thx!) 
DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com broo...@annaisystems.com GetFileAttachment.jpg www.AnnaiSystems.com http://www.annaisystems.com/ On Mar 25, 2015, at 11:43 PM, Nick Pentreath nick.pentre...@gmail.com wrote: From a quick look at this link - http://accumulo.apache.org/1.6/accumulo_user_manual.html#_mapreduce - it seems you need to call some static methods on AccumuloInputFormat in order to set the auth, table, and range settings. Try setting these config options first and then call newAPIHadoopRDD? On Thu, Mar 26, 2015 at 2:34 AM, David Holiday dav...@annaisystems.com wrote: hi Irfan, thanks for getting back to me - i'll try the accumulo list to be sure. what is the normal use case for spark though? I'm surprised that hooking it into something as common and popular as accumulo isn't more of an every-day task. DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com broo...@annaisystems.com GetFileAttachment.jpg www.AnnaiSystems.com http://www.annaisystems.com/ On Mar 25, 2015, at 5:27 PM, Irfan Ahmad ir...@cloudphysics.com wrote: Hmmm this seems very accumulo-specific, doesn't it? Not sure how to help with that. *Irfan Ahmad* CTO | Co-Founder | *CloudPhysics* http://www.cloudphysics.com/ Best of VMworld Finalist Best Cloud Management Award NetworkWorld 10 Startups to Watch EMA Most Notable Vendor On
Re: iPython Notebook + Spark + Accumulo -- best practice?
Spark uses a SerializableWritable [1] to java serialize writable objects. I've noticed (at least in Spark 1.2.1) that it breaks down with some objects when Kryo is used instead of regular java serialization. Though it is wrapping the actual AccumuloInputFormat (another example of something you may want to do in the future), we have Accumulo working to load data from a table into Spark SQL [2]. The way Spark uses the InputFormat is very straightforward. [1] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SerializableWritable.scala [2] https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala#L76 On Thu, Mar 26, 2015 at 3:06 PM, Nick Pentreath nick.pentre...@gmail.com wrote: I'm guessing the Accumulo Key and Value classes are not serializable, so you would need to do something like val rdd = sc.newAPIHadoopRDD(...).map { case (key, value) = (extractScalaType(key), extractScalaType(value)) } Where 'extractScalaType converts the key or Value to a standard Scala type or case class or whatever - basically extracts the data from the Key or Value in a form usable in Scala — Sent from Mailbox https://www.dropbox.com/mailbox On Thu, Mar 26, 2015 at 8:59 PM, Russ Weeks rwe...@newbrightidea.com wrote: Hi, David, This is the code that I use to create a JavaPairRDD from an Accumulo table: JavaSparkContext sc = new JavaSparkContext(conf); Job hadoopJob = Job.getInstance(conf,TestSparkJob); job.setInputFormatClass(AccumuloInputFormat.class); AccumuloInputFormat.setZooKeeperInstance(job, conf.get(ZOOKEEPER_INSTANCE_NAME, conf.get(ZOOKEEPER_HOSTS) ); AccumuloInputFormat.setConnectorInfo(job, conf.get(ACCUMULO_AGILE_USERNAME), new PasswordToken(conf.get(ACCUMULO_AGILE_PASSWORD)) ); AccumuloInputFormat.setInputTableName(job, conf.get(ACCUMULO_TABLE_NAME)); AccumuloInputFormat.setScanAuthorizations(job, auths); JavaPairRDDKey, Value values = sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class, Key.class, Value.class); Key.class and Value.class are from org.apache.accumulo.core.data. I use a WholeRowIterator so that the Value is actually an encoded representation of an entire logical row; it's a useful convenience if you can be sure that your rows always fit in memory. I haven't tested it since Spark 1.0.1 but I doubt anything important has changed. Regards, -Russ On Thu, Mar 26, 2015 at 11:41 AM, David Holiday dav...@annaisystems.com wrote: * progress!* i was able to figure out why the 'input INFO not set' error was occurring. the eagle-eyed among you will no doubt see the following code is missing a closing '(' AbstractInputFormat.setConnectorInfo(jobConf, root, new PasswordToken(password) as I'm doing this in spark-notebook, I'd been clicking the execute button and moving on because I wasn't seeing an error. what I forgot was that notebook is going to do what spark-shell will do when you leave off a closing ')' -- *it will wait forever for you to add it*. so the error was the result of the 'setConnectorInfo' method never getting executed. unfortunately, I'm still unable to shove the accumulo table data into an RDD that's useable to me. when I execute rddX.count I get back res15: Long = 1 which is the correct response - there are 10,000 rows of data in the table I pointed to. 
however, when I try to grab the first element of data thusly: rddX.first I get the following error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.accumulo.core.data.Key any thoughts on where to go from here? DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com broo...@annaisystems.com GetFileAttachment.jpg www.AnnaiSystems.com On Mar 26, 2015, at 8:35 AM, David Holiday dav...@annaisystems.com wrote: hi Nick Unfortunately the Accumulo docs are woefully inadequate, and in some places, flat wrong. I'm not sure if this is a case where the docs are 'flat wrong', or if there's some wrinke with spark-notebook in the mix that's messing everything up. I've been working with some people on stack overflow on this same issue (including one of the people from the spark-notebook team): http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook?noredirect=1#comment46755938_29244530 if you click the link you can see the entire thread of code, responses from notebook, etc. I'm going to try invoking the same techniques both from within a stand-alone scala problem and from the shell itself to see if I can get some traction. I'll report back when I have more data. cheers (and thx!) DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile
Re: python : Out of memory: Kill process
Could you narrow down to a step which cause the OOM, something like: log2= self.sqlContext.jsonFile(path) log2.count() ... out.count() ... On Thu, Mar 26, 2015 at 10:34 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: the last try was without log2.cache() and still getting out of memory I using the following conf, maybe help: conf = (SparkConf() .setAppName(LoadS3) .set(spark.executor.memory, 13g) .set(spark.driver.memory, 13g) .set(spark.driver.maxResultSize,2g) .set(spark.default.parallelism,200) .set(spark.kryoserializer.buffer.mb,512)) sc = SparkContext(conf=conf ) sqlContext = SQLContext(sc) On Thu, Mar 26, 2015 at 2:29 PM, Davies Liu dav...@databricks.com wrote: Could you try to remove the line `log2.cache()` ? On Thu, Mar 26, 2015 at 10:02 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: I running on ec2 : 1 Master : 4 CPU 15 GB RAM (2 GB swap) 2 Slaves 4 CPU 15 GB RAM the uncompressed dataset size is 15 GB On Thu, Mar 26, 2015 at 10:41 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi Davies, I upgrade to 1.3.0 and still getting Out of Memory. I ran the same code as before, I need to make any changes? On Wed, Mar 25, 2015 at 4:00 PM, Davies Liu dav...@databricks.com wrote: With batchSize = 1, I think it will become even worse. I'd suggest to go with 1.3, have a taste for the new DataFrame API. On Wed, Mar 25, 2015 at 11:49 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi Davies, I running 1.1.0. Now I'm following this thread that recommend use batchsize parameter = 1 http://apache-spark-user-list.1001560.n3.nabble.com/pySpark-memory-usage-td3022.html if this does not work I will install 1.2.1 or 1.3 Regards On Wed, Mar 25, 2015 at 3:39 PM, Davies Liu dav...@databricks.com wrote: What's the version of Spark you are running? There is a bug in SQL Python API [1], it's fixed in 1.2.1 and 1.3, [1] https://issues.apache.org/jira/browse/SPARK-6055 On Wed, Mar 25, 2015 at 10:33 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi Guys, I running the following function with spark-submmit and de SO is killing my process : def getRdd(self,date,provider): path='s3n://'+AWS_BUCKET+'/'+date+'/*.log.gz' log2= self.sqlContext.jsonFile(path) log2.registerTempTable('log_test') log2.cache() You only visit the table once, cache does not help here. out=self.sqlContext.sql(SELECT user, tax from log_test where provider = '+provider+'and country '').map(lambda row: (row.user, row.tax)) print out1 return map((lambda (x,y): (x, list(y))), sorted(out.groupByKey(2000).collect())) 100 partitions (or less) will be enough for 2G dataset. The input dataset has 57 zip files (2 GB) The same process with a smaller dataset completed successfully Any ideas to debug is welcome. Regards Eduardo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: RDD to DataFrame for using ALS under org.apache.spark.ml.recommendation.ALS
After creating your SQLContext (or HiveContext), e.g. val sqlContext = new SQLContext(sc), you need to add this line: import sqlContext.implicits._ // this is what implicitly converts an RDD to a DataFrame in 1.3. Hope this helps -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-to-DataFrame-for-using-ALS-under-org-apache-spark-ml-recommendation-ALS-tp22083p22247.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
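Spelled out a bit more (the file path, the comma delimiter and the Rating case class are made up for illustration), since in Spark 1.3 the toDF implicits hang off the SQLContext instance rather than the SparkContext:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("als-example"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._            // brings rdd.toDF() into scope

case class Rating(userId: Int, itemId: Int, rating: Float)

val ratingsDF = sc.textFile("ratings.csv")
  .map(_.split(","))
  .map(a => Rating(a(0).toInt, a(1).toInt, a(2).toFloat))
  .toDF()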
FakeClassTag in Java API
The Java API uses FakeClassTag for all of the implicit class tags fed to RDDs during creation, mapping, etc. I am working on a more generic Scala library where I won't always have the type information beforehand. Is it safe / accepted practice to use FakeClassTag in these situations as well? It was my understanding that Scala requires the ClassTag in order to build new Arrays; what happens when they are not present? Thanks, Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/FakeClassTag-in-Java-API-tp22253.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
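A small sketch of the two options (the method names are made up): requiring a real ClassTag in your generic code when you can, versus the erased tag that the Java API's fakeClassTag amounts to. The tag only matters where Spark or your own code has to allocate an Array[T]; with an erased AnyRef tag those arrays are simply Array[AnyRef] underneath, which is what the Java API lives with:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Option 1: thread a real ClassTag through, so typed arrays can be built
def batched[T: ClassTag](rdd: RDD[T]): RDD[Array[T]] =
  rdd.mapPartitions(it => Iterator(it.toArray))   // Iterator.toArray needs the ClassTag

// Option 2: what fakeClassTag effectively is, an erased tag for unknown element types
def erasedTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]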
Re: Spark History Server : jobs link doesn't open
in log I found this 2015-03-26 19:42:09,531 WARN org.eclipse.jetty.servlet.ServletHandler: Error for /history/application_1425934191900_87572 org.spark-project.guava.common.util.concurrent.ExecutionError: java.lang.OutOfMemoryError: GC overhead limit exceeded at org.spark-project.guava.common.cache.LocalCache$Segment.get(LocalCache.java:2261) at org.spark-project.guava.common.cache.LocalCache.get(LocalCache.java:4000) thanks On Thu, Mar 26, 2015 at 7:27 PM, , Roy rp...@njit.edu wrote: We have Spark on YARN, with Cloudera Manager 5.3.2 and CDH 5.3.2 Jobs link on spark History server doesn't open and shows following message : HTTP ERROR: 500 Problem accessing /history/application_1425934191900_87572. Reason: Server Error -- *Powered by Jetty://*
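The GC overhead limit exceeded here comes from the history server's own JVM running out of heap while replaying large event logs, not from your application. Outside Cloudera Manager the daemon heap is set in spark-env.sh (the value below is just an example); under CM the equivalent is the history server's Java heap size setting:

# conf/spark-env.sh on the history server host
export SPARK_DAEMON_MEMORY=2g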
Re: Why k-means cluster hang for a long time?
Hi Burak, After I added .repartition(sc.defaultParallelism), I can see from the log the partition number is set to 32. But in the Spark UI, it seems all the data are loaded onto one executor. Previously they were loaded onto 4 executors. Any idea? Thanks, David On Fri, Mar 27, 2015 at 11:01 AM Xi Shen davidshe...@gmail.com wrote: How do I get the number of cores that I specified at the command line? I want to use spark.default.parallelism. I have 4 executors, each has 8 cores. According to https://spark.apache.org/docs/1.2.0/configuration.html#execution-behavior, the spark.default.parallelism value will be 4 * 8 = 32...I think it is too large, or inappropriate. Please give some suggestion. I have already used cache, and count to pre-cache. I can try with smaller k for testing, but eventually I will have to use k = 5000 or even large. Because I estimate our data set would have that much of clusters. Thanks, David On Fri, Mar 27, 2015 at 10:40 AM Burak Yavuz brk...@gmail.com wrote: Hi David, The number of centroids (k=5000) seems too large and is probably the cause of the code taking too long. Can you please try the following: 1) Repartition data to the number of available cores with .repartition(numCores) 2) cache data 3) call .count() on data right before k-means 4) try k=500 (even less if possible) Thanks, Burak On Mar 26, 2015 4:15 PM, Xi Shen davidshe...@gmail.com wrote: The code is very simple. val data = sc.textFile(very/large/text/file) map { l = // turn each line into dense vector Vectors.dense(...) } // the resulting data set is about 40k vectors KMeans.train(data, k=5000, maxIterations=500) I just kill my application. In the log I found this: 15/03/26 11:42:43 INFO storage.BlockManagerMaster: Updated info of block broadcast_26_piece0 15/03/26 23:02:57 WARN server.TransportChannelHandler: Exception in connection from workernode0.xshe3539-hadoop-sydney.q10.internal.cloudapp. net/100.72.84.107:56277 java.io.IOException: An existing connection was forcibly closed by the remote host Notice the time gap. I think it means the work node did not generate any log at all for about 12hrs...does it mean they are not working at all? But when testing with very small data set, my application works and output expected data. Thanks, David On Fri, Mar 27, 2015 at 10:04 AM Burak Yavuz brk...@gmail.com wrote: Can you share the code snippet of how you call k-means? Do you cache the data before k-means? Did you repartition the data? On Mar 26, 2015 4:02 PM, Xi Shen davidshe...@gmail.com wrote: OH, the job I talked about has ran more than 11 hrs without a result...it doesn't make sense. On Fri, Mar 27, 2015 at 9:48 AM Xi Shen davidshe...@gmail.com wrote: Hi Burak, My iterations is set to 500. But I think it should also stop of the centroid coverages, right? My spark is 1.2.0, working in windows 64 bit. My data set is about 40k vectors, each vector has about 300 features, all normalised. All work node have sufficient memory and disk space. Thanks, David On Fri, 27 Mar 2015 02:48 Burak Yavuz brk...@gmail.com wrote: Hi David, When the number of runs are large and the data is not properly partitioned, it seems that K-Means is hanging according to my experience. Especially setting the number of runs to something high drastically increases the work in executors. If that's not the case, can you give more info on what Spark version you are using, your setup, and your dataset? 
Thanks, Burak On Mar 26, 2015 5:10 AM, Xi Shen davidshe...@gmail.com wrote: Hi, When I run k-means cluster with Spark, I got this in the last two lines in the log: 15/03/26 11:42:42 INFO spark.ContextCleaner: Cleaned broadcast 26 15/03/26 11:42:42 INFO spark.ContextCleaner: Cleaned shuffle 5 Then it hangs for a long time. There's no active job. The driver machine is idle. I cannot access the work node, I am not sure if they are busy. I understand k-means may take a long time to finish. But why no active job? no log? Thanks, David
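Putting Burak's suggestions together, a minimal sketch (the file path and the space-delimited parsing are placeholders for whatever the real input looks like):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("very/large/text/file")
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  .repartition(sc.defaultParallelism)   // spread the ~40k vectors across all cores
  .cache()

data.count()                            // materialize and cache before timing k-means

val model = KMeans.train(data, 500, 100) // start small: k=500, 100 iterations

If, after repartitioning, everything still lands on a single executor, it is also worth checking how many partitions the file is read with in the first place, e.g. sc.textFile(path, minPartitions).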
Re: WordCount example
What's the best way to troubleshoot inside Spark to see why it is not connecting to nc on the port? I don't see any errors either. On Thu, Mar 26, 2015 at 2:38 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to run the word count example but for some reason it's not working as expected. I start the nc server on the port and then submit the Spark job to the cluster. The Spark job gets submitted successfully, but I never see any connection from Spark getting established. I also tried to type words on the console where nc is listening and waiting on the prompt, however I don't see any output. I also don't see any errors. Here is the conf: SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", );
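One thing the snippet above never shows is the context being started; without jssc.start() the receiver never opens the socket, which matches the symptom of no connection to nc. A Scala sketch of the full flow (masterUrl is from the original snippet, and port 9999 is only a placeholder since the original message elides the actual value):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)   // placeholder port
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

ssc.start()              // nothing connects to nc until this runs
ssc.awaitTermination()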
Re: python : Out of memory: Kill process
Hi Davies, I upgrade to 1.3.0 and still getting Out of Memory. I ran the same code as before, I need to make any changes? On Wed, Mar 25, 2015 at 4:00 PM, Davies Liu dav...@databricks.com wrote: With batchSize = 1, I think it will become even worse. I'd suggest to go with 1.3, have a taste for the new DataFrame API. On Wed, Mar 25, 2015 at 11:49 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi Davies, I running 1.1.0. Now I'm following this thread that recommend use batchsize parameter = 1 http://apache-spark-user-list.1001560.n3.nabble.com/pySpark-memory-usage-td3022.html if this does not work I will install 1.2.1 or 1.3 Regards On Wed, Mar 25, 2015 at 3:39 PM, Davies Liu dav...@databricks.com wrote: What's the version of Spark you are running? There is a bug in SQL Python API [1], it's fixed in 1.2.1 and 1.3, [1] https://issues.apache.org/jira/browse/SPARK-6055 On Wed, Mar 25, 2015 at 10:33 AM, Eduardo Cusa eduardo.c...@usmediaconsulting.com wrote: Hi Guys, I running the following function with spark-submmit and de SO is killing my process : def getRdd(self,date,provider): path='s3n://'+AWS_BUCKET+'/'+date+'/*.log.gz' log2= self.sqlContext.jsonFile(path) log2.registerTempTable('log_test') log2.cache() You only visit the table once, cache does not help here. out=self.sqlContext.sql(SELECT user, tax from log_test where provider = '+provider+'and country '').map(lambda row: (row.user, row.tax)) print out1 return map((lambda (x,y): (x, list(y))), sorted(out.groupByKey(2000).collect())) 100 partitions (or less) will be enough for 2G dataset. The input dataset has 57 zip files (2 GB) The same process with a smaller dataset completed successfully Any ideas to debug is welcome. Regards Eduardo
RDD equivalent of HBase Scan
HBase scans come with the ability to specify filters that make scans very fast and efficient (as they let you seek for the keys that pass the filter). Do RDD's or Spark DataFrames offer anything similar or would I be required to use a NoSQL db like HBase to do something like this? -- Stuart Layton
Re: RDD equivalent of HBase Scan
In examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala, TableInputFormat is used. TableInputFormat accepts the parameter public static final String SCAN = "hbase.mapreduce.scan"; if it is specified, the Scan object is created from its String form: if (conf.get(SCAN) != null) { try { scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN)); You can use TableMapReduceUtil#convertScanToString() to convert a Scan which has filter(s) and pass it to TableInputFormat. Cheers On Thu, Mar 26, 2015 at 6:46 AM, Stuart Layton stuart.lay...@gmail.com wrote: HBase scans come with the ability to specify filters that make scans very fast and efficient (as they let you seek for the keys that pass the filter). Do RDD's or Spark DataFrames offer anything similar or would I be required to use a NoSQL db like HBase to do something like this? -- Stuart Layton
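A rough Scala sketch of what Ted describes, i.e. serializing a Scan that carries a filter into the job configuration before calling newAPIHadoopRDD; the table name and the PrefixFilter are placeholders, not part of the original example:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}
import org.apache.hadoop.hbase.util.Bytes

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table")                 // example table name
val scan = new Scan()
scan.setFilter(new PrefixFilter(Bytes.toBytes("rowkey-prefix")))        // any server-side filter
hbaseConf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))
val rdd = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])                     // only rows passing the filter are scanned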
Spark log shows only this line repeated: RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time X
Here's my log output from a streaming job. What is this? 09:54:27.504 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067504 09:54:27.505 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067505 09:54:27.506 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067506 09:54:27.508 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067507 09:54:27.508 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067508 09:54:27.509 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067509 09:54:27.510 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067510 09:54:27.511 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067511 09:54:27.512 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067512 09:54:27.513 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067513 09:54:27.514 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067514 09:54:27.515 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067515 09:54:27.516 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067516 09:54:27.517 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067517 09:54:27.518 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067518 09:54:27.519 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067519 09:54:27.520 [Recurri ...
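These are the per-millisecond DEBUG heartbeats of the streaming job generator's recurring timer, not an error; they appear because the logging level for that logger is set to DEBUG. Assuming log4j is the logging backend, a line like the following in log4j.properties quiets them (a logback setup would set the same logger name to INFO):

log4j.logger.org.apache.spark.streaming.util.RecurringTimer=INFO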
Re: RDD equivalent of HBase Scan
Thanks, but I'm hoping to get away from HBase altogether. I was wondering if there is a way to get similar scan performance directly on cached RDDs or DataFrames. On Thu, Mar 26, 2015 at 9:54 AM, Ted Yu yuzhih...@gmail.com wrote: In examples//src/main/scala/org/apache/spark/examples/HBaseTest.scala, TableInputFormat is used. TableInputFormat accepts parameter public static final String SCAN = hbase.mapreduce.scan; where if specified, Scan object would be created from String form: if (conf.get(SCAN) != null) { try { scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN)); You can use TableMapReduceUtil#convertScanToString() to convert a Scan which has filter(s) and pass to TableInputFormat Cheers On Thu, Mar 26, 2015 at 6:46 AM, Stuart Layton stuart.lay...@gmail.com wrote: HBase scans come with the ability to specify filters that make scans very fast and efficient (as they let you seek for the keys that pass the filter). Do RDD's or Spark DataFrames offer anything similar or would I be required to use a NoSQL db like HBase to do something like this? -- Stuart Layton -- Stuart Layton
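The closest built-in analogue is probably predicate filtering on a cached DataFrame, which scans in memory rather than seeking the way HBase does; a sketch against the Spark 1.3 API, with the source path and column name made up for illustration:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.parquetFile("hdfs:///data/events").cache()   // hypothetical Parquet source
df.count()                                                       // materialize the cache
val matching = df.filter(df("key") >= "a" && df("key") < "b")    // the predicate plays the role of an HBase filter
matching.show()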
Re: Which RDD operations preserve ordering?
This is related: https://issues.apache.org/jira/browse/SPARK-6340 On Thu, Mar 26, 2015 at 5:58 AM, sergunok ser...@gmail.com wrote: Hi guys, I don't have a clear picture of how the ordering of RDD elements is preserved after executing operations. Which operations preserve it? 1) map (Yes?) 2) zipWithIndex (Yes, or only sometimes?) Serg. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Which-RDD-operations-preserve-ordering-tp22239.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
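As a small illustration of the distinction discussed in SPARK-6340: narrow transformations such as map keep both the partitioning and the order of elements within each partition, while operations that shuffle give no ordering guarantee. A sketch:

val rdd = sc.parallelize(1 to 10, 2)
// map preserves per-partition order, so zipWithIndex reproduces the original order
rdd.map(_ * 2).zipWithIndex().collect()
// repartition shuffles, so the indices below generally no longer follow the input order
rdd.repartition(4).zipWithIndex().collect()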
RDD.map does not allowed to preservesPartitioning?
Hi Folks, Does anybody know what is the reason not allowing preserverPartitioning in RDD.map? Do I miss something here? Following example involves two shuffles. I think if preservePartitioning is allowed, we can avoid the second one, right? val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)) val r2 = r1.map((_, 1)) val r3 = r2.reduceByKey(_+_) val r4 = r3.map(x=(x._1, x._2 + 1)) val r5 = r4.reduceByKey(_+_) r5.collect.foreach(println) scala r5.toDebugString res2: String = (8) ShuffledRDD[4] at reduceByKey at console:29 [] +-(8) MapPartitionsRDD[3] at map at console:27 [] | ShuffledRDD[2] at reduceByKey at console:25 [] +-(8) MapPartitionsRDD[1] at map at console:23 [] | ParallelCollectionRDD[0] at parallelize at console:21 [] Thanks. Zhan Zhang - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: RDD.map does not allowed to preservesPartitioning?
I believe if you do the following: sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString (8) MapPartitionsRDD[34] at reduceByKey at console:23 [] | MapPartitionsRDD[33] at mapValues at console:23 [] | ShuffledRDD[32] at reduceByKey at console:23 [] +-(8) MapPartitionsRDD[31] at map at console:23 [] | ParallelCollectionRDD[30] at parallelize at console:23 [] The difference is that spark has no way to know that your map closure doesn't change the key. if you only use mapValues, it does. Pretty cool that they optimized that :) 2015-03-26 17:44 GMT-04:00 Zhan Zhang zzh...@hortonworks.com: Hi Folks, Does anybody know what is the reason not allowing preserverPartitioning in RDD.map? Do I miss something here? Following example involves two shuffles. I think if preservePartitioning is allowed, we can avoid the second one, right? val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)) val r2 = r1.map((_, 1)) val r3 = r2.reduceByKey(_+_) val r4 = r3.map(x=(x._1, x._2 + 1)) val r5 = r4.reduceByKey(_+_) r5.collect.foreach(println) scala r5.toDebugString res2: String = (8) ShuffledRDD[4] at reduceByKey at console:29 [] +-(8) MapPartitionsRDD[3] at map at console:27 [] | ShuffledRDD[2] at reduceByKey at console:25 [] +-(8) MapPartitionsRDD[1] at map at console:23 [] | ParallelCollectionRDD[0] at parallelize at console:21 [] Thanks. Zhan Zhang - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: iPython Notebook + Spark + Accumulo -- best practice?
That purely awesome! Don't hesitate to contribute your notebook back to the spark notebook repo, even rough, I'll help cleaning up if needed. The vagrant is also appealing Congrats! Le jeu 26 mars 2015 22:22, David Holiday dav...@annaisystems.com a écrit : w0t! that did it! t/y so much! I'm going to put together a pastebin or something that has all the code put together so if anyone else runs into this issue they will have some working code to help them figure out what's going on. DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com broo...@annaisystems.com www.AnnaiSystems.com On Mar 26, 2015, at 12:24 PM, Corey Nolet cjno...@gmail.com wrote: Spark uses a SerializableWritable [1] to java serialize writable objects. I've noticed (at least in Spark 1.2.1) that it breaks down with some objects when Kryo is used instead of regular java serialization. Though it is wrapping the actual AccumuloInputFormat (another example of something you may want to do in the future), we have Accumulo working to load data from a table into Spark SQL [2]. The way Spark uses the InputFormat is very straightforward. [1] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SerializableWritable.scala [2] https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala#L76 On Thu, Mar 26, 2015 at 3:06 PM, Nick Pentreath nick.pentre...@gmail.com wrote: I'm guessing the Accumulo Key and Value classes are not serializable, so you would need to do something like val rdd = sc.newAPIHadoopRDD(...).map { case (key, value) = (extractScalaType(key), extractScalaType(value)) } Where 'extractScalaType converts the key or Value to a standard Scala type or case class or whatever - basically extracts the data from the Key or Value in a form usable in Scala — Sent from Mailbox https://www.dropbox.com/mailbox On Thu, Mar 26, 2015 at 8:59 PM, Russ Weeks rwe...@newbrightidea.com wrote: Hi, David, This is the code that I use to create a JavaPairRDD from an Accumulo table: JavaSparkContext sc = new JavaSparkContext(conf); Job hadoopJob = Job.getInstance(conf,TestSparkJob); job.setInputFormatClass(AccumuloInputFormat.class); AccumuloInputFormat.setZooKeeperInstance(job, conf.get(ZOOKEEPER_INSTANCE_NAME, conf.get(ZOOKEEPER_HOSTS) ); AccumuloInputFormat.setConnectorInfo(job, conf.get(ACCUMULO_AGILE_USERNAME), new PasswordToken(conf.get(ACCUMULO_AGILE_PASSWORD)) ); AccumuloInputFormat.setInputTableName(job, conf.get(ACCUMULO_TABLE_NAME)); AccumuloInputFormat.setScanAuthorizations(job, auths); JavaPairRDDKey, Value values = sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class, Key.class, Value.class); Key.class and Value.class are from org.apache.accumulo.core.data. I use a WholeRowIterator so that the Value is actually an encoded representation of an entire logical row; it's a useful convenience if you can be sure that your rows always fit in memory. I haven't tested it since Spark 1.0.1 but I doubt anything important has changed. Regards, -Russ On Thu, Mar 26, 2015 at 11:41 AM, David Holiday dav...@annaisystems.com wrote: * progress!* i was able to figure out why the 'input INFO not set' error was occurring. 
the eagle-eyed among you will no doubt see the following code is missing a closing '(' AbstractInputFormat.setConnectorInfo(jobConf, root, new PasswordToken(password) as I'm doing this in spark-notebook, I'd been clicking the execute button and moving on because I wasn't seeing an error. what I forgot was that notebook is going to do what spark-shell will do when you leave off a closing ')' -- *it will wait forever for you to add it*. so the error was the result of the 'setConnectorInfo' method never getting executed. unfortunately, I'm still unable to shove the accumulo table data into an RDD that's useable to me. when I execute rddX.count I get back res15: Long = 1 which is the correct response - there are 10,000 rows of data in the table I pointed to. however, when I try to grab the first element of data thusly: rddX.first I get the following error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.accumulo.core.data.Key any thoughts on where to go from here? DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com broo...@annaisystems.com GetFileAttachment.jpg www.AnnaiSystems.com http://www.annaisystems.com/ On Mar 26, 2015, at 8:35 AM, David Holiday dav...@annaisystems.com wrote: hi Nick Unfortunately the Accumulo docs are woefully inadequate, and in some places, flat wrong. I'm not sure if this is a case
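One way around the "not serializable result: org.apache.accumulo.core.data.Key" error, following Nick's suggestion, is to convert each pair to plain Scala types before calling first() or collect(); a sketch, assuming rddX is the RDD[(Key, Value)] returned by newAPIHadoopRDD and that a simple string extraction is enough for the use case:

import org.apache.accumulo.core.data.{Key, Value}

val rows = rddX.map { case (key: Key, value: Value) =>
  // Key and Value are not Java-serializable, so pull out serializable fields here
  (key.getRow.toString, new String(value.get(), "UTF-8"))
}
rows.first()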
RE: EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out
I also get stack overflow every now and then without having any recursive calls: java.lang.StackOverflowError: null at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1479) ~[na:1.7.0_75] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[na:1.7.0_75] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[na:1.7.0_75] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[na:1.7.0_75] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[na:1.7.0_75] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[na:1.7.0_75] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[na:1.7.0_75] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[na:1.7.0_75] at scala.collection.immutable.$colon$colon.writeObject(List.scala:379) ~[scala-library.jar:na] at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) ~[na:na] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_75] this is a huge stack trace... but it keeps repeating What could this be from? From: Adrian Mocanu [mailto:amoc...@verticalscope.com] Sent: March 26, 2015 2:10 PM To: u...@spark.incubator.apache.org; user@spark.apache.org Subject: EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out Hi I need help fixing a time out exception thrown from ElasticSearch Hadoop. The ES cluster is up all the time. I use ElasticSearch Hadoop to read data from ES into RDDs. I get a collection of these RDD which I traverse (with foreachRDD) and create more RDDs from each one RDD in the collection. The resulting RDDs I put in a Queue from which I create a DStream. After about 10 minutes of running, the program's debug output hangs for a bit then throws: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out This is the output: [data from elastic search like the next line (in green)] 13:55:26.620 [Executor task launch worker-0] DEBUG httpclient.wire.content - toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista],timestamp:[1373976139000],links.entity.rank:[0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9]}}]}} 13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpMethodBase - Resorting to protocol version default close connection policy 13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpMethodBase - Should NOT close connection, using HTTP/1.1 13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpConnection - Releasing connection back to connection manager. 
13:55:26.631 [Executor task launch worker-0] ERROR org.apache.spark.executor.Executor - Exception in task 1.0 in stage 4.0 (TID 10) org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:245) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:203) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:277) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:200) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:156) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at
Re: RDD.map does not allowed to preservesPartitioning?
Thanks Jonathan. You are right about rewriting the example. What I mean is providing such an option to the developer so that it is controllable. The example may seem silly, and I don't know all the use cases. But, for example, if I want to operate on both the key and the value to generate some new value while keeping the key untouched, then mapValues may not be able to do this. Changing the code to allow this is trivial, but I don't know whether there is some special reason behind it. Thanks. Zhan Zhang On Mar 26, 2015, at 2:49 PM, Jonathan Coveney jcove...@gmail.com wrote: I believe if you do the following: sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString (8) MapPartitionsRDD[34] at reduceByKey at console:23 [] | MapPartitionsRDD[33] at mapValues at console:23 [] | ShuffledRDD[32] at reduceByKey at console:23 [] +-(8) MapPartitionsRDD[31] at map at console:23 [] | ParallelCollectionRDD[30] at parallelize at console:23 [] The difference is that spark has no way to know that your map closure doesn't change the key. if you only use mapValues, it does. Pretty cool that they optimized that :) 2015-03-26 17:44 GMT-04:00 Zhan Zhang zzh...@hortonworks.com: Hi Folks, Does anybody know what is the reason not allowing preserverPartitioning in RDD.map? Do I miss something here? Following example involves two shuffles. I think if preservePartitioning is allowed, we can avoid the second one, right? val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)) val r2 = r1.map((_, 1)) val r3 = r2.reduceByKey(_+_) val r4 = r3.map(x=(x._1, x._2 + 1)) val r5 = r4.reduceByKey(_+_) r5.collect.foreach(println) scala r5.toDebugString res2: String = (8) ShuffledRDD[4] at reduceByKey at console:29 [] +-(8) MapPartitionsRDD[3] at map at console:27 [] | ShuffledRDD[2] at reduceByKey at console:25 [] +-(8) MapPartitionsRDD[1] at map at console:23 [] | ParallelCollectionRDD[0] at parallelize at console:21 [] Thanks. Zhan Zhang - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Combining Many RDDs
Hi, I used union() before and yes it may be slow sometimes. I _guess_ your variable 'data' is a Scala collection and compute() returns an RDD. Right? If yes, I tried the approach below to operate on one RDD only during the whole computation (Yes, I also saw that too many RDD hurt performance). Change compute() to return Scala collection instead of RDD. val result = sc.parallelize(data)// Create and partition the 0.5M items in a single RDD. .flatMap(compute(_)) // You still have only one RDD with each item joined with external data already Hope this help. Kelvin On Thu, Mar 26, 2015 at 2:37 PM, Yang Chen y...@yang-cs.com wrote: Hi Mark, That's true, but in neither way can I combine the RDDs, so I have to avoid unions. Thanks, Yang On Thu, Mar 26, 2015 at 5:31 PM, Mark Hamstra m...@clearstorydata.com wrote: RDD#union is not the same thing as SparkContext#union On Thu, Mar 26, 2015 at 2:27 PM, Yang Chen y...@yang-cs.com wrote: Hi Noorul, Thank you for your suggestion. I tried that, but ran out of memory. I did some search and found some suggestions that we should try to avoid rdd.union( http://stackoverflow.com/questions/28343181/memory-efficient-way-of-union-a-sequence-of-rdds-from-files-in-apache-spark ). I will try to come up with some other ways. Thank you, Yang On Thu, Mar 26, 2015 at 1:13 PM, Noorul Islam K M noo...@noorul.com wrote: sparkx y...@yang-cs.com writes: Hi, I have a Spark job and a dataset of 0.5 Million items. Each item performs some sort of computation (joining a shared external dataset, if that does matter) and produces an RDD containing 20-500 result items. Now I would like to combine all these RDDs and perform a next job. What I have found out is that the computation itself is quite fast, but combining these RDDs takes much longer time. val result = data// 0.5M data items .map(compute(_)) // Produces an RDD - fast .reduce(_ ++ _) // Combining RDDs - slow I have also tried to collect results from compute(_) and use a flatMap, but that is also slow. Is there a way to efficiently do this? I'm thinking about writing this result to HDFS and reading from disk for the next job, but am not sure if that's a preferred way in Spark. Are you looking for SparkContext.union() [1] ? This is not performing well with spark cassandra connector. I am not sure whether this will help you. Thanks and Regards Noorul [1] http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.SparkContext -- Yang Chen Dept. of CISE, University of Florida Mail: y...@yang-cs.com Web: www.cise.ufl.edu/~yang -- Yang Chen Dept. of CISE, University of Florida Mail: y...@yang-cs.com Web: www.cise.ufl.edu/~yang
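A compact sketch of Kelvin's approach, assuming compute() can be changed to return a plain Scala collection of result items instead of an RDD; the partition count is a placeholder:

// One RDD for the whole job: no per-item RDDs and no union/reduce over RDDs
val result = sc.parallelize(data, 1000)      // 0.5M input items, partition count chosen to taste
  .flatMap(item => compute(item))            // each item contributes its 20-500 result items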
Can't access file in spark, but can in hadoop
There seems to be a special kind of corrupted according to Spark state of file in HDFS. I have isolated a set of files (maybe 1% of all files I need to work with) which are producing the following stack dump when I try to sc.textFile() open them. When I try to open directories, most large directories contain at least one file of this type. Curiously, the following two lines fail inside of a Spark job, but not inside of a Scoobi job: val conf = new org.apache.hadoop.conf.Configuration val fs = org.apache.hadoop.fs.FileSystem.get(conf) The stack trace follows: 15/03/26 14:22:43 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: null) Exception in thread Driver java.lang.IllegalStateException at org.spark-project.guava.common.base.Preconditions.checkState(Preconditions.java:133) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:673) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convertLocatedBlock(PBHelper.java:1100) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1118) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1251) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1354) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1363) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:518) at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1743) at org.apache.hadoop.hdfs.DistributedFileSystem$15.init(DistributedFileSystem.java:738) at org.apache.hadoop.hdfs.DistributedFileSystem.listLocatedStatus(DistributedFileSystem.java:727) at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1662) at org.apache.hadoop.fs.FileSystem$5.init(FileSystem.java:1724) at org.apache.hadoop.fs.FileSystem.listFiles(FileSystem.java:1721) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$$anonfun$main$2.apply(SpellQuery.scala:1125) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$$anonfun$main$2.apply(SpellQuery.scala:1123) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$.main(SpellQuery.scala:1123) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch.main(SpellQuery.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427) 15/03/26 14:22:43 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook It appears to have found the three copies of the given HDFS block, but is performing some sort of validation with them before giving them back to spark to schedule the job. But there is an assert failing. I've tried this with 1.2.0, 1.2.1 and 1.3.0, and I get the exact same error, but I've seen the line numbers change on the HDFS libraries, but not the function names. I've tried recompiling myself with different hadoop versions, and it's the same. We're running hadoop 2.4.1 on our cluster. A google search turns up absolutely nothing on this. Any insight at all would be appreciated. 
Dale Johnson Applied Researcher eBay.com -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-access-file-in-spark-but-can-in-hadoop-tp22251.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: RDD.map does not allowed to preservesPartitioning?
This is just a deficiency of the api, imo. I agree: mapValues could definitely be a function (K, V) => V1. The option isn't set by the function, it's on the RDD. So you could look at the code and do this. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala def mapValues[U](f: V => U): RDD[(K, U)] = { val cleanF = self.context.clean(f) new MapPartitionsRDD[(K, U), (K, V)](self, (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) }, preservesPartitioning = true) } What you want: def mapValues[U](f: (K, V) => U): RDD[(K, U)] = { val cleanF = self.context.clean(f) new MapPartitionsRDD[(K, U), (K, V)](self, (context, pid, iter) => iter.map { case t@(k, _) => (k, cleanF(t)) }, preservesPartitioning = true) } One of the nice things about spark is that making such new operators is very easy :) 2015-03-26 17:54 GMT-04:00 Zhan Zhang zzh...@hortonworks.com: Thanks Jonathan. You are right regarding rewrite the example. I mean providing such option to developer so that it is controllable. The example may seems silly, and I don’t know the use cases. But for example, if I also want to operate both the key and value part to generate some new value with keeping key part untouched. Then mapValues may not be able to do this. Changing the code to allow this is trivial, but I don’t know whether there is some special reason behind this. Thanks. Zhan Zhang On Mar 26, 2015, at 2:49 PM, Jonathan Coveney jcove...@gmail.com wrote: I believe if you do the following: sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString (8) MapPartitionsRDD[34] at reduceByKey at console:23 [] | MapPartitionsRDD[33] at mapValues at console:23 [] | ShuffledRDD[32] at reduceByKey at console:23 [] +-(8) MapPartitionsRDD[31] at map at console:23 [] | ParallelCollectionRDD[30] at parallelize at console:23 [] The difference is that spark has no way to know that your map closure doesn't change the key. if you only use mapValues, it does. Pretty cool that they optimized that :) 2015-03-26 17:44 GMT-04:00 Zhan Zhang zzh...@hortonworks.com: Hi Folks, Does anybody know what is the reason not allowing preserverPartitioning in RDD.map? Do I miss something here? Following example involves two shuffles. I think if preservePartitioning is allowed, we can avoid the second one, right? val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)) val r2 = r1.map((_, 1)) val r3 = r2.reduceByKey(_+_) val r4 = r3.map(x=(x._1, x._2 + 1)) val r5 = r4.reduceByKey(_+_) r5.collect.foreach(println) scala r5.toDebugString res2: String = (8) ShuffledRDD[4] at reduceByKey at console:29 [] +-(8) MapPartitionsRDD[3] at map at console:27 [] | ShuffledRDD[2] at reduceByKey at console:25 [] +-(8) MapPartitionsRDD[1] at map at console:23 [] | ParallelCollectionRDD[0] at parallelize at console:21 [] Thanks. Zhan Zhang - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: RDD.map does not allowed to preservesPartitioning?
I think we have a version of mapPartitions that allows you to tell Spark the partitioning is preserved: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L639 We could also add a map function that does same. Or you can just write your map using an iterator. - Patrick On Thu, Mar 26, 2015 at 3:07 PM, Jonathan Coveney jcove...@gmail.com wrote: This is just a deficiency of the api, imo. I agree: mapValues could definitely be a function (K, V)=V1. The option isn't set by the function, it's on the RDD. So you could look at the code and do this. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala def mapValues[U](f: V = U): RDD[(K, U)] = { val cleanF = self.context.clean(f) new MapPartitionsRDD[(K, U), (K, V)](self, (context, pid, iter) = iter.map { case (k, v) = (k, cleanF(v)) }, preservesPartitioning = true) } What you want: def mapValues[U](f: (K, V) = U): RDD[(K, U)] = { val cleanF = self.context.clean(f) new MapPartitionsRDD[(K, U), (K, V)](self, (context, pid, iter) = iter.map { case t@(k, _) = (k, cleanF(t)) }, preservesPartitioning = true) } One of the nice things about spark is that making such new operators is very easy :) 2015-03-26 17:54 GMT-04:00 Zhan Zhang zzh...@hortonworks.com: Thanks Jonathan. You are right regarding rewrite the example. I mean providing such option to developer so that it is controllable. The example may seems silly, and I don't know the use cases. But for example, if I also want to operate both the key and value part to generate some new value with keeping key part untouched. Then mapValues may not be able to do this. Changing the code to allow this is trivial, but I don't know whether there is some special reason behind this. Thanks. Zhan Zhang On Mar 26, 2015, at 2:49 PM, Jonathan Coveney jcove...@gmail.com wrote: I believe if you do the following: sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString (8) MapPartitionsRDD[34] at reduceByKey at console:23 [] | MapPartitionsRDD[33] at mapValues at console:23 [] | ShuffledRDD[32] at reduceByKey at console:23 [] +-(8) MapPartitionsRDD[31] at map at console:23 [] | ParallelCollectionRDD[30] at parallelize at console:23 [] The difference is that spark has no way to know that your map closure doesn't change the key. if you only use mapValues, it does. Pretty cool that they optimized that :) 2015-03-26 17:44 GMT-04:00 Zhan Zhang zzh...@hortonworks.com: Hi Folks, Does anybody know what is the reason not allowing preserverPartitioning in RDD.map? Do I miss something here? Following example involves two shuffles. I think if preservePartitioning is allowed, we can avoid the second one, right? val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)) val r2 = r1.map((_, 1)) val r3 = r2.reduceByKey(_+_) val r4 = r3.map(x=(x._1, x._2 + 1)) val r5 = r4.reduceByKey(_+_) r5.collect.foreach(println) scala r5.toDebugString res2: String = (8) ShuffledRDD[4] at reduceByKey at console:29 [] +-(8) MapPartitionsRDD[3] at map at console:27 [] | ShuffledRDD[2] at reduceByKey at console:25 [] +-(8) MapPartitionsRDD[1] at map at console:23 [] | ParallelCollectionRDD[0] at parallelize at console:21 [] Thanks. Zhan Zhang - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
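A sketch of Patrick's suggestion applied to the original example: expressing the key-preserving map as mapPartitions with preservesPartitioning = true keeps r3's partitioner, so the second reduceByKey should not need another shuffle:

val r4 = r3.mapPartitions(
  iter => iter.map { case (k, v) => (k, v + 1) },  // keys untouched, so the promise holds
  preservesPartitioning = true)
val r5 = r4.reduceByKey(_ + _)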
Error in creating log directory
I get the following error message when I start pyspark shell. The config has the following settings- # spark.masterspark://master:7077 # spark.eventLog.enabled true # spark.eventLog.dir hdfs://namenode:8021/directory # spark.serializerorg.apache.spark.serializer.KryoSerializer spark.eventLog.dir=/user/spark/applicationHistory spark.eventLog.enabled=true spark.yarn.historyServer.address=name101-car.ldcint.com:10020 [pzilaro@name101-car conf]$ pyspark Python 2.6.6 (r266:84292, Jan 22 2014, 09:42:36) [GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2 Type help, copyright, credits or license for more information. 15/03/26 13:46:06 INFO spark.SecurityManager: Changing view acls to: pzilaro 15/03/26 13:46:06 INFO spark.SecurityManager: Changing modify acls to: pzilaro 15/03/26 13:46:06 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(pzilaro); users with modify permissions: Set(pzilaro) 15/03/26 13:46:07 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/03/26 13:46:07 INFO Remoting: Starting remoting 15/03/26 13:46:07 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@name101-car.ldcint.com:48040] 15/03/26 13:46:07 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkdri...@name101-car.ldcint.com:48040] 15/03/26 13:46:07 INFO util.Utils: Successfully started service 'sparkDriver' on port 48040. 15/03/26 13:46:07 INFO spark.SparkEnv: Registering MapOutputTracker 15/03/26 13:46:07 INFO spark.SparkEnv: Registering BlockManagerMaster 15/03/26 13:46:07 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20150326134607-072e 15/03/26 13:46:07 INFO storage.MemoryStore: MemoryStore started with capacity 265.4 MB 15/03/26 13:46:08 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-2f342a3a-c5bb-474d-867b-8bd5b9f9d1ac 15/03/26 13:46:08 INFO spark.HttpServer: Starting HTTP Server 15/03/26 13:46:08 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/03/26 13:46:08 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:55296 15/03/26 13:46:08 INFO util.Utils: Successfully started service 'HTTP file server' on port 55296. 15/03/26 13:46:08 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/03/26 13:46:08 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 15/03/26 13:46:08 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 
15/03/26 13:46:08 INFO ui.SparkUI: Started SparkUI at http://name101-car.ldcint.com:4040 15/03/26 13:46:08 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkdri...@name101-car.ldcint.com:48040/user/HeartbeatReceiver 15/03/26 13:46:08 INFO netty.NettyBlockTransferService: Server created on 55241 15/03/26 13:46:08 INFO storage.BlockManagerMaster: Trying to register BlockManager 15/03/26 13:46:08 INFO storage.BlockManagerMasterActor: Registering block manager localhost:55241 with 265.4 MB RAM, BlockManagerId(driver, localhost, 55241) 15/03/26 13:46:08 INFO storage.BlockManagerMaster: Registered BlockManager Traceback (most recent call last): File /usr/lib/spark/python/pyspark/shell.py, line 45, in module sc = SparkContext(appName=PySparkShell, pyFiles=add_files) File /usr/lib/spark/python/pyspark/context.py, line 105, in __init__ conf, jsc) File /usr/lib/spark/python/pyspark/context.py, line 153, in _do_init self._jsc = jsc or self._initialize_context(self._conf._jconf) File /usr/lib/spark/python/pyspark/context.py, line 201, in _initialize_context return self._jvm.JavaSparkContext(jconf) File /usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 701, in __call__ File /usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext. : java.io.IOException: Error in creating log directory: file:/user/spark/applicationHistory//local-1427402768636 at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:133) at org.apache.spark.util.FileLogger.start(FileLogger.scala:115) at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74) at org.apache.spark.SparkContext.init(SparkContext.scala:353) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at
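The failure is that spark.eventLog.dir has no scheme, so it is resolved against the local filesystem (file:/user/spark/applicationHistory), which pyspark cannot create. Two possible fixes, sketched for spark-defaults.conf; the namenode host and port are placeholders:

spark.eventLog.enabled    true
# either point the event log at an HDFS directory that exists and is writable...
spark.eventLog.dir        hdfs://namenode:8020/user/spark/applicationHistory
# ...or keep a local path, but create it first on the driver host:
#   mkdir -p /user/spark/applicationHistory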
Re: Spark SQL queries hang forever
Is it possible to jstack the executors and see where they are hanging? On Thu, Mar 26, 2015 at 2:02 PM, Jon Chase jon.ch...@gmail.com wrote: Spark 1.3.0 on YARN (Amazon EMR), cluster of 10 m3.2xlarge (8cpu, 30GB), executor memory 20GB, driver memory 10GB I'm using Spark SQL, mainly via spark-shell, to query 15GB of data spread out over roughly 2,000 Parquet files and my queries frequently hang. Simple queries like select count(*) from ... on the entire data set work ok. Slightly more demanding ones with group by's and some aggregate functions (percentile_approx, avg, etc.) work ok as well, as long as I have some criteria in my where clause to keep the number of rows down. Once I hit some limit on query complexity and rows processed, my queries start to hang. I've left them for up to an hour without seeing any progress. No OOM's either - the job is just stuck. I've tried setting spark.sql.shuffle.partitions to 400 and even 800, but with the same results: usually near the end of the tasks (like 780 of 800 complete), progress just stops: 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 788.0 in stage 1.0 (TID 1618) in 800 ms on ip-10-209-22-211.eu-west-1.compute.internal (748/800) 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 793.0 in stage 1.0 (TID 1623) in 622 ms on ip-10-105-12-41.eu-west-1.compute.internal (749/800) 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 797.0 in stage 1.0 (TID 1627) in 616 ms on ip-10-90-2-201.eu-west-1.compute.internal (750/800) 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 799.0 in stage 1.0 (TID 1629) in 611 ms on ip-10-90-2-201.eu-west-1.compute.internal (751/800) 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 795.0 in stage 1.0 (TID 1625) in 669 ms on ip-10-105-12-41.eu-west-1.compute.internal (752/800) ^^^ this is where it stays forever Looking at the Spark UI, several of the executors still list active tasks. I do see that the Shuffle Read for executors that don't have any tasks remaining is around 100MB, whereas it's more like 10MB for the executors that still have tasks. The first stage, mapPartitions, always completes fine. It's the second stage (takeOrdered), that hangs. I've had this issue in 1.2.0 and 1.2.1 as well as 1.3.0. I've also encountered it when using JSON files (instead of Parquet). Thoughts? I'm blocked on using Spark SQL b/c most of the queries I do are having this issue.
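In case it helps with Michael's question, a rough way to capture those stack dumps on a YARN node that is running a stuck executor (PIDs and file names will differ):

jps -lm | grep CoarseGrainedExecutorBackend   # find the executor JVM's PID
jstack <pid> > executor-stack.txt             # replace <pid> with the number printed above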
Re: iPython Notebook + Spark + Accumulo -- best practice?
w0t! that did it! t/y so much! I'm going to put together a pastebin or something that has all the code put together so if anyone else runs into this issue they will have some working code to help them figure out what's going on. DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com www.AnnaiSystems.com On Mar 26, 2015, at 12:24 PM, Corey Nolet cjno...@gmail.com wrote: Spark uses a SerializableWritable [1] to java serialize writable objects. I've noticed (at least in Spark 1.2.1) that it breaks down with some objects when Kryo is used instead of regular java serialization. Though it is wrapping the actual AccumuloInputFormat (another example of something you may want to do in the future), we have Accumulo working to load data from a table into Spark SQL [2]. The way Spark uses the InputFormat is very straightforward. [1] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SerializableWritable.scala [2] https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala#L76 On Thu, Mar 26, 2015 at 3:06 PM, Nick Pentreath nick.pentre...@gmail.com wrote: I'm guessing the Accumulo Key and Value classes are not serializable, so you would need to do something like val rdd = sc.newAPIHadoopRDD(...).map { case (key, value) => (extractScalaType(key), extractScalaType(value)) } Where 'extractScalaType' converts the Key or Value to a standard Scala type or case class or whatever - basically extracts the data from the Key or Value in a form usable in Scala — Sent from Mailbox (https://www.dropbox.com/mailbox) On Thu, Mar 26, 2015 at 8:59 PM, Russ Weeks rwe...@newbrightidea.com wrote: Hi, David, This is the code that I use to create a JavaPairRDD from an Accumulo table: JavaSparkContext sc = new JavaSparkContext(conf); Job hadoopJob = Job.getInstance(conf,"TestSparkJob"); job.setInputFormatClass(AccumuloInputFormat.class); AccumuloInputFormat.setZooKeeperInstance(job, conf.get(ZOOKEEPER_INSTANCE_NAME, conf.get(ZOOKEEPER_HOSTS) ); AccumuloInputFormat.setConnectorInfo(job, conf.get(ACCUMULO_AGILE_USERNAME), new PasswordToken(conf.get(ACCUMULO_AGILE_PASSWORD)) ); AccumuloInputFormat.setInputTableName(job, conf.get(ACCUMULO_TABLE_NAME)); AccumuloInputFormat.setScanAuthorizations(job, auths); JavaPairRDD<Key, Value> values = sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class, Key.class, Value.class); Key.class and Value.class are from org.apache.accumulo.core.data. I use a WholeRowIterator so that the Value is actually an encoded representation of an entire logical row; it's a useful convenience if you can be sure that your rows always fit in memory. I haven't tested it since Spark 1.0.1 but I doubt anything important has changed. Regards, -Russ On Thu, Mar 26, 2015 at 11:41 AM, David Holiday dav...@annaisystems.com wrote: progress! i was able to figure out why the 'input INFO not set' error was occurring. the eagle-eyed among you will no doubt see the following code is missing a closing '(' AbstractInputFormat.setConnectorInfo(jobConf, root, new PasswordToken(password) as I'm doing this in spark-notebook, I'd been clicking the execute button and moving on because I wasn't seeing an error.
what I forgot was that notebook is going to do what spark-shell will do when you leave off a closing ')' -- it will wait forever for you to add it. so the error was the result of the 'setConnectorInfo' method never getting executed. unfortunately, I'm still unable to shove the accumulo table data into an RDD that's usable to me. when I execute rddX.count I get back res15: Long = 1 which is the correct response - there are 10,000 rows of data in the table I pointed to. however, when I try to grab the first element of data thusly: rddX.first I get the following error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.accumulo.core.data.Key any thoughts on where to go from here? DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com www.AnnaiSystems.com On Mar 26, 2015, at 8:35 AM, David Holiday dav...@annaisystems.com wrote: hi Nick Unfortunately the Accumulo docs are woefully inadequate, and in some places, flat wrong. I'm not sure if this is a case where the docs are 'flat wrong', or if there's some wrinkle with spark-notebook in the mix
WordCount example
I am trying to run the word count example but for some reason it's not working as expected. I start the nc server on the port and then submit the Spark job to the cluster. The Spark job gets successfully submitted but I never see any connection from Spark getting established. I also tried to type words on the console where nc is listening and waiting at the prompt, however I don't see any output. I also don't see any errors. Here is the conf: SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", );
Spark SQL queries hang forever
Spark 1.3.0 on YARN (Amazon EMR), cluster of 10 m3.2xlarge (8cpu, 30GB), executor memory 20GB, driver memory 10GB I'm using Spark SQL, mainly via spark-shell, to query 15GB of data spread out over roughly 2,000 Parquet files and my queries frequently hang. Simple queries like select count(*) from ... on the entire data set work ok. Slightly more demanding ones with group by's and some aggregate functions (percentile_approx, avg, etc.) work ok as well, as long as I have some criteria in my where clause to keep the number of rows down. Once I hit some limit on query complexity and rows processed, my queries start to hang. I've left them for up to an hour without seeing any progress. No OOM's either - the job is just stuck. I've tried setting spark.sql.shuffle.partitions to 400 and even 800, but with the same results: usually near the end of the tasks (like 780 of 800 complete), progress just stops: 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 788.0 in stage 1.0 (TID 1618) in 800 ms on ip-10-209-22-211.eu-west-1.compute.internal (748/800) 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 793.0 in stage 1.0 (TID 1623) in 622 ms on ip-10-105-12-41.eu-west-1.compute.internal (749/800) 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 797.0 in stage 1.0 (TID 1627) in 616 ms on ip-10-90-2-201.eu-west-1.compute.internal (750/800) 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 799.0 in stage 1.0 (TID 1629) in 611 ms on ip-10-90-2-201.eu-west-1.compute.internal (751/800) 15/03/26 20:53:29 INFO scheduler.TaskSetManager: Finished task 795.0 in stage 1.0 (TID 1625) in 669 ms on ip-10-105-12-41.eu-west-1.compute.internal (752/800) ^^^ this is where it stays forever Looking at the Spark UI, several of the executors still list active tasks. I do see that the Shuffle Read for executors that don't have any tasks remaining is around 100MB, whereas it's more like 10MB for the executors that still have tasks. The first stage, mapPartitions, always completes fine. It's the second stage (takeOrdered), that hangs. I've had this issue in 1.2.0 and 1.2.1 as well as 1.3.0. I've also encountered it when using JSON files (instead of Parquet). Thoughts? I'm blocked on using Spark SQL b/c most of the queries I do are having this issue.