Re: Is SPARK-3322 fixed in latest version of Spark?
ConnectionManager has been deprecated and is no longer used by default (NettyBlockTransferService is the replacement). Hopefully you would no longer see these messages unless you have explicitly flipped it back on. On Tue, Aug 4, 2015 at 6:14 PM, Jim Green openkbi...@gmail.com wrote: And also https://issues.apache.org/jira/browse/SPARK-3106 This one is still open. On Tue, Aug 4, 2015 at 6:12 PM, Jim Green openkbi...@gmail.com wrote: *Symptom:* Even a sample job fails: $ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi 10 Pi is roughly 3.140636 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(xxx,) not found WARN ConnectionManager: All connections not cleaned up Found https://issues.apache.org/jira/browse/SPARK-3322 but the code changes are not in newer versions of Spark, even though this JIRA is marked as fixed. Is this issue really fixed in the latest version? If so, what is the related JIRA? -- Thanks, www.openkb.info (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool) -- Thanks, www.openkb.info (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
Re: Total delay per batch in a CSV file
Hi, Lots of Spark Streaming's internal state is exposed through StreamingListener, including what you see in the web UI, so you could write your own StreamingListener and register it with the StreamingContext to collect the internal information of Spark Streaming and write it to a CSV file. You can check the source code here ( https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala ). Thanks Saisai On Tue, Aug 4, 2015 at 6:58 PM, allonsy luke1...@gmail.com wrote: Hi everyone, I'm working with Spark Streaming, and I need to perform some offline performance measures. What I'd like to have is a CSV file that reports something like this: *Batch number/timestamp | Input Size | Total Delay* which is in fact similar to what the UI outputs. I tried to get some metrics (metrics.properties), but I'm having a hard time getting precise information on every single batch, since they only have entries concerning the /last/ (completed/received) batch, and values are often different from those appearing in the UI. Can anybody give me some advice on how to get metrics that are close to those of the UI? Thanks!
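A minimal sketch of such a listener in Scala (illustrative only: the CSV path is a placeholder, and the exact fields exposed on BatchInfo vary slightly across 1.x versions):

import java.io.{FileWriter, PrintWriter}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Appends one CSV row per completed batch: batch time (ms), record count, total delay (ms).
class CsvBatchListener(path: String) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val row = Seq(
      info.batchTime.milliseconds,
      info.numRecords,
      info.totalDelay.getOrElse(-1L)
    ).mkString(",")
    val out = new PrintWriter(new FileWriter(path, true))
    try out.println(row) finally out.close()
  }
}

// Register it before starting the context:
// ssc.addStreamingListener(new CsvBatchListener("/tmp/batch-metrics.csv"))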
Re: Combining Spark Files with saveAsTextFile
using coalesce might be dangerous, since 1 worker process will need to handle the whole file, and if the file is huge you'll get an OOM; however it depends on the implementation, I'm not sure how it will be done. Nevertheless, it's worth trying the coalesce method (please post your results). Another option would be to use FileUtil.copyMerge, which copies each partition one after another into a destination stream (file); so as soon as you've written your HDFS file with Spark with multiple partitions in parallel (as usual), you can then make another step to merge it into any destination you want. On 5 August 2015 at 07:43, Mohammed Guller moham...@glassbeam.com wrote: Just to further clarify, you can first call coalesce with argument 1 and then call saveAsTextFile. For example, rdd.coalesce(1).saveAsTextFile(...) Mohammed *From:* Mohammed Guller *Sent:* Tuesday, August 4, 2015 9:39 PM *To:* 'Brandon White'; user *Subject:* RE: Combining Spark Files with saveAsTextFile One option is to use the coalesce method in the RDD class. Mohammed *From:* Brandon White [mailto:bwwintheho...@gmail.com] *Sent:* Tuesday, August 4, 2015 7:23 PM *To:* user *Subject:* Combining Spark Files with saveAsTextFile What is the best way to make saveAsTextFile save as only a single file?
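A rough sketch of the copyMerge approach (the rdd value and the HDFS paths are placeholders; copyMerge is the standard Hadoop 2.x FileUtil API):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// 1) Write the output with normal parallelism (many part files).
rdd.saveAsTextFile("hdfs:///tmp/output-parts")

// 2) Merge the part files into a single destination file in a separate step.
val hadoopConf = new Configuration()
val fs = FileSystem.get(hadoopConf)
FileUtil.copyMerge(
  fs, new Path("hdfs:///tmp/output-parts"),  // source directory
  fs, new Path("hdfs:///tmp/output.txt"),    // destination file
  false,                                     // deleteSource
  hadoopConf,
  null)                                      // addString appended between files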
Debugging Spark job in Eclipse
Hi, The Spark job is executed when you run the start() method of JavaStreamingContext. All the operations like map and flatMap are defined earlier, but even though you put breakpoints inside the function, the breakpoint doesn't stop there, so how can I debug the Spark jobs?

JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    private static final long serialVersionUID = -2042174881679341118L;
    @Override
    public Iterable<String> call(String t) throws Exception {
        // Mark Debug Point here, it doesn't stop here.
        return Lists.newArrayList(SPACE.split(t));
    }
});

Please suggest how I can see the intermediate data values. Regards, Deepesh
Implementing algorithms in GraphX pregel
Hi, I was recently looking into Spark GraphX as one of the frameworks that can help me solve some graph-related problems. The 'think-like-a-vertex' paradigm is something new to me and I cannot wrap my head around how to implement simple algorithms like Depth First or Breadth First search, or even getting a list of all simple paths between 2 vertices, in GraphX. 1. Any clue on how to approach the above problems using GraphX Pregel? 2. Is there any documentation on how to send a different/unique initial message to each vertex? 3. How do I send the initial message to only a subset of the vertices? 4. How does 'voting-to-stop' work? Thanks in advance for any pointers and explanations. -- κρισhναν
RE: Spark SQL support for Hive 0.14
Thanks Steve and Michael for your response. Is there a tentative release date for Spark 1.5? From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Tuesday, August 4, 2015 11:53 PM To: Steve Loughran ste...@hortonworks.com Cc: Ishwardeep Singh ishwardeep.si...@impetus.co.in; user@spark.apache.org Subject: Re: Spark SQL support for Hive 0.14 I'll add that while Spark SQL 1.5 compiles against Hive 1.2.1, it has support for reading from metastores for Hive 0.12 - 1.2.1 On Tue, Aug 4, 2015 at 9:59 AM, Steve Loughran ste...@hortonworks.com wrote: Spark 1.3.1 & 1.4 only support Hive 0.13. Spark 1.5 is going to be released against Hive 1.2.1; it'll skip Hive 0.14 support entirely and go straight to the currently supported Hive release. See SPARK-8064 for the gory details. On 3 Aug 2015, at 23:01, Ishwardeep Singh ishwardeep.si...@impetus.co.in wrote: Hi, Does Spark SQL support Hive 0.14? The documentation refers to Hive 0.13. Is there a way to compile Spark with Hive 0.14? Currently we are using Spark 1.3.1. Thanks
Fwd: MLLIB MulticlassMetrics Unable to find class key
Hi everyone, After a successful prediction, MulticlassMetrics returns an error that it is unable to find the class key when computing precision. Is there a way to check whether the metrics contain the class key? -- Hayri Volkan Agun PhD. Student - Anadolu University
Re: Combining Spark Files with saveAsTextFile
it seems that coalesce does work, see the following thread https://www.mail-archive.com/user%40spark.apache.org/msg00928.html On 5 August 2015 at 09:47, Igor Berman igor.ber...@gmail.com wrote: using coalesce might be dangerous, since 1 worker process will need to handle the whole file, and if the file is huge you'll get an OOM; however it depends on the implementation, I'm not sure how it will be done. Nevertheless, it's worth trying the coalesce method (please post your results). Another option would be to use FileUtil.copyMerge, which copies each partition one after another into a destination stream (file); so as soon as you've written your HDFS file with Spark with multiple partitions in parallel (as usual), you can then make another step to merge it into any destination you want. On 5 August 2015 at 07:43, Mohammed Guller moham...@glassbeam.com wrote: Just to further clarify, you can first call coalesce with argument 1 and then call saveAsTextFile. For example, rdd.coalesce(1).saveAsTextFile(...) Mohammed *From:* Mohammed Guller *Sent:* Tuesday, August 4, 2015 9:39 PM *To:* 'Brandon White'; user *Subject:* RE: Combining Spark Files with saveAsTextFile One option is to use the coalesce method in the RDD class. Mohammed *From:* Brandon White [mailto:bwwintheho...@gmail.com] *Sent:* Tuesday, August 4, 2015 7:23 PM *To:* user *Subject:* Combining Spark Files with saveAsTextFile What is the best way to make saveAsTextFile save as only a single file?
Spark Streaming - CheckPointing issue
Hi, I've done Twitter streaming using Twitter's streaming user API and Spark Streaming. This runs successfully on my local machine, but when I run this program on a cluster in local mode, it only runs successfully the very first time. Later on it gives the following exception: Exception in thread main org.apache.spark.SparkException: Found both spark.executor.extraClassPath and SPARK_CLASSPATH. Use only the former. And the Spark classpath is already unset! I have to make a new checkpoint directory each time to make it run successfully; otherwise it shows the above exception. Can anyone help me to resolve this issue? Thanks :)
Re: About memory leak in spark 1.4.1
No one helped me... so I helped myself: I split the cluster into two clusters, 1.4.1 and 1.3.0. ------------------ Original message ------------------ From: Ted Yu yuzhih...@gmail.com Sent: Tuesday, August 4, 2015 10:28 To: Igor Berman igor.ber...@gmail.com Cc: Sea 261810...@qq.com; Barak Gitsis bar...@similarweb.com; user@spark.apache.org; rxin r...@databricks.com; joshrosen joshro...@databricks.com; davies dav...@databricks.com Subject: Re: About memory leak in spark 1.4.1 w.r.t. spark.deploy.spreadOut, here is the scaladoc: // As a temporary workaround before better ways of configuring memory, we allow users to set // a flag that will perform round-robin scheduling across the nodes (spreading out each app // among all the nodes) instead of trying to consolidate each app onto a small # of nodes. private val spreadOutApps = conf.getBoolean(spark.deploy.spreadOut, true) Cheers On Tue, Aug 4, 2015 at 4:13 AM, Igor Berman igor.ber...@gmail.com wrote: sorry, can't disclose info about my prod cluster. nothing jumps into my mind regarding your config. we don't use lz4 compression, and I don't know what spark.deploy.spreadOut is (there is no documentation regarding this). If you are sure that you don't have a memory leak in your business logic, I would try to reset each property to its default (or just remove it from your config) and try to run your job to see if it's not somehow connected. my config (nothing special really): spark.shuffle.consolidateFiles true spark.speculation false spark.executor.extraJavaOptions -XX:+UseStringCache -XX:+UseCompressedStrings -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:gc.log -verbose:gc spark.executor.logs.rolling.maxRetainedFiles 1000 spark.executor.logs.rolling.strategy time spark.worker.cleanup.enabled true spark.logConf true spark.rdd.compress true On 4 August 2015 at 12:59, Sea 261810...@qq.com wrote: How many machines are there in your standalone cluster? I am not using Tachyon. GC can not help me... Can anyone help? my configuration: spark.deploy.spreadOut false spark.eventLog.enabled true spark.executor.cores 24 spark.ui.retainedJobs 10 spark.ui.retainedStages 10 spark.history.retainedApplications 5 spark.deploy.retainedApplications 10 spark.deploy.retainedDrivers 10 spark.streaming.ui.retainedBatches 10 spark.sql.thriftserver.ui.retainedSessions 10 spark.sql.thriftserver.ui.retainedStatements 100 spark.file.transferTo false spark.driver.maxResultSize 4g spark.sql.hive.metastore.jars=/spark/spark-1.4.1/hive/* spark.eventLog.dir hdfs://mycluster/user/spark/historylog spark.history.fs.logDirectory hdfs://mycluster/user/spark/historylog spark.driver.extraClassPath=/spark/spark-1.4.1/extlib/* spark.executor.extraClassPath=/spark/spark-1.4.1/extlib/* spark.sql.parquet.binaryAsString true spark.serializer org.apache.spark.serializer.KryoSerializer spark.kryoserializer.buffer 32 spark.kryoserializer.buffer.max 256 spark.shuffle.consolidateFiles true spark.io.compression.codec org.apache.spark.io.LZ4CompressionCodec ------------------ Original message ------------------ From: Igor Berman igor.ber...@gmail.com Sent: Monday, August 3, 2015 7:56 To: Sea 261810...@qq.com Cc: Barak Gitsis bar...@similarweb.com; Ted Yu yuzhih...@gmail.com; user@spark.apache.org; rxin r...@databricks.com; joshrosen joshro...@databricks.com; davies dav...@databricks.com Subject: Re: About memory leak in spark 1.4.1 in general, what is your configuration? use --conf spark.logConf=true we have 1.4.1 in a production standalone cluster and haven't experienced what you are describing. can you verify in the web UI that Spark indeed got your 50g per executor limit? I mean in the configuration page.
might be you are using offheap storage (Tachyon)? On 3 August 2015 at 04:58, Sea 261810...@qq.com wrote: spark uses a lot more than heap memory, it is the expected behavior. It didn't exist in spark 1.3.x. What does "a lot more than" mean? It means that I lose control of it! I try to apply for 31g, but it still grows to 55g and continues to grow!!! That is the point! I have tried setting memoryFraction to 0.2, but it didn't help. I don't know whether it will still exist in the next release 1.5; I wish not. ------------------ Original message ------------------ From: Barak Gitsis bar...@similarweb.com Sent: Sunday, August 2, 2015 9:55 To: Sea 261810...@qq.com; Ted Yu yuzhih...@gmail.com Cc: user@spark.apache.org; rxin r...@databricks.com; joshrosen joshro...@databricks.com; davies dav...@databricks.com Subject: Re: About memory leak in spark 1.4.1 spark uses a lot more than heap memory, it is the expected behavior. In 1.4 off-heap memory usage is supposed to grow in comparison to 1.3. Better use as little memory as you can for heap, and since you are not utilizing it already, it is safe for you to reduce it. memoryFraction helps
Label based MLLib MulticlassMetrics is buggy
The results in MulticlassMetrics are totally wrong; they are improperly calculated. The confusion matrix may be correct, I don't know, but the per-label scores are wrong. -- Hayri Volkan Agun PhD. Student - Anadolu University
Re: large scheduler delay in pyspark
Hi, Thanks a lot for your reply. It seems that it is because of the slowness of the second piece of code. I rewrote the code as list(set([i.items for i in a] + [i.items for i in b])) and the program behaves normally again. By the way, I find that while the computation is running, the UI shows scheduler delay; however, it is not really scheduler delay. When the computation finishes, the UI shows the correct scheduler delay time. Cheers Gen On Tue, Aug 4, 2015 at 3:13 PM, Davies Liu dav...@databricks.com wrote: On Mon, Aug 3, 2015 at 9:00 AM, gen tang gen.tan...@gmail.com wrote: Hi, Recently, I met some problems with scheduler delay in pyspark. I worked several days on this problem, without success. Therefore, I come here to ask for help. I have a key-value pair rdd like rdd[(key, list[dict])] and I tried to merge values by adding two lists. If I do reduceByKey as follows: rdd.reduceByKey(lambda a, b: a+b) it works fine, scheduler delay is less than 10s. However if I do reduceByKey: def f(a, b): for i in b: if i not in a: a.append(i) return a rdd.reduceByKey(f) Is it possible that you have a large object that is also named `i` or `a` or `b`? Btw, the second one could be slower than the first one, because you try to look up an object in a list, which is O(N), especially when the object is large (dict). It will cause a very large scheduler delay, about 15-20 mins. (The data I deal with is about 300 MB, and I use 5 machines with 32GB memory.) If you see scheduler delay, it means there may be a large broadcast involved. I know the second piece of code is not the same as the first. In fact, my purpose is to implement the second, but it doesn't work, so I tried the first one. I don't know whether this is related to the data (with long strings) or to Spark on YARN, but the first code works fine on the same data. Is there any way to find the relevant log when Spark stalls in scheduler delay, please? Or any ideas about this problem? Thanks a lot in advance for your help. Cheers Gen
Re: Spark Streaming - CheckPointing issue
Hi, What Spark version do you use? It looks like a problem of configuration recovery; I'm not sure whether it is a Twitter-streaming-specific problem. I tried Kafka streaming with checkpointing enabled on my local machine and saw no such issue. Did you try to set these configurations somewhere? Thanks Saisai On Wed, Aug 5, 2015 at 8:34 PM, Sadaf sa...@platalytics.com wrote: Hi, I've done Twitter streaming using Twitter's streaming user API and Spark Streaming. This runs successfully on my local machine, but when I run this program on a cluster in local mode, it only runs successfully the very first time. Later on it gives the following exception: Exception in thread main org.apache.spark.SparkException: Found both spark.executor.extraClassPath and SPARK_CLASSPATH. Use only the former. And the Spark classpath is already unset! I have to make a new checkpoint directory each time to make it run successfully; otherwise it shows the above exception. Can anyone help me to resolve this issue? Thanks :)
Re: Debugging Spark job in Eclipse
Deepesh, you have to call an action to start the actual processing; words.count() would do the trick. On 05 Aug 2015, at 11:42, Deepesh Maheshwari deepesh.maheshwar...@gmail.com wrote: Hi, The Spark job is executed when you run the start() method of JavaStreamingContext. All the operations like map and flatMap are defined earlier, but even though you put breakpoints inside the function, the breakpoint doesn't stop there, so how can I debug the Spark jobs?

JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    private static final long serialVersionUID = -2042174881679341118L;
    @Override
    public Iterable<String> call(String t) throws Exception {
        // Mark Debug Point here, it doesn't stop here.
        return Lists.newArrayList(SPACE.split(t));
    }
});

Please suggest how I can see the intermediate data values. Regards, Deepesh Eugene Morozov fathers...@list.ru
Best practices to call hiveContext in DataFrame.foreach in executor program or how to have a for loop in driver program
Hi, I have the following code, which fires hiveContext.sql() most of the time. My task is to create a few tables and insert values into them after processing, for every Hive table partition. So I first run 'show partitions' and, using its output, in a for loop I call a few methods which create a table if it does not exist and do the insert using hiveContext.sql. Now, we can't use hiveContext in an executor, so I have to execute this for loop in the driver program, and it runs serially, one partition at a time. When I submit this Spark job on the YARN cluster, almost all the time my executor gets lost because of a shuffle-not-found exception. This is happening because YARN is killing my executor due to memory overload. I don't understand why: I have a very small data set for each Hive partition, but it still causes YARN to kill my executor. Please advise why the following code overloads memory: will it do everything in parallel and try to accommodate all Hive partition data in memory at the same time? Please guide me; I am blocked because of this issue.

public static void main(String[] args) throws IOException {
    SparkConf conf = new SparkConf();
    SparkContext sc = new SparkContext(conf);
    HiveContext hc = new HiveContext(sc);
    DataFrame partitionFrame = hiveContext.sql("show partitions dbdata partition(date='2015-08-05')");
    Row[] rowArr = partitionFrame.collect();
    for (Row row : rowArr) {
        String[] splitArr = row.getString(0).split("/");
        String server = splitArr[0].split("=")[1];
        String date = splitArr[1].split("=")[1];
        String csvPath = "hdfs:///user/db/ext/" + server + ".csv";
        if (fs.exists(new Path(csvPath))) {
            hiveContext.sql("ADD FILE " + csvPath);
        }
        createInsertIntoTableABC(hc, entity, date);
        createInsertIntoTableDEF(hc, entity, date);
        createInsertIntoTableGHI(hc, entity, date);
        createInsertIntoTableJKL(hc, entity, date);
        createInsertIntoTableMNO(hc, entity, date);
    }
}
Spark SQL Hive - merge small files
Hello, I would love to have Hive merge the small files in my managed Hive context after every query. Right now, I am setting the Hive configuration in my Spark job configuration, but Hive is not merging the files. Do I need to set the Hive fields in another place? How do you set Hive configurations in Spark? Here is what I'd like to set:
hive.merge.mapfiles true
hive.merge.mapredfiles true
hive.merge.size.per.task 25600
hive.merge.smallfiles.avgsize 1600
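For the general question of setting Hive properties from Spark, a minimal sketch (the property names are the ones listed above; whether Spark's Hive integration actually honors the merge settings is a separate matter, see the reply further down):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Programmatic form: setConf forwards key/value pairs to the underlying configuration.
hiveContext.setConf("hive.merge.mapfiles", "true")
hiveContext.setConf("hive.merge.mapredfiles", "true")

// SQL form, equivalent to the above:
hiveContext.sql("SET hive.merge.mapfiles=true")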
Memory allocation error with Spark 1.5
Hi, I'm receiving a memory allocation error with a recent build of Spark 1.5:
java.io.IOException: Unable to acquire 67108864 bytes of memory
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:348)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:398)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:92)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:174)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:146)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:126)
The issue appears when joining 2 datasets. One with 6084 records, the other one with 200 records. I'm expecting to receive 200 records in the result. I'm using a homemade build prepared from branch-1.5 with commit ID eedb996. I have run mvn -DskipTests clean install to generate that build. Apart from that, I'm using Java 1.7.0_51 and Maven 3.3.3. I've prepared a test case that can be built and executed very easily (data files are included in the repo): https://github.com/aseigneurin/spark-testcase One thing to note is that the issue arises when the master is set to local[*] but not when set to local. Both options work without problem with Spark 1.4, though. Any help will be greatly appreciated! Many thanks, Alexis
Re: No Twitter Input from Kafka to Spark Streaming
Thanks Akash for the answer. I added endpoint to the listener and now it is working.
Re: Spark SQL support for Hive 0.14
On 5 Aug 2015, at 02:08, Ishwardeep Singh ishwardeep.si...@impetus.co.in wrote: Thanks Steve and Michael for your response. Is there a tentative release date for Spark 1.5? The branch is off and going through its bug stabilisation/test phase. This is where you can help contribute to the project by downloading and playing with the beta releases when they become available. This will give you early access to the code, but most of all, timely fixes to any problems that surface. Find a bug in a beta and it should hopefully be fixed within the month. Find a bug in a production release and you can wait a lot longer. This probably matters even more when it comes to that Hive support, because it's a big change, including changes to the SQL syntax to track HQL's changes. (More succinctly: some of the regression tests broke as things like `date` and `double` were now types, and UNION had evolved.) From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Tuesday, August 4, 2015 11:53 PM To: Steve Loughran ste...@hortonworks.com Cc: Ishwardeep Singh ishwardeep.si...@impetus.co.in; user@spark.apache.org Subject: Re: Spark SQL support for Hive 0.14 I'll add that while Spark SQL 1.5 compiles against Hive 1.2.1, it has support for reading from metastores for Hive 0.12 - 1.2.1 On Tue, Aug 4, 2015 at 9:59 AM, Steve Loughran ste...@hortonworks.com wrote: Spark 1.3.1 & 1.4 only support Hive 0.13. Spark 1.5 is going to be released against Hive 1.2.1; it'll skip Hive 0.14 support entirely and go straight to the currently supported Hive release. See SPARK-8064 for the gory details. On 3 Aug 2015, at 23:01, Ishwardeep Singh ishwardeep.si...@impetus.co.in wrote: Hi, Does Spark SQL support Hive 0.14? The documentation refers to Hive 0.13. Is there a way to compile Spark with Hive 0.14? Currently we are using Spark 1.3.1. Thanks
Re: Unable to load native-hadoop library for your platform
On 4 Aug 2015, at 12:26, Sean Owen so...@cloudera.com wrote: Oh good point, does the Windows integration need native libs for POSIX-y file system access? I know there are some binaries shipped for this purpose but wasn't sure if that's part of what's covered in the native libs message. On Unix it's a warning that you'll get slower access to bits of the system and no posix-ish support; on Windows it's more serious (and should be reported more seriously). It's used for various low-level things, and currently Hadoop fails somewhat messily when it's not there. That's even for things like MiniYarnCluster, which is a real annoyance. I have a snapshot of the Hadoop 2.6 Windows binaries if people want them; I should stick up a copy of the 2.7.1 ones too: https://github.com/steveloughran/clusterconfigs/tree/master/clusters/morzine/hadoop_home/ On Tue, Aug 4, 2015 at 6:01 PM, Steve Loughran ste...@hortonworks.com wrote: Think it may be needed on Windows, certainly if you start trying to work with local files. On 4 Aug 2015, at 00:34, Sean Owen so...@cloudera.com wrote: It won't affect you if you're not actually running Hadoop. But it's mainly things like Snappy/LZO compression which are implemented as native libraries under the hood. There's a lot more in those native libs, primarily to bypass bits missing from the Java APIs (FS permissions) and to add new features (encryption, soon erasure coding). The Hadoop file:// FS uses it on Windows, at least for now.
Re: Label based MLLib MulticlassMetrics is buggy
Hi Hayri, Can you provide a sample of the expected and actual results? Feynman On Wed, Aug 5, 2015 at 6:19 AM, Hayri Volkan Agun volkana...@gmail.com wrote: The results in MulticlassMetrics is totally wrong. They are improperly calculated. Confusion matrix may be true I don't know but for each label scores are wrong. -- Hayri Volkan Agun PhD. Student - Anadolu University
Re: Label based MLLib MulticlassMetrics is buggy
Also, what version of Spark are you using? On Wed, Aug 5, 2015 at 9:57 AM, Feynman Liang fli...@databricks.com wrote: Hi Hayri, Can you provide a sample of the expected and actual results? Feynman On Wed, Aug 5, 2015 at 6:19 AM, Hayri Volkan Agun volkana...@gmail.com wrote: The results in MulticlassMetrics is totally wrong. They are improperly calculated. Confusion matrix may be true I don't know but for each label scores are wrong. -- Hayri Volkan Agun PhD. Student - Anadolu University
Re: Spark SQL Hive - merge small files
This feature isn't currently supported. On Wed, Aug 5, 2015 at 8:43 AM, Brandon White bwwintheho...@gmail.com wrote: Hello, I would love to have Hive merge the small files in my managed Hive context after every query. Right now, I am setting the Hive configuration in my Spark job configuration, but Hive is not merging the files. Do I need to set the Hive fields in another place? How do you set Hive configurations in Spark? Here is what I'd like to set: hive.merge.mapfiles true, hive.merge.mapredfiles true, hive.merge.size.per.task 25600, hive.merge.smallfiles.avgsize 1600
Re: Memory allocation error with Spark 1.5
In Spark 1.5, we have a new way to manage memory (part of Project Tungsten). The default unit of memory allocation is 64MB, which is way too high when you have 1G of memory allocated in total and have more than 4 threads. We will reduce the default page size before releasing 1.5. For now, you can just reduce spark.buffer.pageSize variable to a lower value (e.g. 16m). https://github.com/apache/spark/blob/702aa9d7fb16c98a50e046edfd76b8a7861d0391/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala#L125 On Wed, Aug 5, 2015 at 9:25 AM, Alexis Seigneurin aseigneu...@ippon.fr wrote: Hi, I'm receiving a memory allocation error with a recent build of Spark 1.5: java.io.IOException: Unable to acquire 67108864 bytes of memory at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:348) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:398) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:92) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:174) at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:146) at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:126) The issue appears when joining 2 datasets. One with 6084 records, the other one with 200 records. I'm expecting to receive 200 records in the result. I'm using a homemade build prepared from branch-1.5 with commit ID eedb996. I have run mvn -DskipTests clean install to generate that build. Apart from that, I'm using Java 1.7.0_51 and Maven 3.3.3. I've prepared a test case that can be built and executed very easily (data files are included in the repo): https://github.com/aseigneurin/spark-testcase One thing to note is that the issue arises when the master is set to local[*] but not when set to local. Both options work without problem with Spark 1.4, though. Any help will be greatly appreciated! Many thanks, Alexis
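A sketch of setting that property programmatically (the app name is a placeholder; the property name is the one quoted above for this 1.5 snapshot):

import org.apache.spark.{SparkConf, SparkContext}

// Shrink the Tungsten page size so several threads can each acquire a page
// from a small executor heap.
val conf = new SparkConf()
  .setAppName("join-test")
  .set("spark.buffer.pageSize", "16m")
val sc = new SparkContext(conf)

The same value can also be passed on the command line with --conf spark.buffer.pageSize=16m.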
Re: Spark SQL Hive - merge small files
So there is no good way to merge Spark files in a managed Hive table right now? On Wed, Aug 5, 2015 at 10:02 AM, Michael Armbrust mich...@databricks.com wrote: This feature isn't currently supported. On Wed, Aug 5, 2015 at 8:43 AM, Brandon White bwwintheho...@gmail.com wrote: Hello, I would love to have Hive merge the small files in my managed Hive context after every query. Right now, I am setting the Hive configuration in my Spark job configuration, but Hive is not merging the files. Do I need to set the Hive fields in another place? How do you set Hive configurations in Spark? Here is what I'd like to set: hive.merge.mapfiles true, hive.merge.mapredfiles true, hive.merge.size.per.task 25600, hive.merge.smallfiles.avgsize 1600
Re: Is SPARK-3322 fixed in latest version of Spark?
We tested Spark 1.2 and 1.3, and this issue is gone. I know that starting from 1.2, Spark uses Netty instead of NIO. Do you mean that bypasses this issue? Another question is: why did this error message not show up in Spark 0.9 or older versions? On Tue, Aug 4, 2015 at 11:01 PM, Aaron Davidson ilike...@gmail.com wrote: ConnectionManager has been deprecated and is no longer used by default (NettyBlockTransferService is the replacement). Hopefully you would no longer see these messages unless you have explicitly flipped it back on. On Tue, Aug 4, 2015 at 6:14 PM, Jim Green openkbi...@gmail.com wrote: And also https://issues.apache.org/jira/browse/SPARK-3106 This one is still open. On Tue, Aug 4, 2015 at 6:12 PM, Jim Green openkbi...@gmail.com wrote: *Symptom:* Even a sample job fails: $ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi 10 Pi is roughly 3.140636 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(xxx,) not found WARN ConnectionManager: All connections not cleaned up Found https://issues.apache.org/jira/browse/SPARK-3322 but the code changes are not in newer versions of Spark, even though this JIRA is marked as fixed. Is this issue really fixed in the latest version? If so, what is the related JIRA? -- Thanks, www.openkb.info (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool) -- Thanks, www.openkb.info (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool) -- Thanks, www.openkb.info (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
Streaming and calculated-once semantics
Hello, here's a simple program that demonstrates my problem:

ssc = StreamingContext(sc, 1)
input = [[("k1", 12), ("k2", 14)], [("k1", 22)]]
rawData = ssc.queueStream([sc.parallelize(d, 1) for d in input])
runningRawData = rawData.updateStateByKey(lambda nv, prev: sum(nv) + (prev or 0))

def stats(rdd):
    keyavg = rdd.values().reduce(lambda a, b: a + b) / rdd.count()
    return rdd.mapValues(lambda i: i - keyavg)

runningRawData.transform(stats).pprint()

I have a feeling this will calculate keyavg = rdd.values().reduce(...) / rdd.count() inside stats quite a few times, depending on the number of partitions of the current rdd. What would be an alternative way to do this two-step computation without calculating the average many times?
Starting Spark SQL thrift server from within a streaming app
Hi, Is it possible to start the Spark SQL thrift server from within a streaming app so the streamed data could be queried as it goes in? Thank you. Daniel
Spark MLib v/s SparkR
I was wondering when one should go for MLlib or SparkR. What criteria should be considered before choosing either of the solutions for data analysis? What are the advantages of Spark MLlib over SparkR, or of SparkR over MLlib?
Re: Spark MLib v/s SparkR
SparkR doesn't support all the ML algorithms yet; the upcoming 1.5 will have more support, but still not all the algorithms that are currently supported in MLlib. SparkR is more of a convenience for R users to get acquainted with Spark at this point. -Ali From: praveen S mylogi...@gmail.com Date: Wednesday, August 5, 2015 at 2:24 PM To: user@spark.apache.org Subject: Spark MLib v/s SparkR I was wondering when one should go for MLlib or SparkR. What criteria should be considered before choosing either of the solutions for data analysis? What are the advantages of Spark MLlib over SparkR, or of SparkR over MLlib?
Re: Spark MLib v/s SparkR
A few points to consider: a) SparkR gives the union of R_in_a_single_machine and the distributed_computing_of_Spark. b) It also gives the ability to wrangle with data in R that is in the Spark ecosystem. c) Coming to MLlib, the question is MLlib and R (not MLlib or R), depending on the scale, data location et al. d) As Ali mentioned, some of MLlib might not be supported in R (I haven't looked at it that carefully, but it can be resolved by the APIs); OTOH, 1.5 is on its way. e) So it all depends on the algorithms that one wants to use and whether one needs R for pre- or post-processing. HTH. Cheers k/ On Wed, Aug 5, 2015 at 11:24 AM, praveen S mylogi...@gmail.com wrote: I was wondering when one should go for MLlib or SparkR. What criteria should be considered before choosing either of the solutions for data analysis? What are the advantages of Spark MLlib over SparkR, or of SparkR over MLlib?
spark hangs at broadcasting during a filter
I'm trying to load a 1 TB file whose lines i,j,v represent the values of a matrix given as A_{ij} = v, so I can convert it to a Parquet file. Only some of the rows of A are relevant, so the following code first loads the triplets as text, splits them into Tuple3[Int, Int, Double], drops triplets whose rows should be skipped, then forms a Tuple2[Int, List[Tuple2[Int, Double]]] for each row (if I'm judging datatypes correctly).

val valsrows = sc.textFile(valsinpath).map(_.split(",")).
  map(x => (x(1).toInt, (x(0).toInt, x(2).toDouble))).
  filter(x => !droprows.contains(x._1)).
  groupByKey.
  map(x => (x._1, x._2.toSeq.sortBy(_._1)))

Spark hangs during a broadcast that occurs during the filter step (according to the Spark UI). The last two lines in the log before it pauses are:
15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.31.49.149:37643 (size: 4.6 KB, free: 113.8 GB)
15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.31.49.159:41846 (size: 4.6 KB, free: 113.8 GB)
I've left Spark running for up to 17 minutes one time, and it never continues past this point. I'm using a cluster of 30 r3.8xlarge EC2 instances (244GB, 32 cores) with Spark in standalone mode with 220G executor and driver memory, and using the KryoSerializer. Any ideas on what could be causing this hang?
Re: Spark MLib v/s SparkR
What machine learning algorithms are you interested in exploring or using? Start from there or better yet the problem you are trying to solve, and then the selection may be evident. On Wednesday, August 5, 2015, praveen S mylogi...@gmail.com wrote: I was wondering when one should go for MLib or SparkR. What is the criteria or what should be considered before choosing either of the solutions for data analysis? or What is the advantages of Spark MLib over Spark R or advantages of SparkR over MLib? -- - Charles
Set Job Descriptions for Scala application
Hello, My Spark application is written in Scala and submitted to a Spark cluster in standalone mode. The Spark jobs for my application are listed in the Spark UI like this:

Job Id  Description  ...
6       saveAsTextFile at Foo.scala:202
5       saveAsTextFile at Foo.scala:201
4       count at Foo.scala:188
3       collect at Foo.scala:182
2       count at Foo.scala:162
1       count at Foo.scala:152
0       collect at Foo.scala:142

Is it possible to assign Job Descriptions to all these jobs in my Scala code? Thanks! Rares
Re: Set Job Descriptions for Scala application
SparkContext#setJobDescription or SparkContext#setJobGroup On Wed, Aug 5, 2015 at 12:29 PM, Rares Vernica rvern...@gmail.com wrote: Hello, My Spark application is written in Scala and submitted to a Spark cluster in standalone mode. The Spark Jobs for my application are listed in the Spark UI like this: Job Id Description ... 6 saveAsTextFile at Foo.scala:202 5 saveAsTextFile at Foo.scala:201 4 count at Foo.scala:188 3 collect at Foo.scala:182 2 count at Foo.scala:162 1 count at Foo.scala:152 0 collect at Foo.scala:142 Is it possible to assign Job Descriptions to all these jobs in my Scala code? Thanks! Rares
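A small sketch of how that looks in application code (the RDDs and the path are placeholders; the strings show up in the Description column of the Jobs table):

// Label the jobs triggered by the following actions.
sc.setJobGroup("daily-report", "Count raw events")
val total = events.count()

sc.setJobDescription("Write cleaned events to HDFS")
cleaned.saveAsTextFile("hdfs:///tmp/cleaned-events")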
Re: spark hangs at broadcasting during a filter
How big is droprows? Try explicitly broadcasting it like this:

val broadcastDropRows = sc.broadcast(dropRows)
val valsrows = ... .filter(x => !broadcastDropRows.value.contains(x._1))

- Philip On Wed, Aug 5, 2015 at 11:54 AM, AlexG swift...@gmail.com wrote: I'm trying to load a 1 TB file whose lines i,j,v represent the values of a matrix given as A_{ij} = v, so I can convert it to a Parquet file. Only some of the rows of A are relevant, so the following code first loads the triplets as text, splits them into Tuple3[Int, Int, Double], drops triplets whose rows should be skipped, then forms a Tuple2[Int, List[Tuple2[Int, Double]]] for each row (if I'm judging datatypes correctly). val valsrows = sc.textFile(valsinpath).map(_.split(",")). map(x => (x(1).toInt, (x(0).toInt, x(2).toDouble))). filter(x => !droprows.contains(x._1)). groupByKey. map(x => (x._1, x._2.toSeq.sortBy(_._1))) Spark hangs during a broadcast that occurs during the filter step (according to the Spark UI). The last two lines in the log before it pauses are: 15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.31.49.149:37643 (size: 4.6 KB, free: 113.8 GB) 15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.31.49.159:41846 (size: 4.6 KB, free: 113.8 GB) I've left Spark running for up to 17 minutes one time, and it never continues past this point. I'm using a cluster of 30 r3.8xlarge EC2 instances (244GB, 32 cores) with Spark in standalone mode with 220G executor and driver memory, and using the KryoSerializer. Any ideas on what could be causing this hang?
HiveContext error
Hello, I am trying to define an external Hive table from Spark HiveContext like the following:

import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
hiveCtx.sql(s"""CREATE EXTERNAL TABLE IF NOT EXISTS Rentrak_Ratings (Version string, Gen_Date string, Market_Number string, Market_Name string, Time_Zone string, Number_Households string,
  | DateTime string, Program_Start_Time string, Program_End_Time string, Station string, Station_Name string, Call_Sign string, Network_Name string, Program string,
  | Series_Name string, Series_Number string, Episode_Number string, Episode_Title string, Demographic string, Demographic_Name string, HHUniverse string,
  | Share_15min_Segment string, PHUT_15min_Segment string, Rating_15min_Segment string, AV_Audience_15min_Segment string)
  | PARTITIONED BY (year INT, month INT)
  | ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'""".stripMargin)

And I am getting the following error:

org.apache.spark.sql.execution.QueryExecutionException: FAILED: Hive Internal Error: java.lang.ClassNotFoundException(org.apache.hadoop.hive.ql.hooks.ATSHook)
    at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:324)
    at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:292)
    at org.apache.spark.sql.hive.execution.HiveNativeCommand.run(HiveNativeCommand.scala:33)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54)
    at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099)
    at org.apache.spark.sql.DataFrame.init(DataFrame.scala:147)
    at org.apache.spark.sql.DataFrame.init(DataFrame.scala:130)
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
    at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:103)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:27)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:37)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:39)
    at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:41)
    at $iwC$$iwC$$iwC$$iwC.init(console:43)
    at $iwC$$iwC$$iwC.init(console:45)
    at $iwC$$iwC.init(console:47)
    at $iwC.init(console:49)
    at init(console:51)
    at .init(console:55)
    at .clinit(console)
    at .init(console:7)
    at .clinit(console)

Can anybody help please? Stefan Panayotov, PhD Home: 610-355-0919 Cell: 610-517-5586 email: spanayo...@msn.com spanayo...@outlook.com spanayo...@comcast.net
Re: Label based MLLib MulticlassMetrics is buggy
1.5 has not yet been released; what is the commit hash that you are building? On Wed, Aug 5, 2015 at 10:29 AM, Hayri Volkan Agun volkana...@gmail.com wrote: Hi, In Spark 1.5 I saw a result of precision 1.0 and recall 0.01 for decision tree classification. While precision is 1.0, the recall shouldn't be so small... I checked the code, everything seems OK, but why I got such a result is inexplicable. As far as I understand from the Scala code, the row sums are the actual class counts and the column sums are the prediction counts; am I right? I am doing additional tests for comparison with my own code... I attached a document for my Reuters tests on page 3. On Wed, Aug 5, 2015 at 7:57 PM, Feynman Liang fli...@databricks.com wrote: Also, what version of Spark are you using? On Wed, Aug 5, 2015 at 9:57 AM, Feynman Liang fli...@databricks.com wrote: Hi Hayri, Can you provide a sample of the expected and actual results? Feynman On Wed, Aug 5, 2015 at 6:19 AM, Hayri Volkan Agun volkana...@gmail.com wrote: The results in MulticlassMetrics are totally wrong; they are improperly calculated. The confusion matrix may be correct, I don't know, but the per-label scores are wrong. -- Hayri Volkan Agun PhD. Student - Anadolu University -- Hayri Volkan Agun PhD. Student - Anadolu University
SparkConf ignoring keys
I've been using SparkConf on my project for quite some time now to store configuration information for its various components. This has worked very well thus far in situations where I have control over the creation of the SparkContext and the SparkConf. I have run into a bit of a problem trying to integrate this same approach with the use of the shell, however. I have a bunch of properties in a properties file that are shared across several different types of applications (web containers, etc.), but SparkConf ignores these properties because they aren't prefixed with spark.* Is this really necessary? It's not really stopping people from adding their own properties, and it limits the power of being able to utilize one central configuration object.
Re: large scheduler delay in pyspark
It seems you want to dedupe your data after the merge, so set(a+b) should also work... you may ditch the list comprehension operation. On 5 Aug 2015 23:55, gen tang gen.tan...@gmail.com wrote: Hi, Thanks a lot for your reply. It seems that it is because of the slowness of the second code. I rewrite code as list(set([i.items for i in a] + [i.items for i in b])). The program returns normal. By the way, I find that when the computation is running, UI will show scheduler delay. However, it is not scheduler delay. When computation finishes, UI will show correct scheduler delay time. Cheers Gen On Tue, Aug 4, 2015 at 3:13 PM, Davies Liu dav...@databricks.com wrote: On Mon, Aug 3, 2015 at 9:00 AM, gen tang gen.tan...@gmail.com wrote: Hi, Recently, I met some problems about scheduler delay in pyspark. I worked several days on this problem, but not success. Therefore, I come to here to ask for help. I have a key_value pair rdd like rdd[(key, list[dict])] and I tried to merge value by adding two list if I do reduceByKey as follows: rdd.reduceByKey(lambda a, b: a+b) It works fine, scheduler delay is less than 10s. However if I do reduceByKey: def f(a, b): for i in b: if i not in a: a.append(i) return a rdd.reduceByKey(f) Is it possible that you have large object that is also named `i` or `a` or `b`? Btw, the second one could be slow than first one, because you try to lookup a object in a list, that is O(N), especially when the object is large (dict). It will cause very large scheduler delay, about 15-20 mins.(The data I deal with is about 300 mb, and I use 5 machine with 32GB memory) If you see scheduler delay, it means there may be a large broadcast involved. I know the second code is not the same as the first. In fact, my purpose is to implement the second, but not work. So I try the first one. I don't know whether this is related to the data(with long string) or Spark on Yarn. But the first code works fine on the same data. Is there any way to find out the log when spark stall in scheduler delay, please? Or any ideas about this problem? Thanks a lot in advance for your help. Cheers Gen
How to read gzip data in Spark - Simple question
I have CSV data that is stored in gzip format on HDFS.

*With Pig*
a = load '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using PigStorage();
b = limit a 10
(2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)
(2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)

However with Spark:
val rowStructText = sc.parallelize("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz")
val x = rowStructText.map(s => {
  println(s)
  s
})
x.count

Questions:
1) x.count always shows 67 irrespective of the path I change in sc.parallelize
2) It shows x as RDD[Char] instead of String
3) println() never emits the rows.
Any suggestions? -Deepak
Re: How to read gzip data in Spark - Simple question
This message means that java.util.Date is not supported by Spark DataFrame. You'll need to use java.sql.Date, I believe. On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: That seem to be working. however i see a new exception Code: def formatStringAsDate(dateStr: String) = new SimpleDateFormat(-MM-dd).parse(dateStr) //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,) val rowStructText = sc.textFile(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz) case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 : String, f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11: Float, f12: Integer, f13: Integer, f14: String) val summary = rowStructText.map(s = s.split(,)).map( s = Summary(formatStringAsDate(s(0)), s(1).replaceAll(\, ).toLong, s(3).replaceAll(\, ).toLong, s(4).replaceAll(\, ).toInt, s(5).replaceAll(\, ), s(6).replaceAll(\, ).toInt, formatStringAsDate(s(7)), formatStringAsDate(s(8)), s(9).replaceAll(\, ).toInt, s(10).replaceAll(\, ).toInt, s(11).replaceAll(\, ).toFloat, s(12).replaceAll(\, ).toInt, s(13).replaceAll(\, ).toInt, s(14).replaceAll(\, ) ) ).toDF() bank.registerTempTable(summary) //Output import java.text.SimpleDateFormat import java.util.Calendar import java.util.Date formatStringAsDate: (dateStr: String)java.util.Date rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz MapPartitionsRDD[105] at textFile at console:60 defined class Summary x: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at console:61 java.lang.UnsupportedOperationException: Schema for type java.util.Date is not supported at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164) Any suggestions On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver philip.wea...@gmail.com wrote: The parallelize method does not read the contents of a file. It simply takes a collection and distributes it to the cluster. In this case, the String is a collection 67 characters. Use sc.textFile instead of sc.parallelize, and it should work as you want. On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have csv data that is embedded in gzip format on HDFS. *With Pig* a = load '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using PigStorage(); b = limit a 10 (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,) (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,) However with Spark val rowStructText = sc.parallelize(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz) val x = rowStructText.map(s = { println(s) s} ) x.count Questions 1) x.count always shows 67 irrespective of the path i change in sc.parallelize 2) It shows x as RDD[Char] instead of String 3) println() never emits the rows. Any suggestions -Deepak -- Deepak -- Deepak
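A sketch of the change, assuming the date strings really are yyyy-MM-dd as in the sample rows (only the date-typed fields need to change; java.sql.Date is what Spark SQL maps to its DateType):

import java.sql.Date
import java.text.SimpleDateFormat

// Parse with java.text as before, but return java.sql.Date.
def formatStringAsDate(dateStr: String): Date =
  new Date(new SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime)

// Only f1, f7 and f8 change type; the remaining fields stay as before.
case class Summary(f1: Date, f2: Long, /* ... */ f7: Date, f8: Date /* , ... */)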
spark on mesos with docker from private repository
Hi, My Spark setup is a cluster on top of Mesos using Docker containers. I want to pull the Docker images from a private repository (currently gcr.io), and I can't get the authentication to work. I know how to generate a .dockercfg file (running on GCE, using gcloud docker -a). My problem is that, as far as I understand, I need this file in the root directory of the executor dir, and I can't find a way to make the Spark executor pull this file (not without changing Spark code). Am I missing something? It seems that Spark does support Mesos + Docker, so I wonder what other people with this setup are doing? Thanks Eyal
Re: Reliable Streaming Receiver
Thanks Tathagata. I tried that, but BlockGenerator internally uses SystemClock, which is again private. We are using DSE so we're stuck with Spark 1.2 and hence can't use the receiver-less version. Is it possible to use the same code as a separate API with 1.2? Thanks, Sourabh On Wed, Aug 5, 2015 at 6:13 PM, Tathagata Das t...@databricks.com wrote: You could very easily strip out the BlockGenerator code from the Spark source code and use it directly in the same way the Reliable Kafka Receiver uses it. BTW, you should know that we will be deprecating the receiver-based approach in favor of the Direct Kafka approach. That is quite flexible, can give an exactly-once guarantee without the WAL, and is more robust and performant. Consider using it. On Wed, Aug 5, 2015 at 5:48 PM, Sourabh Chandak sourabh3...@gmail.com wrote: Hi, I am trying to replicate the Kafka Streaming Receiver for a custom version of Kafka and want to create a Reliable receiver. The current implementation uses BlockGenerator, which is a private class inside Spark Streaming, hence I can't use it in my code. Can someone help me with some resources to tackle this issue? Thanks, Sourabh
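For reference, the receiver-less approach mentioned above is the stock Spark Kafka integration available from Spark 1.3 onward (this sketch targets vanilla Kafka, not the custom Kafka build discussed; ssc, the broker list and the topic are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("events")

// Each record arrives as a (key, value) pair; no receiver or WAL is involved.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.map(_._2).print()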
Re: Newbie question: can shuffle avoid writing and reading from disk?
Thanks! On Wed, Aug 5, 2015 at 5:24 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Yes, finally shuffle data will be written to disk for the reduce stage to pull, no matter how large you set the shuffle memory fraction. Thanks Saisai On Thu, Aug 6, 2015 at 7:50 AM, Muler mulugeta.abe...@gmail.com wrote: Thanks, so if I have large enough memory (with enough spark.shuffle.memory), then shuffle (in-memory shuffle) spill doesn't happen (per node), but shuffle data still has to be ultimately written to disk so that the reduce stage pulls it across the network? On Wed, Aug 5, 2015 at 4:40 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi Muler, Shuffle data will be written to disk, no matter how much memory you have; large memory could alleviate shuffle spill, where temporary files would be generated if memory is not enough. Yes, each node writes shuffle data to file and it is pulled from disk in the reduce stage through the network framework (default is Netty). Thanks Saisai On Thu, Aug 6, 2015 at 7:10 AM, Muler mulugeta.abe...@gmail.com wrote: Hi, Consider I'm running WordCount with 100m of data on a 4 node cluster. Assuming my RAM size on each node is 200g and I'm giving my executors 100g (just enough memory for 100m data): 1. If I have enough memory, can Spark 100% avoid writing to disk? 2. During shuffle, where results have to be collected from nodes, does each node write to disk and then the results are pulled from disk? If not, what is the API that is being used to pull data from nodes across the cluster? (I'm thinking what Scala or Java packages would allow you to read in-memory data from other machines?) Thanks,
Re: How to connect to remote HDFS programmatically to retrieve data, analyse it and then write the data back to HDFS?
Please see the comments at the tail of SPARK-2356 Cheers On Wed, Aug 5, 2015 at 6:04 PM, Ashish Dutt ashish.du...@gmail.com wrote: *Use Case:* To automate the process of data extraction (HDFS), data analysis (pySpark/sparkR) and saving the data back to HDFS programmatically. *Prospective solutions:* 1. Create a remote server connectivity program in an IDE like pyCharm or RStudio and use it to retrieve the data from HDFS or else 2. Create the data retrieval code in python or R and then point the IDE to the remote server using TCP. *Problem:* How to achieve either of the prospective solution 1 or 2 defined above? Do you have any better solution then these, if yes please share? *What have I tried so far?* The server and 3 namenodes already installed with pyspark and I have checked pyspark works in standalone mode on all four servers. Pyspark works in standalone mode on my laptop too. I use the following code but I am not able to connect to the remote server. import os import sys try: from pyspark import SparkContext from pyspark import SparkConf print (Pyspark sucess) except ImportError as e: print (Error importing Spark Modules, e) conf = SparkConf() conf.setMaster(spark://10.210.250.400:7077) conf.setAppName(First_Remote_Spark_Program) sc = SparkContext(conf=conf) print (connection succeeded with Master,conf) data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) print(distData) The stack trace of error is Pyspark sucess 15/08/01 14:08:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/08/01 14:08:24 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318) at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333) at org.apache.hadoop.util.Shell.clinit(Shell.java:326) at org.apache.hadoop.util.StringUtils.clinit(StringUtils.java:76) at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93) at org.apache.hadoop.security.Groups.init(Groups.java:77) at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255) at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232) at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718) at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703) at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605) at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162) at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162) at org.apache.spark.SparkContext.init(SparkContext.scala:301) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:214) at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79) at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) 15/08/01 14:08:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 15/08/01 14:08:26 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.210.250.400:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077 15/08/01 14:08:26 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: no further information: /10.210.250.400:7077 15/08/01 14:08:46 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.210.250.400:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077 15/08/01 14:08:46 WARN Remoting: Tried to associate with unreachable remote address
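For the "Failed to locate the winutils binary" error in the trace above, a commonly suggested workaround on Windows clients is to point HADOOP_HOME at a local directory containing bin\winutils.exe before the SparkContext is created. A sketch, with C:\hadoop as an assumed, illustrative location:

import os

# Hypothetical local path; it must contain bin\winutils.exe.
os.environ["HADOOP_HOME"] = "C:\\hadoop"
os.environ["PATH"] = os.environ["PATH"] + os.pathsep + "C:\\hadoop\\bin"

# ... then build SparkConf/SparkContext as in the snippet above.

Note this only addresses the Hadoop shell-utility error; the repeated "Connection refused" to spark://10.210.250.400:7077 is a separate master-address/network problem.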
RE: How to read gzip data in Spark - Simple question
Have you tried reading the spark documentation? http://spark.apache.org/docs/latest/programming-guide.html Thank you, Ilya Ganelin -Original Message- From: ÐΞ€ρ@Ҝ (๏̯͡๏) [deepuj...@gmail.commailto:deepuj...@gmail.com] Sent: Thursday, August 06, 2015 12:41 AM Eastern Standard Time To: Philip Weaver Cc: user Subject: Re: How to read gzip data in Spark - Simple question how do i persist the RDD to HDFS ? On Wed, Aug 5, 2015 at 8:32 PM, Philip Weaver philip.wea...@gmail.commailto:philip.wea...@gmail.com wrote: This message means that java.util.Date is not supported by Spark DataFrame. You'll need to use java.sql.Date, I believe. On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.commailto:deepuj...@gmail.com wrote: That seem to be working. however i see a new exception Code: def formatStringAsDate(dateStr: String) = new SimpleDateFormat(-MM-dd).parse(dateStr) //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,) val rowStructText = sc.textFile(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz) case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 : String, f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11: Float, f12: Integer, f13: Integer, f14: String) val summary = rowStructText.map(s = s.split(,)).map( s = Summary(formatStringAsDate(s(0)), s(1).replaceAll(\, ).toLong, s(3).replaceAll(\, ).toLong, s(4).replaceAll(\, ).toInt, s(5).replaceAll(\, ), s(6).replaceAll(\, ).toInt, formatStringAsDate(s(7)), formatStringAsDate(s(8)), s(9).replaceAll(\, ).toInt, s(10).replaceAll(\, ).toInt, s(11).replaceAll(\, ).toFloat, s(12).replaceAll(\, ).toInt, s(13).replaceAll(\, ).toInt, s(14).replaceAll(\, ) ) ).toDF() bank.registerTempTable(summary) //Output import java.text.SimpleDateFormat import java.util.Calendar import java.util.Date formatStringAsDate: (dateStr: String)java.util.Date rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz MapPartitionsRDD[105] at textFile at console:60 defined class Summary x: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at console:61 java.lang.UnsupportedOperationException: Schema for type java.util.Date is not supported at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164) Any suggestions On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver philip.wea...@gmail.commailto:philip.wea...@gmail.com wrote: The parallelize method does not read the contents of a file. It simply takes a collection and distributes it to the cluster. In this case, the String is a collection 67 characters. Use sc.textFile instead of sc.parallelize, and it should work as you want. On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.commailto:deepuj...@gmail.com wrote: I have csv data that is embedded in gzip format on HDFS. 
With Pig a = load '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using PigStorage(); b = limit a 10 (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,) (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,) However with Spark val rowStructText = sc.parallelize(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz) val x = rowStructText.map(s = { println(s) s} ) x.count Questions 1) x.count always shows 67 irrespective of the path i change in sc.parallelize 2) It shows x as RDD[Char] instead of String 3) println() never emits the rows. Any suggestions -Deepak -- Deepak -- Deepak -- Deepak The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
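For the "how do i persist the RDD to HDFS" question, a minimal sketch (the output path is illustrative, and the target directory must not already exist):

// Works for any RDD; each partition becomes one part file under the output directory.
// Here summary is assumed to be the DataFrame built earlier in the thread; for a plain
// RDD, call saveAsTextFile on it directly.
summary.rdd.map(_.mkString(",")).saveAsTextFile("hdfs:///user/zeppelin/aggregatedsummary/output")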
Re: Very high latency to initialize a DataFrame from partitioned parquet database.
Absolutely, thanks! On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote: We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396 Could you give it a shot to see whether it helps in your case? We've observed ~50x performance boost with schema merging turned on. Cheng On 8/6/15 8:26 AM, Philip Weaver wrote: I have a parquet directory that was produced by partitioning by two keys, e.g. like this: df.write.partitionBy(a, b).parquet(asdf) There are 35 values of a, and about 1100-1200 values of b for each value of a, for a total of over 40,000 partitions. Before running any transformations or actions on the DataFrame, just initializing it like this takes *2 minutes*: val df = sqlContext.read.parquet(asdf) Is this normal? Is this because it is doing some bookeeping to discover all the partitions? Is it perhaps having to merge the schema from each partition? Would you expect it to get better or worse if I subpartition by another key? - Philip
Re: Very high latency to initialize a DataFrame from partitioned parquet database.
We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396 Could you give it a shot to see whether it helps in your case? We've observed ~50x performance boost with schema merging turned on. Cheng On 8/6/15 8:26 AM, Philip Weaver wrote: I have a parquet directory that was produced by partitioning by two keys, e.g. like this: df.write.partitionBy(a, b).parquet(asdf) There are 35 values of a, and about 1100-1200 values of b for each value of a, for a total of over 40,000 partitions. Before running any transformations or actions on the DataFrame, just initializing it like this takes *2 minutes*: val df = sqlContext.read.parquet(asdf) Is this normal? Is this because it is doing some bookeeping to discover all the partitions? Is it perhaps having to merge the schema from each partition? Would you expect it to get better or worse if I subpartition by another key? - Philip
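If upgrading is an option, a hedged sketch of what the 1.5-style read might look like (the mergeSchema data source option is an assumption for that version; when all part files share one schema, skipping the merge avoids reading every footer during initialization):

// Skip per-partition schema merging during partition discovery.
val df = sqlContext.read.option("mergeSchema", "false").parquet("asdf")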
Re: Reliable Streaming Receiver
Hi, You can try This Kafka Consumer for Spark which is also part of Spark Packages . https://github.com/dibbhatt/kafka-spark-consumer Regards, Dibyendu On Thu, Aug 6, 2015 at 6:48 AM, Sourabh Chandak sourabh3...@gmail.com wrote: Thanks Tathagata. I tried that but BlockGenerator internally uses SystemClock which is again private. We are using DSE so stuck with Spark 1.2 hence can't use the receiver-less version. Is it possible to use the same code as a separate API with 1.2? Thanks, Sourabh On Wed, Aug 5, 2015 at 6:13 PM, Tathagata Das t...@databricks.com wrote: You could very easily strip out the BlockGenerator code from the Spark source code and use it directly in the same way the Reliable Kafka Receiver uses it. BTW, you should know that we will be deprecating the receiver based approach for the Direct Kafka approach. That is quite flexible, can give exactly-once guarantee without WAL, and is more robust and performant. Consider using it. On Wed, Aug 5, 2015 at 5:48 PM, Sourabh Chandak sourabh3...@gmail.com wrote: Hi, I am trying to replicate the Kafka Streaming Receiver for a custom version of Kafka and want to create a Reliable receiver. The current implementation uses BlockGenerator which is a private class inside Spark streaming hence I can't use that in my code. Can someone help me with some resources to tackle this issue? Thanks, Sourabh
Re: Upgrade of Spark-Streaming application
Hi For checkpointing and using fromOffsets arguments- Say for the first time when my app starts I don't have any prev state stored and I want to start consuming from largest offset 1. is it possible to specify that in fromOffsets api- I don't want to use another api which returs JavaPairInputDStream but fromoffsets api returns JavaDStream - since I want to keep further flow of my app same in both case. 2. So to achieve first(same flow in both cases) if I use diff api in 2 cases and when I transfer JavaPairInputDStream to JavaDStream using map function , I am no longer able to typecast transferred stream to HasOffsetRanges for getting offstes of current run- it throws class cast exception - when i do OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges(); on transformed stream - java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges On Thu, Jul 30, 2015 at 7:58 PM, Cody Koeninger c...@koeninger.org wrote: You can't use checkpoints across code upgrades. That may or may not change in the future, but for now that's a limitation of spark checkpoints (regardless of whether you're using Kafka). Some options: - Start up the new job on a different cluster, then kill the old job once it's caught up to where the new job started. If you care about duplicate work, you should be doing idempotent / transactional writes anyway, which should take care of the overlap between the two. If you're doing batches, you may need to be a little more careful about handling batch boundaries - Store the offsets somewhere other than the checkpoint, and provide them on startup using the fromOffsets argument to createDirectStream On Thu, Jul 30, 2015 at 4:07 AM, Nicola Ferraro nibbi...@gmail.com wrote: Hi, I've read about the recent updates about spark-streaming integration with Kafka (I refer to the new approach without receivers). In the new approach, metadata are persisted in checkpoint folders on HDFS so that the SparkStreaming context can be recreated in case of failures. This means that the streaming application will restart from the where it exited and the message consuming process continues with new messages only. Also, if I manually stop the streaming process and recreate the context from checkpoint (using an approach similar to https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala), the behavior would be the same. Now, suppose I want to change something in the software and modify the processing pipeline. Can spark use the previous checkpoint to recreate the new application? Will I ever be able to upgrade the software without processing all the messages in Kafka again? Regards, Nicola
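On point 2, the usual pattern is to capture the offsets in a transform on the direct stream itself, while the underlying RDD is still the Kafka RDD, and only map it afterwards. A sketch in Java (directKafkaStream is assumed to be the result of KafkaUtils.createDirectStream; type parameters are illustrative):

import java.util.concurrent.atomic.AtomicReference;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

// Capture offsets before any map changes the RDD type, then continue the normal flow.
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();

JavaPairDStream<String, String> withOffsets = directKafkaStream.transformToPair(
    new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
      @Override
      public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
        offsetRanges.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
        return rdd;
      }
    });

JavaDStream<String> values = withOffsets.map(
    new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> t) {
        return t._2();
      }
    });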
Re: Reliable Streaming Receiver
You could very easily strip out the BlockGenerator code from the Spark source code and use it directly in the same way the Reliable Kafka Receiver uses it. BTW, you should know that we will be deprecating the receiver based approach for the Direct Kafka approach. That is quite flexible, can give exactly-once guarantee without WAL, and is more robust and performant. Consider using it. On Wed, Aug 5, 2015 at 5:48 PM, Sourabh Chandak sourabh3...@gmail.com wrote: Hi, I am trying to replicate the Kafka Streaming Receiver for a custom version of Kafka and want to create a Reliable receiver. The current implementation uses BlockGenerator which is a private class inside Spark streaming hence I can't use that in my code. Can someone help me with some resources to tackle this issue? Thanks, Sourabh
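For reference, a minimal sketch of the direct approach mentioned above (Spark 1.3+; ssc is an existing StreamingContext, and the broker list and topic name are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// No receiver and no WAL: offsets are tracked by Spark per batch.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092") // placeholder brokers
val topics = Set("events")                                                   // placeholder topic
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)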
Re: How to read gzip data in Spark - Simple question
The parallelize method does not read the contents of a file. It simply takes a collection and distributes it to the cluster. In this case, the String is a collection 67 characters. Use sc.textFile instead of sc.parallelize, and it should work as you want. On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have csv data that is embedded in gzip format on HDFS. *With Pig* a = load '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using PigStorage(); b = limit a 10 (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,) (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,) However with Spark val rowStructText = sc.parallelize(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz) val x = rowStructText.map(s = { println(s) s} ) x.count Questions 1) x.count always shows 67 irrespective of the path i change in sc.parallelize 2) It shows x as RDD[Char] instead of String 3) println() never emits the rows. Any suggestions -Deepak -- Deepak
Re: How to read gzip data in Spark - Simple question
That seem to be working. however i see a new exception Code: def formatStringAsDate(dateStr: String) = new SimpleDateFormat(-MM-dd).parse(dateStr) //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,) val rowStructText = sc.textFile(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz) case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 : String, f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11: Float, f12: Integer, f13: Integer, f14: String) val summary = rowStructText.map(s = s.split(,)).map( s = Summary(formatStringAsDate(s(0)), s(1).replaceAll(\, ).toLong, s(3).replaceAll(\, ).toLong, s(4).replaceAll(\, ).toInt, s(5).replaceAll(\, ), s(6).replaceAll(\, ).toInt, formatStringAsDate(s(7)), formatStringAsDate(s(8)), s(9).replaceAll(\, ).toInt, s(10).replaceAll(\, ).toInt, s(11).replaceAll(\, ).toFloat, s(12).replaceAll(\, ).toInt, s(13).replaceAll(\, ).toInt, s(14).replaceAll(\, ) ) ).toDF() bank.registerTempTable(summary) //Output import java.text.SimpleDateFormat import java.util.Calendar import java.util.Date formatStringAsDate: (dateStr: String)java.util.Date rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz MapPartitionsRDD[105] at textFile at console:60 defined class Summary x: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at console:61 java.lang.UnsupportedOperationException: Schema for type java.util.Date is not supported at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164) Any suggestions On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver philip.wea...@gmail.com wrote: The parallelize method does not read the contents of a file. It simply takes a collection and distributes it to the cluster. In this case, the String is a collection 67 characters. Use sc.textFile instead of sc.parallelize, and it should work as you want. On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have csv data that is embedded in gzip format on HDFS. *With Pig* a = load '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using PigStorage(); b = limit a 10 (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,) (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,) However with Spark val rowStructText = sc.parallelize(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz) val x = rowStructText.map(s = { println(s) s} ) x.count Questions 1) x.count always shows 67 irrespective of the path i change in sc.parallelize 2) It shows x as RDD[Char] instead of String 3) println() never emits the rows. Any suggestions -Deepak -- Deepak -- Deepak
Re: How to read gzip data in Spark - Simple question
how do i persist the RDD to HDFS ? On Wed, Aug 5, 2015 at 8:32 PM, Philip Weaver philip.wea...@gmail.com wrote: This message means that java.util.Date is not supported by Spark DataFrame. You'll need to use java.sql.Date, I believe. On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: That seem to be working. however i see a new exception Code: def formatStringAsDate(dateStr: String) = new SimpleDateFormat(-MM-dd).parse(dateStr) //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,) val rowStructText = sc.textFile(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz) case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 : String, f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11: Float, f12: Integer, f13: Integer, f14: String) val summary = rowStructText.map(s = s.split(,)).map( s = Summary(formatStringAsDate(s(0)), s(1).replaceAll(\, ).toLong, s(3).replaceAll(\, ).toLong, s(4).replaceAll(\, ).toInt, s(5).replaceAll(\, ), s(6).replaceAll(\, ).toInt, formatStringAsDate(s(7)), formatStringAsDate(s(8)), s(9).replaceAll(\, ).toInt, s(10).replaceAll(\, ).toInt, s(11).replaceAll(\, ).toFloat, s(12).replaceAll(\, ).toInt, s(13).replaceAll(\, ).toInt, s(14).replaceAll(\, ) ) ).toDF() bank.registerTempTable(summary) //Output import java.text.SimpleDateFormat import java.util.Calendar import java.util.Date formatStringAsDate: (dateStr: String)java.util.Date rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz MapPartitionsRDD[105] at textFile at console:60 defined class Summary x: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at console:61 java.lang.UnsupportedOperationException: Schema for type java.util.Date is not supported at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164) Any suggestions On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver philip.wea...@gmail.com wrote: The parallelize method does not read the contents of a file. It simply takes a collection and distributes it to the cluster. In this case, the String is a collection 67 characters. Use sc.textFile instead of sc.parallelize, and it should work as you want. On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have csv data that is embedded in gzip format on HDFS. *With Pig* a = load '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using PigStorage(); b = limit a 10 (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,) (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,) However with Spark val rowStructText = sc.parallelize(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz) val x = rowStructText.map(s = { println(s) s} ) x.count Questions 1) x.count always shows 67 irrespective of the path i change in sc.parallelize 2) It shows x as RDD[Char] instead of String 3) println() never emits the rows. Any suggestions -Deepak -- Deepak -- Deepak -- Deepak
Re: How to read gzip data in Spark - Simple question
Code: val summary = rowStructText.map(s = s.split(,)).map( { s = Summary(formatStringAsDate(s(0)), s(1).replaceAll(\, ).toLong, s(3).replaceAll(\, ).toLong, s(4).replaceAll(\, ).toInt, s(5).replaceAll(\, ), s(6).replaceAll(\, ).toInt, formatStringAsDate(s(7)), formatStringAsDate(s(8)), s(9).replaceAll(\, ).toInt, s(10).replaceAll(\, ).toInt, s(11).replaceAll(\, ).toFloat, s(12).replaceAll(\, ).toInt, s(13).replaceAll(\, ).toInt, s(14).replaceAll(\, ) ) } ) summary.saveAsTextFile(sparkO) Exception: import java.text.SimpleDateFormat import java.util.Calendar import java.sql.Date import org.apache.spark.storage.StorageLevel formatStringAsDate: (dateStr: String)java.sql.Date rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz MapPartitionsRDD[263] at textFile at console:154 defined class Summary summary: org.apache.spark.rdd.RDD[Summary] = MapPartitionsRDD[265] at map at console:159 sumDF: org.apache.spark.sql.DataFrame = [f1: date, f2: bigint, f3: bigint, f4: int, f5: string, f6: int, f7: date, f8: date, f9: int, f10: int, f11: float, f12: int, f13: int, f14: string] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 45.0 failed 4 times, most recent failure: Lost task 0.3 in stage 45.0 (TID 1872, datanode-6-3486.phx01.dev.ebayc3.com): java.lang.ArrayIndexOutOfBoundsException: 1 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:163) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:161) at scala.collection.Iterator$$anon On Wed, Aug 5, 2015 at 9:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: how do i persist the RDD to HDFS ? On Wed, Aug 5, 2015 at 8:32 PM, Philip Weaver philip.wea...@gmail.com wrote: This message means that java.util.Date is not supported by Spark DataFrame. You'll need to use java.sql.Date, I believe. On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: That seem to be working. 
however i see a new exception Code: def formatStringAsDate(dateStr: String) = new SimpleDateFormat(-MM-dd).parse(dateStr) //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,) val rowStructText = sc.textFile(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz) case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 : String, f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11: Float, f12: Integer, f13: Integer, f14: String) val summary = rowStructText.map(s = s.split(,)).map( s = Summary(formatStringAsDate(s(0)), s(1).replaceAll(\, ).toLong, s(3).replaceAll(\, ).toLong, s(4).replaceAll(\, ).toInt, s(5).replaceAll(\, ), s(6).replaceAll(\, ).toInt, formatStringAsDate(s(7)), formatStringAsDate(s(8)), s(9).replaceAll(\, ).toInt, s(10).replaceAll(\, ).toInt, s(11).replaceAll(\, ).toFloat, s(12).replaceAll(\, ).toInt, s(13).replaceAll(\, ).toInt, s(14).replaceAll(\, ) ) ).toDF() bank.registerTempTable(summary) //Output import java.text.SimpleDateFormat import java.util.Calendar import java.util.Date formatStringAsDate: (dateStr: String)java.util.Date rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz MapPartitionsRDD[105] at textFile at console:60 defined class Summary x: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at console:61 java.lang.UnsupportedOperationException: Schema for type java.util.Date is not supported at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164) Any suggestions On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver philip.wea...@gmail.com wrote: The parallelize method does not read the contents of a file. It simply takes a collection and distributes it to the cluster. In this case, the String is a collection 67 characters. Use sc.textFile instead of sc.parallelize, and it should work as you want. On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have csv data that is embedded in gzip format on HDFS.
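The ArrayIndexOutOfBoundsException above usually means some lines in the gzip file do not split into as many fields as the map expects (the code indexes up to s(14), so at least 15 comma-separated fields are assumed). A sketch of checking for and filtering such lines before building Summary:

val split = rowStructText.map(_.split(",", -1)) // limit -1 keeps trailing empty fields
val good  = split.filter(_.length >= 15)        // rows with enough fields for Summary
val bad   = split.filter(_.length < 15)         // malformed/short rows that would throw
println(s"parsable rows: ${good.count()}, malformed rows: ${bad.count()}")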
Newbie question: can shuffle avoid writing and reading from disk?
Hi, Consider I'm running WordCount with 100m of data on a 4-node cluster. Assuming my RAM size on each node is 200g and I'm giving my executors 100g (just enough memory for the 100m of data): 1. If I have enough memory, can Spark 100% avoid writing to disk? 2. During shuffle, where results have to be collected from nodes, does each node write to disk and then the results are pulled from disk? If not, what is the API that is being used to pull data from nodes across the cluster? (I'm wondering what Scala or Java packages would allow you to read in-memory data from other machines?) Thanks,
Re: Newbie question: can shuffle avoid writing and reading from disk?
Hi Muler, Shuffle data will be written to disk, no matter how large memory you have, large memory could alleviate shuffle spill where temporary file will be generated if memory is not enough. Yes, each node writes shuffle data to file and pulled from disk in reduce stage from network framework (default is Netty). Thanks Saisai On Thu, Aug 6, 2015 at 7:10 AM, Muler mulugeta.abe...@gmail.com wrote: Hi, Consider I'm running WordCount with 100m of data on 4 node cluster. Assuming my RAM size on each node is 200g and i'm giving my executors 100g (just enough memory for 100m data) 1. If I have enough memory, can Spark 100% avoid writing to disk? 2. During shuffle, where results have to be collected from nodes, does each node write to disk and then the results are pulled from disk? If not, what is the API that is being used to pull data from nodes across the cluster? (I'm thinking what Scala or Java packages would allow you to read in-memory data from other machines?) Thanks,
Pause Spark Streaming reading or sampling streaming data
Hi, I have a question about sampling Spark Streaming data, or getting part of the data. For every minute, I only want the data read in during the first 10 seconds, and discard all data in the next 50 seconds. Is there any way to pause reading and discard data in that period? I'm doing this to sample from a stream of huge amount of data, which saves processing time in the real-time program. Thanks!
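One possible sketch for this (assuming the goal is roughly the first 10 seconds of every minute and a DStream[String] named stream; it discards whole batches by their timestamp on the driver side rather than pausing the receiver, so the data is still read before being dropped):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

// Keep batches whose start time falls in the first 10 s of each minute; replace the
// rest with an empty RDD so downstream stages do no work on them.
val sampled = stream.transform { (rdd: RDD[String], time: Time) =>
  if (time.milliseconds % 60000 < 10000) rdd else rdd.sparkContext.emptyRDD[String]
}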
Re: Newbie question: can shuffle avoid writing and reading from disk?
thanks, so if I have enough large memory (with enough spark.shuffle.memory) then shuffle (in-memory shuffle) spill doesn't happen (per node) but still shuffle data has to be ultimately written to disk so that reduce stage pulls if across network? On Wed, Aug 5, 2015 at 4:40 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi Muler, Shuffle data will be written to disk, no matter how large memory you have, large memory could alleviate shuffle spill where temporary file will be generated if memory is not enough. Yes, each node writes shuffle data to file and pulled from disk in reduce stage from network framework (default is Netty). Thanks Saisai On Thu, Aug 6, 2015 at 7:10 AM, Muler mulugeta.abe...@gmail.com wrote: Hi, Consider I'm running WordCount with 100m of data on 4 node cluster. Assuming my RAM size on each node is 200g and i'm giving my executors 100g (just enough memory for 100m data) 1. If I have enough memory, can Spark 100% avoid writing to disk? 2. During shuffle, where results have to be collected from nodes, does each node write to disk and then the results are pulled from disk? If not, what is the API that is being used to pull data from nodes across the cluster? (I'm thinking what Scala or Java packages would allow you to read in-memory data from other machines?) Thanks,
Windows function examples in pyspark
Hi all, I'm trying to use some window functions (ntile and percentRank) on a DataFrame, but I don't know how to use them. Can anyone help me with this, please? There are no examples about it in the Python API documentation. Specifically, I'm trying to get quantiles of a numeric field in my DataFrame. I'm using Spark 1.4.0. Thanks a lot. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Windows-function-examples-in-pyspark-tp24144.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
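A hedged sketch for Spark 1.4 (in 1.4/1.5 window functions require a HiveContext; the table and column names below are placeholders):

from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window

sqlContext = HiveContext(sc)
df = sqlContext.table("my_table")   # placeholder source table

# Quartiles via ntile(4) and percent rank, both ordered by the numeric column.
w = Window.orderBy("value")         # "value" is a placeholder column name
df.select("value",
          F.ntile(4).over(w).alias("quartile"),
          F.percentRank().over(w).alias("pct_rank")).show()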
Re: Starting Spark SQL thrift server from within a streaming app
Hi Daniel, It is possible to create an instance of the SparkSQL Thrift server; however, it seems like this project is what you may be looking for: https://github.com/Intel-bigdata/spark-streamingsql Not 100% sure what your use case is, but you can always convert the data into a DF and then issue a query against it. If you want other systems to be able to query it, then there are numerous connectors to store data into Hive, Cassandra, HBase, ElasticSearch, etc. To create an instance of a thrift server with its own SQL context you would do something like the following: import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.HiveMetastoreTypes._ import org.apache.spark.sql.types._ import org.apache.spark.sql.hive.thriftserver._ object MyThriftServer { val sparkConf = new SparkConf() // master is passed to spark-submit, but could also be specified explicitly // .setMaster(sparkMaster) .setAppName("My ThriftServer") .set("spark.cores.max", "2") val sc = new SparkContext(sparkConf) val sparkContext = sc import sparkContext._ val sqlContext = new HiveContext(sparkContext) import sqlContext._ import sqlContext.implicits._ makeRDD((1, "hello") :: (2, "world") :: Nil).toDF.cache().registerTempTable("t") HiveThriftServer2.startWithContext(sqlContext) } Again, I'm not really clear what your use case is, but it does sound like the first link above is what you may want. -Todd On Wed, Aug 5, 2015 at 1:57 PM, Daniel Haviv daniel.ha...@veracity-group.com wrote: Hi, Is it possible to start the Spark SQL thrift server from within a streaming app so the streamed data could be queried as it goes in? Thank you. Daniel
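Once startWithContext is running, the temp table registered in that SQL context can be queried over JDBC, for example with beeline (assuming the default Thrift port of 10000): beeline -u jdbc:hive2://localhost:10000 -e "SELECT * FROM t"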
Re: trying to understand yarn-client mode
Hi DB Tsai-2, I am trying to run a singleton SparkContext in my container (a Spring Boot Tomcat container). When my application bootstraps, I create the SparkContext and keep the reference for future job submissions. I got it working with standalone Spark perfectly, but I am having trouble with the YARN modes, especially yarn-cluster mode. What is the new Client(new ClientArguments(args, sparkConf), hadoopConfig, sparkConf).run API? How do I submit subsequent requests to Spark after this? I use the Java API but I can use Scala too. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/trying-to-understand-yarn-client-mode-tp7925p24145.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Newbie question: can shuffle avoid writing and reading from disk?
Yes, finally shuffle data will be written to disk for reduce stage to pull, no matter how large you set to shuffle memory fraction. Thanks Saisai On Thu, Aug 6, 2015 at 7:50 AM, Muler mulugeta.abe...@gmail.com wrote: thanks, so if I have enough large memory (with enough spark.shuffle.memory) then shuffle (in-memory shuffle) spill doesn't happen (per node) but still shuffle data has to be ultimately written to disk so that reduce stage pulls if across network? On Wed, Aug 5, 2015 at 4:40 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi Muler, Shuffle data will be written to disk, no matter how large memory you have, large memory could alleviate shuffle spill where temporary file will be generated if memory is not enough. Yes, each node writes shuffle data to file and pulled from disk in reduce stage from network framework (default is Netty). Thanks Saisai On Thu, Aug 6, 2015 at 7:10 AM, Muler mulugeta.abe...@gmail.com wrote: Hi, Consider I'm running WordCount with 100m of data on 4 node cluster. Assuming my RAM size on each node is 200g and i'm giving my executors 100g (just enough memory for 100m data) 1. If I have enough memory, can Spark 100% avoid writing to disk? 2. During shuffle, where results have to be collected from nodes, does each node write to disk and then the results are pulled from disk? If not, what is the API that is being used to pull data from nodes across the cluster? (I'm thinking what Scala or Java packages would allow you to read in-memory data from other machines?) Thanks,
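For reference, a sketch of the Spark 1.x knobs discussed in this thread (values are illustrative; a larger fraction only reduces spilling of intermediate data, and the final shuffle files are still written for the reduce stage to fetch):

val conf = new org.apache.spark.SparkConf()
  .set("spark.shuffle.memoryFraction", "0.4") // default 0.2: memory used for shuffle before spilling
  .set("spark.shuffle.spill", "true")         // spilling stays on (default); shuffle output still goes to disk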
Very high latency to initialize a DataFrame from partitioned parquet database.
I have a parquet directory that was produced by partitioning by two keys, e.g. like this: df.write.partitionBy("a", "b").parquet("asdf") There are 35 values of a, and about 1100-1200 values of b for each value of a, for a total of over 40,000 partitions. Before running any transformations or actions on the DataFrame, just initializing it like this takes *2 minutes*: val df = sqlContext.read.parquet("asdf") Is this normal? Is this because it is doing some bookkeeping to discover all the partitions? Is it perhaps having to merge the schema from each partition? Would you expect it to get better or worse if I subpartition by another key? - Philip
Re: Pause Spark Streaming reading or sampling streaming data
Hi Dimitris, Thanks for your reply. Just wondering – are you asking about my streaming input source? I implemented a custom receiver and have been using that. Thanks. From: Dimitris Kouzis - Loukas look...@gmail.commailto:look...@gmail.com Date: Wednesday, August 5, 2015 at 5:27 PM To: Heath Guo heath...@fb.commailto:heath...@fb.com Cc: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: Pause Spark Streaming reading or sampling streaming data What driver do you use? Sounds like something you should do before the driver... On Thu, Aug 6, 2015 at 12:50 AM, Heath Guo heath...@fb.commailto:heath...@fb.com wrote: Hi, I have a question about sampling Spark Streaming data, or getting part of the data. For every minute, I only want the data read in during the first 10 seconds, and discard all data in the next 50 seconds. Is there any way to pause reading and discard data in that period? I'm doing this to sample from a stream of huge amount of data, which saves processing time in the real-time program. Thanks!
PySpark in Pycharm- unable to connect to remote server
Use Case: I want to use my laptop (using Win 7 Professional) to connect to the CentOS 6.4 master server using PyCharm. Objective: To write the code in Pycharm on the laptop and then send the job to the server which will do the processing and should then return the result back to the laptop or to any other visualizing API. The server and 3 namenodes already installed with pyspark and I have checked pyspark works in standalone mode on all four servers. Pyspark works in standalone mode on my laptop too. I use the following code but I am not able to connect to the remote server. import os import sys try: from pyspark import SparkContext from pyspark import SparkConf print (Pyspark sucess) except ImportError as e: print (Error importing Spark Modules, e) conf = SparkConf() conf.setMaster(spark://10.210.250.400:7077) conf.setAppName(First_Remote_Spark_Program) sc = SparkContext(conf=conf) print (connection succeeded with Master,conf) data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) print(distData) The stack trace of error is Pyspark sucess 15/08/01 14:08:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/08/01 14:08:24 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318) at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333) at org.apache.hadoop.util.Shell.clinit(Shell.java:326) at org.apache.hadoop.util.StringUtils.clinit(StringUtils.java:76) at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93) at org.apache.hadoop.security.Groups.init(Groups.java:77) at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255) at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232) at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718) at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703) at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605) at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162) at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162) at org.apache.spark.SparkContext.init(SparkContext.scala:301) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:214) at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79) at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) 15/08/01 14:08:25 
WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 15/08/01 14:08:26 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.210.250.400:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077 15/08/01 14:08:26 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: no further information: /10.210.250.400:7077 15/08/01 14:08:46 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.210.250.400:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077 15/08/01 14:08:46 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: no further information: /10.210.250.400:7077 15/08/01 14:09:06 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.210.250.400:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077 15/08/01 14:09:06 WARN Remoting: Tried to associate with unreachable remote address
Reliable Streaming Receiver
Hi, I am trying to replicate the Kafka Streaming Receiver for a custom version of Kafka and want to create a Reliable receiver. The current implementation uses BlockGenerator which is a private class inside Spark streaming hence I can't use that in my code. Can someone help me with some resources to tackle this issue? Thanks, Sourabh
How to connect to remote HDFS programmatically to retrieve data, analyse it and then write the data back to HDFS?
*Use Case:* To automate the process of data extraction (HDFS), data analysis (pySpark/sparkR) and saving the data back to HDFS programmatically. *Prospective solutions:* 1. Create a remote server connectivity program in an IDE like pyCharm or RStudio and use it to retrieve the data from HDFS or else 2. Create the data retrieval code in python or R and then point the IDE to the remote server using TCP. *Problem:* How to achieve either of the prospective solution 1 or 2 defined above? Do you have any better solution then these, if yes please share? *What have I tried so far?* The server and 3 namenodes already installed with pyspark and I have checked pyspark works in standalone mode on all four servers. Pyspark works in standalone mode on my laptop too. I use the following code but I am not able to connect to the remote server. import os import sys try: from pyspark import SparkContext from pyspark import SparkConf print (Pyspark sucess) except ImportError as e: print (Error importing Spark Modules, e) conf = SparkConf() conf.setMaster(spark://10.210.250.400:7077) conf.setAppName(First_Remote_Spark_Program) sc = SparkContext(conf=conf) print (connection succeeded with Master,conf) data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) print(distData) The stack trace of error is Pyspark sucess 15/08/01 14:08:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/08/01 14:08:24 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318) at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333) at org.apache.hadoop.util.Shell.clinit(Shell.java:326) at org.apache.hadoop.util.StringUtils.clinit(StringUtils.java:76) at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93) at org.apache.hadoop.security.Groups.init(Groups.java:77) at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255) at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232) at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718) at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703) at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605) at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162) at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162) at org.apache.spark.SparkContext.init(SparkContext.scala:301) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:214) at 
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79) at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) 15/08/01 14:08:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 15/08/01 14:08:26 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.210.250.400:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077 15/08/01 14:08:26 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: no further information: /10.210.250.400:7077 15/08/01 14:08:46 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.210.250.400:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077 15/08/01 14:08:46 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: no further information: /10.210.250.400:7077 15/08/01 14:09:06 WARN AppClient$ClientActor: