Re: counters in spark
Guys, do you have any thoughts on this? Thanks, Robert

On Sunday, April 12, 2015 5:35 PM, Grandl Robert rgra...@yahoo.com.INVALID wrote:
Hi guys, I was trying to figure out some counters in Spark related to the amount of CPU or memory (in some metric) used by a task/stage/job, but I could not find any. Is any such counter available? Thank you, Robert
Re: Multiple Kafka Recievers
As far as I know, createStream doesn't let you specify where receivers are run. createDirectStream in 1.3 doesn't use long-running receivers, so it is likely to give you a more even distribution of consumers across your workers.

On Mon, Apr 13, 2015 at 11:31 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote:
Hi, I have 4 workers and I am trying to have parallel Kafka receivers, 1 on each worker, with the following code.

val kafkaStreams = (0 to args.length - 1).map { i =>
  KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(i) -> 1), StorageLevel.MEMORY_ONLY).map(_._2)
}
val unifiedStream = ssc.union(kafkaStreams)
unifiedStream.print()

But I am getting receivers mostly on two workers (two on each), sometimes on three workers. What's wrong with the code?
Regards, Laeeq
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Configuring amount of disk space available to spark executors in mesos?
Hey Jonathan, Are you referring to disk space used for storing persisted RDDs? For that, Spark does not bound the amount of data persisted to disk. It's a similar story to how Spark's shuffle disk output works (and Hadoop and other frameworks make this assumption for their shuffle data as well, AFAIK). We could (in theory) add a storage level that bounds the amount of data persisted to disk and forces re-computation if the partition did not fit. I'd be interested to hear more about a workload where that's relevant, though, before going that route. Maybe if people are using SSDs that would make sense. - Patrick

On Mon, Apr 13, 2015 at 8:19 AM, Jonathan Coveney jcove...@gmail.com wrote:
I'm surprised that I haven't been able to find this via Google, but I haven't... What is the setting that requests some amount of disk space for the executors? Maybe I'm misunderstanding how this is configured... Thanks for any help!
Re: feature scaling in GeneralizedLinearAlgorithm.scala
Correct. Prediction doesn't touch that code path. -Xiangrui

On Mon, Apr 13, 2015 at 9:58 AM, Jianguo Li flyingfromch...@gmail.com wrote:
Hi, In GeneralizedLinearAlgorithm, which Logistic Regression relies on, it says "if useFeatureScaling is enabled, we will standardize the training features ... and train the model in the scaled space. Then we transform the coefficients from the scaled space to the original space". My understanding then is that we do not need to scale the test data, since the coefficients are already in the original space. Is this correct? Thanks, Jianguo
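For what it's worth, the transform described in the quoted comment can be checked numerically. With x_scaled[j] = (x[j] - mu[j]) / sigma[j], a model (w_s, b_s) fit in the scaled space maps back to the original space as w[j] = w_s[j] / sigma[j] and b = b_s - sum_j w_s[j] * mu[j] / sigma[j], so raw test data needs no scaling. A pure-Python sketch with made-up numbers (not Spark's code):

```python
mu = [2.0, 10.0]          # feature means used for standardization (made up)
sigma = [0.5, 4.0]        # feature standard deviations (made up)
w_s, b_s = [1.5, -0.8], 0.3   # hypothetical model fit in the scaled space

# Transform the coefficients back to the original feature space.
w = [w_s[j] / sigma[j] for j in range(2)]
b = b_s - sum(w_s[j] * mu[j] / sigma[j] for j in range(2))

x = [3.0, 6.0]            # a raw (unscaled) test point
x_scaled = [(x[j] - mu[j]) / sigma[j] for j in range(2)]

# The margin computed on scaled data with scaled coefficients equals the
# margin computed on raw data with the transformed coefficients.
margin_scaled = sum(w_s[j] * x_scaled[j] for j in range(2)) + b_s
margin_orig = sum(w[j] * x[j] for j in range(2)) + b
assert abs(margin_scaled - margin_orig) < 1e-9
```

This is exactly why test data can be fed in unscaled once the coefficients have been mapped back.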
Re: Configuring amount of disk space available to spark executors in mesos?
Nothing so complicated... we are seeing Mesos kill off our executors immediately. When I reroute logging to an NFS directory we have available, the executors survive fine. As such, I am wondering if the Spark workers are getting killed by Mesos for exceeding their disk quota (which at the moment is 0). This could be a red herring, however.

2015-04-13 15:41 GMT-04:00 Patrick Wendell pwend...@gmail.com:
Hey Jonathan, Are you referring to disk space used for storing persisted RDDs? For that, Spark does not bound the amount of data persisted to disk. It's a similar story to how Spark's shuffle disk output works (and Hadoop and other frameworks make this assumption for their shuffle data as well, AFAIK). We could (in theory) add a storage level that bounds the amount of data persisted to disk and forces re-computation if the partition did not fit. I'd be interested to hear more about a workload where that's relevant, though, before going that route. Maybe if people are using SSDs that would make sense. - Patrick

On Mon, Apr 13, 2015 at 8:19 AM, Jonathan Coveney jcove...@gmail.com wrote:
I'm surprised that I haven't been able to find this via Google, but I haven't... What is the setting that requests some amount of disk space for the executors? Maybe I'm misunderstanding how this is configured... Thanks for any help!
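The hypothetical storage level Patrick describes (bound disk usage, recompute partitions that don't fit) can be sketched in a few lines of pure Python. This is an illustration of the idea only, not a Spark API; all names are made up:

```python
class BoundedDiskCache:
    """Toy model of a disk-backed cache with a size bound.

    Partitions that do not fit are simply not stored; a later read falls
    back to recomputing them, mimicking the storage level discussed above.
    """

    def __init__(self, capacity_bytes, compute_fn):
        self.capacity = capacity_bytes
        self.used = 0
        self.store = {}               # partition_id -> bytes
        self.compute_fn = compute_fn  # how to rebuild a missing partition
        self.recomputations = 0

    def put(self, pid, data):
        if self.used + len(data) <= self.capacity:
            self.store[pid] = data
            self.used += len(data)
            return True
        return False                  # did not fit; will be recomputed on read

    def get(self, pid):
        if pid in self.store:
            return self.store[pid]
        self.recomputations += 1
        return self.compute_fn(pid)

cache = BoundedDiskCache(capacity_bytes=10, compute_fn=lambda pid: b"x" * pid)
cache.put(3, b"xxx")         # fits (3 <= 10)
cache.put(9, b"x" * 9)       # does not fit (3 + 9 > 10)
assert cache.get(3) == b"xxx"
assert cache.get(9) == b"x" * 9   # transparently recomputed
assert cache.recomputations == 1
```

The trade-off is visible even in the toy: a bounded store never exceeds its quota, at the price of recomputation for evicted or unstored partitions.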
Re: Opening many Parquet files = slow
Hi guys, Does anyone know how to stop Spark from opening all Parquet files before starting a job? This is quite a showstopper for me, since I have 5000 Parquet files on S3. Recap of what I tried:

1. Disable schema merging with:
sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "s3://path/to/folder"))
This opens most files in the folder (17 out of 21 in my small example). For 5000 files on S3, sqlContext.load() takes 30 minutes to complete.

2. Use the old API with:
sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
Now sqlContext.parquetFile() only opens a few files and prints the schema: so far so good! However, as soon as I run e.g. a count() on the dataframe, Spark still opens all files _before_ starting a job/stage. Effectively this moves the delay from load() to count() (or any other action, I presume).

3. Run Spark 1.3.1-rc2. sqlContext.load() took about 30 minutes for 5000 Parquet files on S3, the same as 1.3.0.

Any help would be greatly appreciated! Thanks a lot.
Eric

On 10 Apr 2015, at 16:46, Eric Eijkelenboom eric.eijkelenb...@gmail.com wrote:
Hi Ted, Ah, I guess the term 'source' confused me :) Doing:

sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "path to a single day of logs"))

for 1 directory with 21 files, Spark opens 17 files:

15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for reading
15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for reading at position '261573524'
15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for reading
15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for reading
15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for reading
15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for reading at position '259256807'
15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for reading at position '260002042'
15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for reading at position '260875275'
etc.
I can't seem to pass a comma-separated list of directories to load(), so in order to load multiple days of logs, I have to point to the root folder and depend on auto-partition discovery (unless there's a smarter way). Doing:

sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "path to root log dir"))

starts opening what seems like all files (I killed the process after a couple of minutes). Thanks for helping out. Eric
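One workaround sketch for loading several days without pointing at the root: enumerate the per-day partition directories yourself and load/union them one at a time. Generating the path list is plain string work; note the `yyyy`/`mm`/`dd` column names below are assumptions based on the log lines above, which show the year column name garbled:

```python
from datetime import date, timedelta

def day_paths(root, start, end):
    """Build one partition path per day, e.g. for loading and unioning
    per-day DataFrames instead of relying on auto-partition discovery
    over the whole root folder.

    The yyyy/mm/dd partition layout is an assumption mirroring the
    directory structure in the logs above.
    """
    paths = []
    d = start
    while d <= end:
        paths.append("%s/yyyy=%d/mm=%d/dd=%d" % (root, d.year, d.month, d.day))
        d += timedelta(days=1)
    return paths

paths = day_paths("s3n://mylogs/logs", date(2015, 2, 1), date(2015, 2, 3))
assert paths == [
    "s3n://mylogs/logs/yyyy=2015/mm=2/dd=1",
    "s3n://mylogs/logs/yyyy=2015/mm=2/dd=2",
    "s3n://mylogs/logs/yyyy=2015/mm=2/dd=3",
]
```

Each path can then be passed to load() individually, trading one huge directory scan for a handful of small ones.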
Re: Spark TeraSort source request
Tom, According to Github's public activity log, Reynold Xin (in CC) deleted his sort-benchmark branch yesterday. I didn't have a local copy aside from the Daytona Partitioner (attached). Reynold, is it possible to reinstate your branch? -Ewan

On 13/04/15 16:41, Tom Hubregtsen wrote:
Thank you for your response Ewan. I quickly looked yesterday and it was there, but today at work I tried to open it again to start working on it, and it appears to have been removed. Is this correct? Thanks, Tom

On 12 April 2015 at 06:58, Ewan Higgs ewan.hi...@ugent.be wrote:
Hi all. The code is linked from my repo: https://github.com/ehiggs/spark-terasort "This is an example Spark program for running TeraSort benchmarks. It is based on work from Reynold Xin's branch (https://github.com/rxin/spark/tree/terasort), but it is not the same TeraSort program that currently holds the record (http://sortbenchmark.org/). That program is here: https://github.com/rxin/spark/tree/sort-benchmark/core/src/main/scala/org/apache/spark/sort" I've been working on other projects at the moment, so I haven't returned to the spark-terasort stuff. If you have any pull requests, I would be very grateful. Yours, Ewan

On 08/04/15 03:26, Pramod Biligiri wrote:
+1. I would love to have the code for this as well. Pramod

On Fri, Apr 3, 2015 at 12:47 PM, Tom thubregt...@gmail.com wrote:
Hi all, As we all know, Spark has set the record for sorting data, as published at https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html. Here at our group, we would love to verify these results and compare machines using this benchmark. We've spent quite some time trying to find the TeraSort source code that was used, but cannot find it anywhere. We did find two candidates: a version posted by Reynold [1], the poster of the message above.
This version is stuck at "// TODO: Add partition-local (external) sorting using TeraSortRecordOrdering", only generating data. Here, Ewan noticed that it didn't appear to be similar to Hadoop TeraSort [2]. After this he created a version of his own [3]. With this version, we noticed problems with TeraValidate with datasets above ~10G (as mentioned by others at [4]). When examining the raw input and output files, it actually appears that the input data is sorted and the output data unsorted in both cases. Because of this, we believe we have not yet found the actual source code used. I've tried to search the Spark user forum archives, seeing requests from people indicating a demand, but did not succeed in finding the actual source code. My question: could you guys please make the source code of the TeraSort program that was used, preferably with settings, available? If not, what are the reasons that this seems to be withheld? Thanks for any help, Tom Hubregtsen

[1] https://github.com/rxin/spark/commit/adcae69145905162fa3b6932f70be2c932f95f87
[2] http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/%3c5462092c.1060...@ugent.be%3E
[3] https://github.com/ehiggs/spark-terasort
[4] http://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3CCAPszQwgap4o1inZkTwcwV=7scwoqtr5yxfnsqo5p2kgp1bn...@mail.gmail.com%3E

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-TeraSort-source-request-tp22371.html Sent from the Apache Spark User List mailing list archive at Nabble.com.

[Attachment: Daytona partitioner source (Apache License header), truncated]
Re: Spark Cluster: RECEIVED SIGNAL 15: SIGTERM
That's why I think it's the OOM killer. There are several cases of memory overuse / errors:

1 - The application tries to allocate more than the heap limit and GC cannot free more memory => "OutOfMemoryError: Java heap space" exception from the JVM.

2 - The JVM is configured with a max heap size larger than the available memory. At some point the application needs to allocate memory in the JVM, the JVM tries to extend its heap and allocate real memory (or maybe the OS is configured with virtual memory overcommit), but fails => "Kill process or sacrifice child" (or others, depending on various factors: https://plumbr.eu/outofmemoryerror).

3 - The JVM has allocated its memory from the beginning and it has been served, but other processes start starving from memory shortage; the pressure on memory grows beyond the threshold configured in the OOM killer, and boom, the Java process is selected for sacrifice because it is the main culprit of memory consumption.

Guillaume

Linux OOM throws SIGTERM, but if I remember correctly the JVM handles heap memory limits differently and throws OutOfMemoryError and eventually sends SIGINT. Not sure what happened, but the worker simply received a SIGTERM signal, so perhaps the daemon was terminated by someone or a parent process. Just my guess.
Tim

On Mon, Apr 13, 2015 at 2:28 AM, Guillaume Pitel guillaume.pi...@exensa.com wrote:
Very likely to be this: http://www.linuxdevcenter.com/pub/a/linux/2006/11/30/linux-out-of-memory.html?page=2 Your worker ran out of memory => maybe you're asking for too much memory for the JVM, or something else is running on the worker.

Guillaume

Any idea what this means, many thanks

== logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1 ==
15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910 with 4 cores, 6.6 GB RAM
15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
15/04/13 07:07:22 INFO Worker: Spark home: /remote/users//work/tools/spark-1.3.0-bin-hadoop2.4
15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 07:07:22 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
15/04/13 07:07:22 INFO Utils: Successfully started service 'WorkerUI' on port 8081.
15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at http://09:8081
15/04/13 07:07:22 INFO Worker: Connecting to master akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
15/04/13 07:07:22 INFO Worker: Successfully registered with master spark://08:7077
15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM

--
eXenSa
Guillaume PITEL, Président
+33(0)626 222 431
eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705
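If the OOM killer is the suspect, the kernel log is the place to confirm it. A small pure-Python parser for the standard "Killed process" message; the sample log text below is made up for illustration:

```python
import re

def oom_kills(kernel_log):
    """Return (pid, name) for each process the kernel OOM killer terminated,
    based on the standard 'Killed process <pid> (<name>)' message format."""
    pattern = re.compile(r"Killed process (\d+) \(([^)]+)\)")
    return [(int(m.group(1)), m.group(2)) for m in pattern.finditer(kernel_log)]

# Hypothetical dmesg excerpt (hostname, pid, and sizes invented):
sample = (
    "Apr 13 08:35:07 node9 kernel: Out of memory: Kill process 4811 (java) "
    "score 912 or sacrifice child\n"
    "Apr 13 08:35:07 node9 kernel: Killed process 4811 (java) "
    "total-vm:14336000kB\n"
)
assert oom_kills(sample) == [(4811, "java")]
```

If such a line appears at the same timestamp as the worker's SIGTERM, case 3 above is the likely explanation.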
Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete
I'm not 100% sure of Spark's implementation, but in the MR frameworks it would have a much larger shuffle write size, because that node is dealing with a lot more data and as a result has a lot more to shuffle.

2015-04-13 13:20 GMT-04:00 java8964 java8...@hotmail.com:
If it is really due to data skew, will the hanging task have a much bigger Shuffle Write Size in this case? Here, the shuffle write size for that task is 0, and the rest of this task's IO is not much larger than that of the fast-finishing tasks. Is that normal? I am also interested in this case: from the statistics on the UI, how does it indicate that the task could have skewed data? Yong

--
Date: Mon, 13 Apr 2015 12:58:12 -0400 Subject: Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete From: jcove...@gmail.com To: deepuj...@gmail.com CC: user@spark.apache.org

I can promise you that this is also a problem in the Pig world :) Not sure why it's not a problem for this data set, though... Are you sure that the two are running the exact same code? You should inspect your source data. Make a histogram for each and see what the data distribution looks like. If there is a value or bucket with a disproportionate set of values, you know you have an issue.

2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:
You mean there is a tuple in either RDD that has itemID = 0 or null? And what is "catch all"? Does that imply it is a good idea to run a filter on each RDD first? We do not do this using Pig on M/R. Is it required in the Spark world?

On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney jcove...@gmail.com wrote:
My guess would be data skew. Do you know if there is some item id that is a catch-all? Can it be null? Item id 0?
lots of data sets have this sort of value and it always kills joins

2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:
Code:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents).map {
    case (itemId, (listing, viDetail)) =>
      val viSummary = new VISummary
      viSummary.leafCategoryId = listing.getLeafCategId().toInt
      viSummary.itemSiteId = listing.getItemSiteId().toInt
      viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
      viSummary.sellerCountryId = listing.getSlrCntryId().toInt
      viSummary.buyerSegment = 0
      viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
      val sellerId = listing.getSlrId.toLong
      (sellerId, (viDetail, viSummary, itemId))
  }

Running tasks (from the UI; all launched 2015/04/13 06:43:53, attempt 0, no errors; the long-running task 216 has read 3.0 GB / ~57M records but written nothing, while its finished peers each read ~2.2 GB / ~1.7M records):

Index  ID   Status   Locality       Executor / Host                              Duration  GC Time  Shuffle Read       Write Time  Shuffle Write  Spill (Mem)  Spill (Disk)
0      216  RUNNING  PROCESS_LOCAL  181 / phxaishdc9dn0474.phx.ebay.com          1.7 h     13 min   3.0 GB / 56964921  -           0.0 B / 0      21.2 GB      1902.6 MB
2      218  SUCCESS  PROCESS_LOCAL  582 / phxaishdc9dn0235.phx.ebay.com          15 min    31 s     2.2 GB / 1666851   0.1 s       3.0 MB / 2062  54.8 GB      1924.5 MB
1      217  SUCCESS  PROCESS_LOCAL  202 / phxdpehdc9dn2683.stratus.phx.ebay.com  19 min    1.3 min  2.2 GB / 1687086   75 ms       3.9 MB / 2692  33.7 GB      1960.4 MB
4      220  SUCCESS  PROCESS_LOCAL  218 / phxaishdc9dn0855.phx.ebay.com          15 min    56 s     2.2 GB / 1675654   40 ms       3.3 MB / 2260  26.2 GB      1928.4 MB

Command:

./bin/spark-submit -v --master yarn-cluster \
  --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar \
  --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar \
  --num-executors 3000 --driver-memory 12g --driver-java-options -XX:MaxPermSize=6G \
  --executor-memory 12g --executor-cores 1 --queue hdmi-express \
  --class com.ebay.ep.poc.spark.reporting.SparkApp \
  /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar \
  startDate=2015-04-6 endDate=2015-04-7 input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem output=/user/dvasthimal/epdatasets/viewItem buffersize=128 maxbuffersize=1068 maxResultSize=2G

What do I do? I killed the job twice and it's stuck again. Where is it stuck? -- Deepak
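Jonathan's histogram suggestion can be prototyped on a sample of join keys in pure Python before re-running the job (in Spark itself you would aggregate counts per key rather than collecting raw keys to the driver):

```python
from collections import Counter

def skew_report(keys, top=3):
    """Histogram the join keys and surface the values that dominate the
    distribution; the usual culprits are null/0 'catch-all' ids.

    Returns (key, count, fraction_of_total) for the `top` heaviest keys.
    """
    counts = Counter(keys)
    total = sum(counts.values())
    return [(k, c, c / total) for k, c in counts.most_common(top)]

# A made-up sample where item id 0 is a catch-all value:
keys = [0] * 90 + [101, 102, 103, 104, 105] * 2
report = skew_report(keys, top=1)
assert report[0][0] == 0     # the most common key is the catch-all
assert report[0][2] == 0.9   # 90% of rows hash to one reducer/task
```

A key carrying a large fraction of the rows lands on a single task after the hash partitioning, which is exactly the one-task-runs-forever pattern in the UI table above.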
Re: Multipart upload to S3 fails with Bad Digest Exceptions
I think I found where the problem comes from. I am writing LZO-compressed Thrift records using elephant-bird; my guess is that perhaps one side is computing the checksum based on the uncompressed data and the other on the compressed data, thus getting a mismatch. When writing the data as strings using a plain TextOutputFormat, the multipart upload works. This confirms that the LZO compression is probably the problem... but it is not a solution :(

2015-04-13 18:46 GMT+02:00 Eugen Cepoi cepoi.eu...@gmail.com:
Hi, I am not sure my problem is relevant to Spark, but perhaps someone else has had the same error. When I try to write files that need multipart upload to S3 from a job on EMR, I always get this error: com.amazonaws.services.s3.model.AmazonS3Exception: The Content-MD5 you specified did not match what we received. If I disable multipart upload via fs.s3n.multipart.uploads.enabled (or output smaller files that don't require multipart upload), then everything works fine. I've seen an old thread on the ML where someone has the same error, but in my case I don't have any other errors on the worker nodes. I am using Spark 1.2.1 and Hadoop 2.4.0. Thanks, Eugen
Re: Spark support for Hadoop Formats (Avro)
The problem is likely that the underlying Avro library is reusing objects for speed. You probably need to explicitly copy the values out of the reused record before the collect.

On Sat, Apr 11, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
The read seems to be successful, as the values for each field in the record are different and correct. The problem is when I collect it or trigger the next processing (join with another table); each of these probably triggers serialization, and that's when all the fields in the record get the value of the first field (or element).

On Sun, Apr 12, 2015 at 9:14 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
We have very large processing being done on Hadoop (400 M/R jobs, 1 day duration, 100s of TB of data, 100s of joins). We are exploring Spark as an alternative to speed up our processing time. We use Scala + Scoobi today, and Avro is the data format across steps. I observed a strange behavior: I read sample data (Avro format, 10 records), collect it and print each record. All the data for each element within a record is wiped out, and I only see the data of the first element being copied to everything. Is this a problem with Spark? Or with using Avro? Example: I took that RDD, ran through it and printed 4 elements from it, and they all printed correctly.
val x = viEvents.map {
  case (itemId, event) =>
    println(event.get("guid"), itemId, event.get("itemId"), event.get("siteId"))
    (itemId, event)
}

The above code prints:
(27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
(27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
(27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
(340da8c014a46272c0c8c830011c3bf0,221733319941,221733319941,77)
(340da8c014a46272c0c8c830011c3bf0,181704048554,181704048554,77)
(340da8c014a46272c0c8c830011c3bf0,231524481696,231524481696,77)
(340da8c014a46272c0c8c830011c3bf0,271830464992,271830464992,77)
(393938d71480a2aaf8e440d1fff709f4,141586046141,141586046141,0)
(3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)
(3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)

viEvents.collect.foreach(a => println(a._2.get("guid"), a._1, a._2.get("itemId"), a._2.get("siteId")))

Now, I collected it; this might have led to serialization of the RDD. When I print the same elements, I only get guid values for everything. Has this got something to do with serialization?

(27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
(27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
(27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
(340da8c014a46272c0c8c830011c3bf0,221733319941,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
(340da8c014a46272c0c8c830011c3bf0,181704048554,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
(340da8c014a46272c0c8c830011c3bf0,231524481696,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
(340da8c014a46272c0c8c830011c3bf0,271830464992,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
(393938d71480a2aaf8e440d1fff709f4,141586046141,393938d71480a2aaf8e440d1fff709f4,393938d71480a2aaf8e440d1fff709f4)
(3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)
(3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)

The RDD is of type org.apache.spark.rdd.RDD[(Long, com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord)]. At the time of context creation I did this:

val conf = new SparkConf()
  .setAppName("detail")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .set("spark.yarn.maxAppAttempts", "1")
  .registerKryoClasses(Array(
    classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum],
    classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord],
    classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.InputRecord],
    classOf[com.ebay.ep.poc.spark.reporting.process.model.SessionRecord],
    classOf[com.ebay.ep.poc.spark.reporting.process.model.DataRecord],
    classOf[com.ebay.ep.poc.spark.reporting.process.model.ExperimentationRecord]))

The class hierarchy is DetailInputRecord extends InputRecord extends SessionRecord extends ExperimentationRecord extends org.apache.avro.generic.GenericRecord.Record(schema: Schema). Please suggest. -- Deepak
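The object reuse behind this can be reproduced without Avro or Spark: an iterator that hands out the same mutable record each time looks fine while streaming, but materializing it (as collect does) leaves every element sharing the last record's values unless you copy first. A minimal pure-Python sketch of the failure mode:

```python
def reader_with_reuse(rows):
    """Mimics a record reader that reuses one mutable object per read,
    as the Avro datum reader does for speed."""
    record = {}
    for row in rows:
        record.clear()
        record.update(row)
        yield record            # the SAME dict every time!

rows = [{"guid": "a", "itemId": 1}, {"guid": "b", "itemId": 2}]

# Materializing without copying: every element is one shared object,
# holding whatever the last read wrote into it.
collected = list(reader_with_reuse(rows))
assert collected[0] is collected[1]
assert collected[0]["guid"] == "b"

# Explicitly copying each record before materializing fixes it.
copied = [dict(r) for r in reader_with_reuse(rows)]
assert [r["guid"] for r in copied] == ["a", "b"]
```

In the Spark/Avro case the equivalent of `dict(r)` is copying the reused GenericRecord (or extracting the needed fields into an immutable value) inside the map, before any collect or shuffle.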
Re: Need some guidance
That appears to work, with a few changes to get the types correct:

input.distinct().combineByKey((s: String) => 1, (agg: Int, s: String) => agg + 1, (agg1: Int, agg2: Int) => agg1 + agg2)

dean

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Mon, Apr 13, 2015 at 3:24 PM, Victor Tso-Guillen v...@paxata.com wrote:
How about this?

input.distinct().combineByKey((v: V) => 1, (agg: Int, x: Int) => agg + 1, (agg1: Int, agg2: Int) => agg1 + agg2).collect()

On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler deanwamp...@gmail.com wrote:
The problem with using collect is that it will fail for large data sets, as you'll attempt to copy the entire RDD to the memory of your driver program. The following works (Scala syntax, but similar to Python):

scala> val i1 = input.distinct.groupByKey
scala> i1.foreach(println)
(1,CompactBuffer(beta, alpha, foo))
(3,CompactBuffer(foo))
(2,CompactBuffer(alpha, bar))
scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
scala> i2.foreach(println)
(1,3)
(3,1)
(2,2)

The i2 line passes a function that takes a tuple argument, then constructs a new output tuple with the first element and the size of the second (each CompactBuffer). An alternative pattern-match syntax would be:

scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }

This should work as long as none of the CompactBuffers are too large, which could happen for extremely large data sets.

dean

On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw marco.s...@gmail.com wrote:
**Learning the ropes** I'm trying to grasp the concept of using the pipeline in pySpark...
Simplified example:
list = [(1,"alpha"), (1,"beta"), (1,"foo"), (1,"alpha"), (2,"alpha"), (2,"alpha"), (2,"bar"), (3,"foo")]
Desired outcome: [(1,3), (2,2), (3,1)]
Basically, for each key I want the number of unique values. I've tried different approaches, but am I really using Spark effectively? I wondered if I should do something like:
input = sc.parallelize(list)
input.groupByKey().collect()
Then I wondered if I could do something like a foreach over each key value, and then map the actual values and reduce them. Pseudo-code:
input.groupbykey()
.keys
.foreach(_.values
.map(lambda x: (x,1))
.reducebykey(lambda a,b: a+b)
.count()
)
I was somehow hoping that the key would get the current value of count, and thus be the count of the unique keys, which is exactly what I think I'm looking for. Am I way off base on how I could accomplish this? Marco
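For reference, the "distinct values per key" logic this thread converges on can be written as a small pure-Python function against Marco's example data (a sketch of the semantics only, not pySpark code):

```python
def distinct_counts(pairs):
    """Equivalent of input.distinct().combineByKey(...) above: for each
    key, count the number of distinct values."""
    seen = {}
    for k, v in pairs:
        seen.setdefault(k, set()).add(v)
    return {k: len(vs) for k, vs in seen.items()}

data = [(1, "alpha"), (1, "beta"), (1, "foo"), (1, "alpha"),
        (2, "alpha"), (2, "alpha"), (2, "bar"), (3, "foo")]
assert distinct_counts(data) == {1: 3, 2: 2, 3: 1}
```

The distinct()-first formulation in the thread achieves the same thing while avoiding per-key sets: once duplicates are removed, a plain count per key is the distinct count.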
Rack locality
I want to use the rack locality feature of Apache Spark in my application. Is YARN the only resource manager which supports it as of now? After going through the source code, I found that the default implementation of the getRackForHost() method returns None in TaskSchedulerImpl, which (I suppose) would be used by standalone mode. On the other hand, it is overridden in YarnScheduler.scala to fetch the rack information by invoking the RackResolver API of Hadoop, which would be used when run on YARN.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Rack-locality-tp22483.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark SQL Parquet as External table - 1.3.x HiveMetastoreType now hidden
Here is the stack trace. The first part shows the log when the session is started in Tableau. It is using the "Initial SQL" option on the data connection to create the TEMPORARY table myNodeTable.

Ah, I see. Thanks for providing the error. The problem here is that temporary tables do not exist in a database; they are visible no matter what the current database is. Tableau is asking for default.temporaryTable, which does not exist.
Re: How to load avro file into spark not on Hadoop in pyspark?
Note that I am running pyspark in local mode (I do not have a hadoop cluster connected) as I want to be able to work with the avro file outside of hadoop. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-load-avro-file-into-spark-not-on-Hadoop-in-pyspark-tp22480p22481.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Rack locality
Hi Riya, As far as I know, that is correct, unless Mesos fine-grained mode handles this in some mysterious way. -Sandy On Mon, Apr 13, 2015 at 2:09 PM, rcharaya riya.char...@gmail.com wrote: I want to use the Rack locality feature of Apache Spark in my application. Is YARN the only resource manager which supports it as of now? ...
Re: Task result in Spark Worker Node
On the worker side, it all happens in Executor. The task result is computed here: https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210 then it's serialized along with some other goodies, and finally sent back to the driver here: https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255 What happens on the driver is quite a bit more complicated, and involves a number of spots in the code, but at least to get you started, the results are received here: https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L328 though perhaps a more interesting spot is where they are handled in DAGScheduler#handleTaskCompletion: https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1001 Also, I think I know what you mean, but just to make sure: I wouldn't say the results from the worker are "broadcast" back to the driver. (a) In Spark, "broadcast" tends to refer to a particular API for sharing immutable data from the driver to the workers (only one direction), and (b) it doesn't really fit a more general meaning of broadcast either, since the results are sent only to the driver, not to all nodes. On Sun, Mar 29, 2015 at 8:34 PM, raggy raghav0110...@gmail.com wrote: I am a PhD student working on a research project related to Apache Spark. I am trying to modify some of the Spark source code such that instead of sending the final result RDD from the worker nodes to a master node, I want to send the final result RDDs to some different node. In order to do this, I have been trying to identify at which point the Spark worker nodes broadcast the results of a job back to the master.
So far, I understand that in Spark, the master serializes the RDD and the functions to be applied on them and sends them over to the worker nodes. In the context of reduce, it serializes the RDD partition and the reduce function and sends them to the worker nodes. However, my understanding of how things happen at the worker node is very limited and I would appreciate it if someone could help me identify where the process of broadcasting the results of local worker computations back to the master node takes place. This is some of the limited knowledge that I have about the worker nodes: Each job gets divided into smaller sets of tasks called stages. Each stage is either a Shuffle Map Stage or a Result Stage. In a Shuffle Map Stage, the task results are used as input for another stage. The Result Stage uses the RDD to compute the action that initiated the job, so this stage executes the last task for the job on the worker node. I would assume after this is done, it gets the result and broadcasts it to the driver application (the master). In ResultTask.scala (spark-core, src/main/scala, org.apache.spark.scheduler) it states "A task that sends back the output to the driver application." However, I don't see when or where this happens in the source code. I would very much appreciate it if someone could help me identify where this happens in the Spark source code. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Task-result-in-Spark-Worker-Node-tp22283.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
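The compute-serialize-return path described in the reply above, reduced to an abstract sketch (plain Python, with pickle standing in for Spark's serializer; this is an illustration of the flow, not actual Spark code):

```python
import pickle

def run_task(func, partition):
    """Executor side: compute the task result over its partition,
    then serialize it before it is sent back to the driver."""
    result = func(partition)
    return pickle.dumps(result)

def receive_result(payload):
    """Driver side: deserialize the task result received from a worker."""
    return pickle.loads(payload)

# One "task" computing a partial sum over its partition of the data:
payload = run_task(sum, [1, 2, 3])
print(receive_result(payload))  # 6
```

The key point from the reply holds here too: the serialized result travels only from the one executor back to the driver, so "send back" is more accurate than "broadcast".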
Re: Registering classes with KryoSerializer
Those funny class names come from Scala's specialization -- it's compiling a different version of OpenHashMap for each primitive you stick in the type parameter. Here's a super simple example:

$ more Foo.scala
class Foo[@specialized X]
$ scalac Foo.scala
$ ls Foo*.class
Foo$mcB$sp.class Foo$mcC$sp.class Foo$mcD$sp.class Foo$mcF$sp.class Foo$mcI$sp.class
Foo$mcJ$sp.class Foo$mcS$sp.class Foo$mcV$sp.class Foo$mcZ$sp.class Foo.class

Sadly, I'm not sure of a foolproof way of getting all those specialized versions registered except for registering with these strange names. Here's an example of how it's done by chill for Tuples (which is what Spark is relying on for its own registration of tuples): https://github.com/twitter/chill/blob/6d03f6976f33f6e2e16b8e254fead1625720c281/chill-scala/src/main/scala/com/twitter/chill/TupleSerializers.scala#L861 On Mon, Mar 30, 2015 at 3:59 PM, Arun Lists lists.a...@gmail.com wrote: I am trying to register classes with KryoSerializer. I get the following error message. How do I find out what class is being referred to by OpenHashMap$mcI$sp?

com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: com.comp.common.base.OpenHashMap$mcI$sp
Note: To register this class use: kryo.register(com.dtex.common.base.OpenHashMap$mcI$sp.class);

I have registered other classes with it by using:

sparkConf.registerKryoClasses(Array(classOf[MyClass]))

Thanks, arun
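As a rough illustration of the name mangling behind the Foo*.class listing above, a tiny helper that generates the specialized class names one would have to register by name. This is a hypothetical utility sketched for explanation, not a Kryo or chill API:

```python
# Scala's @specialized suffix codes, one per primitive type
# (B=Byte, C=Char, D=Double, F=Float, I=Int, J=Long, S=Short, V=Unit, Z=Boolean).
SPECIALIZED_CODES = "BCDFIJSVZ"

def specialized_class_names(base):
    """Return the generic class name plus every specialized variant
    scalac emits for a class with one @specialized type parameter."""
    return [base] + [f"{base}$mc{c}$sp" for c in SPECIALIZED_CODES]

print(specialized_class_names("Foo")[:3])  # ['Foo', 'Foo$mcB$sp', 'Foo$mcC$sp']
```

In Scala one could feed names produced this way through Class.forName and register each resulting class, which is essentially what the chill tuple-serializer code linked above does by hand.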
Spark Worker IP Error
I tried to start the Spark Worker using the registered IP but this error occurred:

15/04/13 21:35:59 INFO Worker: Registered signal handlers for [TERM, HUP, INT]
Exception in thread "main" java.net.UnknownHostException: 10.240.92.75/: Name or service not known
    at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
    at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:901)
    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1293)
    at java.net.InetAddress.getAllByName0(InetAddress.java:1246)
    at java.net.InetAddress.getAllByName(InetAddress.java:1162)
    at java.net.InetAddress.getAllByName(InetAddress.java:1098)
    at java.net.InetAddress.getByName(InetAddress.java:1048)
    at org.apache.spark.util.Utils$.getAddressHostName(Utils.scala:819)
    at org.apache.spark.util.Utils$.localIpAddressHostname$lzycompute(Utils.scala:763)
    at org.apache.spark.util.Utils$.localIpAddressHostname(Utils.scala:763)
    at org.apache.spark.util.Utils$$anonfun$localHostName$1.apply(Utils.scala:815)
    at org.apache.spark.util.Utils$$anonfun$localHostName$1.apply(Utils.scala:815)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.util.Utils$.localHostName(Utils.scala:815)
    at org.apache.spark.deploy.worker.WorkerArguments.<init>(WorkerArguments.scala:29)
    at org.apache.spark.deploy.worker.Worker$.main(Worker.scala:528)
    at org.apache.spark.deploy.worker.Worker.main(Worker.scala)

My concern is that I have deployed the Spark Master using the same process (assigning the corresponding IP) and everything works fine. I have also looked at the /etc/hosts file, and 10.240.92.75 is up and working fine. Moreover, I have changed ipv6 to ipv4 using _JAVA_OPTIONS: -Djava.net.preferIPv4Stack=true, but the error kept coming up. I am a little bit confused; I would be grateful for any suggestions/help.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Worker-IP-Error-tp22484.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Registering classes with KryoSerializer
Hi, I am trying to register classes with KryoSerializer. This has worked with other programs. Usually the error messages are helpful in indicating which classes need to be registered. But with my current program, I get the following cryptic error message:

Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1
Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);

How do I find out which class needs to be registered? I looked at my program and registered all the classes used in RDDs. But clearly more classes remain to be registered, if only I can figure out which ones. Thanks for your help! arun
Re: How to get rdd count() without double evaluation of the RDD?
yes, it sounds like a good use of an accumulator to me:

val counts = sc.accumulator(0L)
rdd.map { x =>
  counts += 1
  x
}.saveAsObjectFile(file2)

On Mon, Mar 30, 2015 at 12:08 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Sean, yes I know that I can use persist() to persist to disk, but it is still a big extra cost to persist a huge RDD to disk. I hope that I can do one pass to get the count as well as rdd.saveAsObjectFile(file2), but I don't know how. Maybe use an accumulator to count the total? Ningjun From: Mark Hamstra [mailto:m...@clearstorydata.com] Sent: Thursday, March 26, 2015 12:37 PM To: Sean Owen Cc: Wang, Ningjun (LNG-NPV); user@spark.apache.org Subject: Re: How to get rdd count() without double evaluation of the RDD? You can also always take the more extreme approach of using SparkContext#runJob (or submitJob) to write a custom Action that does what you want in one pass. Usually that's not worth the extra effort. On Thu, Mar 26, 2015 at 9:27 AM, Sean Owen so...@cloudera.com wrote: To avoid computing twice you need to persist the RDD, but that need not be in memory. You can persist to disk with persist(). On Mar 26, 2015 4:11 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have an rdd that is expensive to compute. I want to save it as an object file and also print the count. How can I avoid double computation of the RDD?

val rdd = sc.textFile(someFile).map(line => expensiveCalculation(line))
val count = rdd.count() // this forces computation of the rdd
println(count)
rdd.saveAsObjectFile(file2) // this computes the RDD again

I can avoid double computation by using cache:

val rdd = sc.textFile(someFile).map(line => expensiveCalculation(line))
rdd.cache()
val count = rdd.count()
println(count)
rdd.saveAsObjectFile(file2) // this reads from the cache instead of recomputing

This only computes the rdd once. However, the rdd has millions of items and will cause an out-of-memory error. Question: how can I avoid double computation without using cache? Ningjun
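The accumulator idea in miniature: count during the single write pass instead of traversing the data twice. Plain Python standing in for the Spark job (the function name and file handling are illustrative, not Spark API):

```python
import os
import tempfile

def save_with_count(records, path):
    """Write records to path and count them in the same pass,
    mimicking the accumulator-while-saving pattern above."""
    count = 0
    with open(path, "w") as f:
        for r in records:
            count += 1          # the "accumulator" update
            f.write(repr(r) + "\n")
    return count

path = os.path.join(tempfile.mkdtemp(), "file2")
print(save_with_count(range(5), path))  # 5
```

One caveat with the real Spark version: accumulator updates made inside a transformation such as map can be applied more than once if a task fails and is retried, so the count can overshoot; updates inside actions are the only ones Spark guarantees to apply exactly once.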
Re: org.apache.spark.ml.recommendation.ALS
Hi Xiangrui, Here is the class:

object ALSNew {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("TrainingDataPurchase")
      .set("spark.executor.memory", "4g")
    conf.set("spark.shuffle.memoryFraction", "0.65") // default is 0.2
    conf.set("spark.storage.memoryFraction", "0.3")  // default is 0.6
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val pfile = args(0)
    val purchase = sc.textFile(pfile)
    val ratings = purchase.map(line => line.split(',') match {
      case Array(user, item, rate) => (user.toInt, item.toInt, rate.toFloat)
    }).toDF()

    val rank = args(1).toInt
    val numIterations = args(2).toInt
    val regParam: Double = 0.01
    val implicitPrefs: Boolean = true
    val numUserBlocks: Int = 100
    val numItemBlocks: Int = 100
    val nonnegative: Boolean = true
    // val paramMap = ParamMap(regParam = 0.01)
    // paramMap.put(numUserBlocks = 100, numItemBlocks = 100)

    val als = new ALS()
      .setRank(rank)
      .setRegParam(regParam)
      .setImplicitPrefs(implicitPrefs)
      .setNumUserBlocks(numUserBlocks)
      .setNumItemBlocks(numItemBlocks)
    val alpha = als.getAlpha
    val model = als.fit(ratings)

    val predictions = model.transform(ratings)
      .select("rating", "prediction")
      .map { case Row(rating: Float, prediction: Float) =>
        (rating.toDouble, prediction.toDouble)
      }

    val rmse = if (implicitPrefs) {
      // TODO: Use a better (rank-based?) evaluation metric for implicit feedback.
      // We limit the ratings and the predictions to interval [0, 1] and compute
      // the weighted RMSE with the confidence scores as weights.
      val (totalWeight, weightedSumSq) = predictions.map { case (rating, prediction) =>
        val confidence = 1.0 + alpha * math.abs(rating)
        val rating01 = math.max(math.min(rating, 1.0), 0.0)
        val prediction01 = math.max(math.min(prediction, 1.0), 0.0)
        val err = prediction01 - rating01
        (confidence, confidence * err * err)
      }.reduce { case ((c0, e0), (c1, e1)) =>
        (c0 + c1, e0 + e1)
      }
      math.sqrt(weightedSumSq / totalWeight)
    } else {
      val mse = predictions.map { case (rating, prediction) =>
        val err = rating - prediction
        err * err
      }.mean()
      math.sqrt(mse)
    }
    println("Mean Squared Error = " + rmse)
  }
}

I am using the following in my maven build (pom.xml):

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>1.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>1.3.0</version>
  </dependency>
</dependencies>

I am using Scala version 2.11.2. Could it be that spark-1.3.0-bin-hadoop2.4.tgz requires a different version of Scala? Thanks, Jay On Apr 9, 2015, at 4:38 PM, Xiangrui Meng men...@gmail.com wrote: Could you share ALSNew.scala? Which Scala version did you use? -Xiangrui On Wed, Apr 8, 2015 at 4:09 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi Xiangrui, I tried running this on my local machine (laptop) and got the same error. Here is what I did: 1. downloaded the spark 1.3.0 release version (prebuilt for hadoop 2.4 and later), spark-1.3.0-bin-hadoop2.4.tgz. 2. Ran the following command: spark-submit --class ALSNew --master local[8] ALSNew.jar /input_path The stack trace is exactly the same.
Thanks, Jay On Apr 8, 2015, at 10:47 AM, Jay Katukuri jkatuk...@apple.com wrote: Some additional context: since I am using features of Spark 1.3.0, I have downloaded Spark 1.3.0 and used spark-submit from there. The cluster is still on spark-1.2.0. So this looks to me like, at runtime, the executors could not find some libraries of spark-1.3.0, even though I ran spark-submit from my downloaded spark-1.3.0. On Apr 6, 2015, at 1:37 PM, Jay Katukuri jkatuk...@apple.com wrote: Here is the command that I have used: spark-submit --class packagename.ALSNew --num-executors 100 --master yarn ALSNew.jar -jar spark-sql_2.11-1.3.0.jar hdfs://input_path Btw - I could run the old ALS in the mllib package. On Apr 6, 2015, at 12:32 PM, Xiangrui Meng men...@gmail.com wrote: So ALSNew.scala is your own application; did you add it with spark-submit or spark-shell? The correct command should look like: spark-submit --class your.package.name.ALSNew
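One thing worth checking, given Xiangrui's question about the Scala version: to the best of my knowledge, the prebuilt spark-1.3.0-bin-hadoop2.4 binaries were compiled against Scala 2.10, not 2.11. If that is the case here, the pom would need the matching _2.10 artifacts and a 2.10 scala-library, for example:

```xml
<!-- Sketch, assuming the cluster runs a Scala 2.10 build of Spark:
     the artifact suffix must match the Scala version Spark was built with. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.3.0</version>
</dependency>
<!-- likewise spark-mllib_2.10 and spark-sql_2.10 -->
```

Mixing an application built for Scala 2.11 with a Spark distribution built for 2.10 typically fails only at runtime, with NoSuchMethodError or NoClassDefFoundError-style exceptions much like the one described in this thread.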
sbt-assembly spark-streaming-kinesis-asl error
Hi All, I am having trouble building a fat jar file through sbt-assembly.

[warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
[warn] Merging 'META-INF/NOTICE' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/services/java.sql.Driver' with strategy 'filterDistinctLines'
[warn] Merging 'rootdoc.txt' with strategy 'concat'
[warn] Strategy 'concat' was applied to a file
[warn] Strategy 'discard' was applied to 17 files
[warn] Strategy 'filterDistinctLines' was applied to a file
[warn] Strategy 'rename' was applied to 4 files

When submitting the spark application through the command

sh /usr/lib/spark/bin/spark-submit --class com.xxx.ExampleClassName target/scala-2.10/-snapshot.jar

I end up with the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat
    at com.amazonaws.auth.AWS4Signer.<clinit>(AWS4Signer.java:44)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:379)
    at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
    at com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
    at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
    at com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
    at com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
    at com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(AmazonKinesisClient.java:2102)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:216)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:202)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:175)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:155)
    at com.quickstatsengine.aws.AwsProvider$.<init>(AwsProvider.scala:20)
    at com.quickstatsengine.aws.AwsProvider$.<clinit>(AwsProvider.scala)
The snippet from my build.sbt file is:

"org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0-alpha1" % "provided",
"org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.2.0" % "provided",

And the error is originating from:

val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())

Am I correct to set spark-streaming-kinesis-asl as a *provided* dependency? Also, is there a merge strategy I need to apply? Any help would be appreciated, Mike.
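As Vadim notes in his reply, the Kinesis ASL artifact should probably not be "provided": unlike spark-core and spark-streaming, it is not part of the Spark distribution on the cluster, so marking it provided drops it (and its transitive joda-time dependency, whose absence is exactly the NoClassDefFoundError above) from the assembled jar. A build.sbt sketch along those lines (versions are assumptions; match them to your cluster):

```scala
// Sketch: only the artifacts the cluster already ships are marked "provided";
// the Kinesis ASL artifact stays in the fat jar so joda-time and the AWS SDK
// are bundled with the application.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
  // not "provided": must travel inside the assembly
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.2.0"
)
```

With the ASL artifact in the assembly, the META-INF merge warnings shown above still apply, so a mergeStrategy that discards conflicting META-INF entries (as the log already shows sbt-assembly doing by default) remains appropriate.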
Re: sbt-assembly spark-streaming-kinesis-asl error
Thanks Vadim, I can certainly consume data from a Kinesis stream when running locally. I'm currently in the process of extending my work to a proper cluster (i.e. using a spark-submit job via uber jar). Feel free to add me to gmail chat and maybe we can help each other. On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I don't believe the Kinesis asl should be provided. I used mergeStrategy successfully to produce an uber jar. Fyi, I've been having trouble consuming data out of Kinesis with Spark with no success :( Would be curious to know if you got it working. Vadim On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I am having trouble building a fat jar file through sbt-assembly. ...
Re: sbt-assembly spark-streaming-kinesis-asl error
I don't believe the Kinesis asl should be provided. I used mergeStrategy successfully to produce an uber jar. Fyi, I've been having trouble consuming data out of Kinesis with Spark with no success :( Would be curious to know if you got it working. Vadim On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I am having trouble building a fat jar file through sbt-assembly. ...
How to access postgresql on Spark SQL
Hi all, Does anyone know how to access PostgreSQL from Spark SQL? Do I need to add the postgresql dependency in build.sbt and set the class path for it? Thanks. Regards, Yi
Re: sbt-assembly spark-streaming-kinesis-asl error
Thanks Mike. I was having trouble on EC2. On Apr 13, 2015, at 10:25 PM, Mike Trienis mike.trie...@orcsol.com wrote: Thanks Vadim, I can certainly consume data from a Kinesis stream when running locally. I'm currently in the process of extending my work to a proper cluster (i.e. using a spark-submit job via uber jar). Feel free to add me to gmail chat and maybe we can help each other. On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I don't believe the Kinesis asl should be "provided". I used mergeStrategy successfully to produce an uber jar. FYI, I've been having trouble consuming data out of Kinesis with Spark, with no success :( Would be curious to know if you got it working. Vadim On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I am having trouble building a fat jar file through sbt-assembly.
[warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
[warn] Merging 'META-INF/NOTICE' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/services/java.sql.Driver' with strategy 'filterDistinctLines'
[warn] Merging 'rootdoc.txt' with strategy 'concat'
[warn] Strategy 'concat' was applied to a file
[warn] Strategy 'discard' was applied to 17 files
[warn] Strategy 'filterDistinctLines' was applied to a file
[warn] Strategy 'rename' was applied to 4 files
When submitting the spark application through the command
sh /usr/lib/spark/bin/spark-submit --class com.xxx.ExampleClassName target/scala-2.10/-snapshot.jar
I end up with the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat
at com.amazonaws.auth.AWS4Signer.<clinit>(AWS4Signer.java:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)
at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
at com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
at com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
at com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
at com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160)
at com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(AmazonKinesisClient.java:2102)
at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:216)
at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:202)
at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:175)
at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:155)
at com.quickstatsengine.aws.AwsProvider$.<init>(AwsProvider.scala:20) at
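A NoClassDefFoundError like the one above usually means a runtime dependency (here joda-time, which the AWS SDK's AWS4Signer needs) was dropped from or shadowed in the assembled jar. The following build.sbt sketch shows the usual shape of the fix; the library versions and exact merge rules are assumptions, not a verified configuration, and the merge-strategy key name varies across sbt-assembly versions:

```scala
// build.sbt fragment (sbt-assembly syntax of that era; key names may differ by plugin version)
libraryDependencies ++= Seq(
  // kinesis-asl is NOT shipped with the Spark cluster, so do not mark it "provided"
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0",
  // ensure a joda-time new enough for the AWS SDK wins in the fat jar
  "joda-time" % "joda-time" % "2.7"
)

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
  case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
  case "rootdoc.txt"                             => MergeStrategy.concat
  case _                                         => MergeStrategy.first
}
```

It can also help to run `jar tf` on the produced assembly and grep for `org/joda/time` to confirm the classes actually made it into the jar.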
Re: [GraphX] aggregateMessages with active set
Hello, Many thanks for your reply. From the code I found that the reason my program scans all the edges is because the EdgeDirection I passed in is EdgeDirection.Either. However, I still have the problem that the time consumed by each iteration does not decrease over time. Thus I have two questions: 1. What is the meaning of activeFraction in [1]? 2. As my edgeRDD is too large to cache in memory, I used StorageLevel.MEMORY_AND_DISK_SER as the persist level. Thus, if the program uses aggregateMessagesIndexScan, will it still have to load the whole edge list into memory? [1] https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L237-266 Alcaid 2015-04-10 2:47 GMT+08:00 Ankur Dave ankurd...@gmail.com: Actually, GraphX doesn't need to scan all the edges, because it maintains a clustered index on the source vertex id (that is, it sorts the edges by source vertex id and stores the offsets in a hash table). If the activeDirection is appropriately set, it can then jump only to the clusters with active source vertices. See the EdgePartition#index field [1], which stores the offsets, and the logic in GraphImpl#aggregateMessagesWithActiveSet [2], which decides whether to do a full scan or use the index. [1] https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala#L60 [2] https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L237-266 Ankur On Thu, Apr 9, 2015 at 3:21 AM, James alcaid1...@gmail.com wrote: In aggregateMessagesWithActiveSet, Spark still has to read all edges. This means that a fixed cost that scales with the graph size is unavoidable in a Pregel-like iteration. But what if I have to run nearly 100 iterations, and in the last 50 iterations only 0.1% of the nodes need to be updated? The fixed cost makes the program finish in an unacceptable amount of time.
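In the public API, the index-based scan Ankur describes is driven by Pregel's activeDirection parameter. A hedged SSSP-style sketch (`graph: Graph[Double, Double]` and the message logic are assumptions for illustration; behavior described matches the Spark 1.3 source):

```scala
import org.apache.spark.graphx._

// With activeDirection = EdgeDirection.Out, only clusters whose *source*
// vertex received a message in the previous round are scanned (via the
// clustered index); EdgeDirection.Either falls back to scanning every edge.
val shortestPaths = Pregel(
  graph,
  initialMsg = Double.PositiveInfinity,
  activeDirection = EdgeDirection.Out
)(
  (id, dist, msg) => math.min(dist, msg),              // vertex program
  triplet =>                                           // send only along out-edges
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else Iterator.empty,
  (a, b) => math.min(a, b)                             // merge incoming messages
)
```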
How can I add my custom Rule to spark sql?
Hi guys, I want to add my custom Rules(whatever the rule is) when the sql Logical Plan is being analysed. Is there a way to do that in the spark application code? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-add-my-custom-Rule-to-spark-sql-tp22485.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
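In Spark 1.3 there is no public hook for analyzer rules, but one pattern seen in the 1.3 code base is to define a Catalyst Rule and subclass SQLContext to inject it. A hedged sketch using internal, non-stable Catalyst APIs; the constructor signature matches the 1.3 source but may change in later versions, and the subclass must live in the org.apache.spark.sql package to reach the protected[sql] members:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A placeholder rule: pattern-match the operators you care about in transform.
object MyRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case p => p  // replace with your rewrite
  }
}

// Inject the rule into analysis via the extendedResolutionRules hook.
class MySQLContext(sc: SparkContext) extends SQLContext(sc) {
  @transient
  override protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, functionRegistry, caseSensitive = true) {
      override val extendedResolutionRules = MyRule :: Nil
    }
}
```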
Re: Understanding Spark Memory distribution
Broadcast variables count towards spark.storage.memoryFraction, so they use the same pool of memory as cached RDDs. That being said, I'm really not sure why you are running into problems; it seems like you have plenty of memory available. Most likely it's got nothing to do with broadcast variables or caching; it's just whatever logic you are applying in your transformations that is causing lots of GC to occur during the computation. Hard to say without knowing more details. You could try increasing the timeout for the failed askWithReply by increasing spark.akka.lookupTimeout (defaults to 30), but that would most likely be treating a symptom, not the root cause. On Fri, Mar 27, 2015 at 4:52 PM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hi All, I am running a spark cluster on EC2 instances of type m3.2xlarge. I have given 26 GB of memory with all 8 cores to my executors. I can see that in the logs too:
15/03/27 21:31:06 INFO AppClient$ClientActor: Executor added: app-20150327213106-/0 on worker-20150327212934-10.x.y.z-40128 (10.x.y.z:40128) with 8 cores
I am not caching any RDD, so I have set spark.storage.memoryFraction to 0.2. I can see on the SparkUI executors tab that Memory used is 0.0/4.5 GB. I am now confused by these logs:
15/03/27 21:31:08 INFO BlockManagerMasterActor: Registering block manager 10.77.100.196:58407 with 4.5 GB RAM, BlockManagerId(4, 10.x.y.z, 58407)
I am broadcasting a large object of 3 GB, and after that, when I create an RDD, I see logs showing this 4.5 GB of memory getting full, and then I get an OOM. How can I make the block manager use more memory? Is there any other fine tuning I need to do for broadcasting large objects? And does a broadcast variable use cache memory or the rest of the heap? Thanks Ankur
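Since broadcasts and cached RDDs share the spark.storage.memoryFraction pool, one knob to try when broadcasting a ~3 GB object is raising that fraction back up (the poster had lowered it to 0.2). The values below are illustrative assumptions, not recommendations:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // The block manager pool grows with this fraction of the executor heap;
  // the default is 0.6, the poster had set 0.2.
  .set("spark.storage.memoryFraction", "0.45")
  // Raises the askWithReply timeout (seconds); treats the symptom only.
  .set("spark.akka.lookupTimeout", "120")
```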
Re: Spark sql failed in yarn-cluster mode when connecting to non-default hive database
Hi Linlin, have you found a solution for this issue? If yes, what needs to be corrected? I am also getting the same error when submitting a spark job in cluster mode:
2015-04-14 18:16:43 DEBUG Transaction - Transaction rolled back in 0 ms
2015-04-14 18:16:43 ERROR DDLTask - org.apache.hadoop.hive.ql.metadata.HiveException: Database does not exist: my_database
at org.apache.hadoop.hive.ql.exec.DDLTask.switchDatabase(DDLTask.java:4054)
at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:269)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java ...
Please suggest. I have copied hive-site.xml into spark/conf; in standalone mode it works fine. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-sql-failed-in-yarn-cluster-mode-when-connecting-to-non-default-hive-database-tp11811p22486.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Fwd: Expected behavior for DataFrame.unionAll
Hello, I am experimenting with DataFrame. I tried to construct two DataFrames with:
1. case class A(a: Int, b: String)
scala> adf.printSchema()
root
 |-- a: integer (nullable = false)
 |-- b: string (nullable = true)
2. case class B(a: String, c: Int)
scala> bdf.printSchema()
root
 |-- a: string (nullable = true)
 |-- c: integer (nullable = false)
Then I unioned these two DataFrames with the unionAll function, and I got the following schema. It is kind of a mixture of A and B.
scala> val udf = adf.unionAll(bdf)
scala> udf.printSchema()
root
 |-- a: string (nullable = false)
 |-- b: string (nullable = true)
The unionAll documentation says it behaves like the SQL UNION ALL operation. However, unioning incompatible types is not well defined for SQL. Is there any expected behavior for unioning incompatible data frames? Thanks. Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Expected-behavior-for-DataFrame-unionAll-tp22487.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
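Because unionAll matches columns by position rather than by name, the usual way to avoid such a mixed schema is to align names and types explicitly before the union. A hedged sketch reusing adf and bdf from the message above (the common column names are assumptions):

```scala
// Cast both sides to a common (a: string, value: string) schema so that
// unionAll combines compatible columns by position.
val adfNorm = adf.select(adf("a").cast("string").as("a"), adf("b").as("value"))
val bdfNorm = bdf.select(bdf("a"), bdf("c").cast("string").as("value"))
val unioned = adfNorm.unionAll(bdfNorm)
unioned.printSchema()  // both columns are strings on both sides
```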
Re: [Spark1.3] UDF registration issue
Hi, that syntax is not supported in Spark 1.3; the next release of Spark supports this kind of UDF call on DataFrames. See the link below. https://issues.apache.org/jira/browse/SPARK-6379 On Sat, Apr 11, 2015 at 3:30 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi, I'm running into some trouble trying to register a UDF:
scala> sqlContext.udf.register("strLen", (s: String) => s.length())
res22: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType)
scala> cleanProcessDF.withColumn("dii", strLen(col("di")))
<console>:33: error: not found: value strLen
cleanProcessDF.withColumn("dii", strLen(col("di")))
Where cleanProcessDF is a dataframe. Is my syntax wrong? Or am I missing an import of some sort? -- --- Takeshi Yamamuro
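In the meantime there are two workarounds in 1.3: keep a reference to the UserDefinedFunction that register returns, or call the UDF by name from a SQL string, where name resolution does work. A hedged sketch; the temp table name is an assumption:

```scala
import org.apache.spark.sql.functions.col

// 1) register returns a callable UserDefinedFunction: bind it to a val
val strLen = sqlContext.udf.register("strLen", (s: String) => s.length)
val withLen = cleanProcessDF.withColumn("dii", strLen(col("di")))

// 2) or resolve the registered UDF by name inside a SQL query
cleanProcessDF.registerTempTable("clean_process")
val viaSql = sqlContext.sql("SELECT *, strLen(di) AS dii FROM clean_process")
```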
Re: Spark 1.3.0: Running Pi example on YARN fails
Hi Zork, From the exception, it is still caused by hdp.version not being propagated correctly. Can you check whether there is any typo?
[root@c6402 conf]# more java-opts
-Dhdp.version=2.2.0.0–2041
[root@c6402 conf]# more spark-defaults.conf
spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0–2041
spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0–2041
This is an HDP-specific question, so you may want to move the topic to the HDP forum. Thanks. Zhan Zhang
On Apr 13, 2015, at 3:00 AM, Zork Sail zorks...@gmail.com wrote: Hi Zhan, alas, setting -Dhdp.version=2.2.0.0–2041 does not help. I still get the same error:
15/04/13 09:53:59 INFO yarn.Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1428918838408
final status: UNDEFINED
tracking URL: http://foo.bar.site:8088/proxy/application_1427875242006_0037/
user: test
15/04/13 09:54:00 INFO yarn.Client: Application report for application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:01 INFO yarn.Client: Application report for application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:02 INFO yarn.Client: Application report for application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:03 INFO yarn.Client: Application report for application_1427875242006_0037 (state: FAILED)
15/04/13 09:54:03 INFO yarn.Client:
client token: N/A
diagnostics: Application application_1427875242006_0037 failed 2 times due to AM Container for appattempt_1427875242006_0037_02 exited with exitCode: 1
For more detailed output, check the application tracking page: http://foo.bar.site:8088/proxy/application_1427875242006_0037/ Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1427875242006_0037_02_01
Exit code: 1
Exception message: /mnt/hdfs01/hadoop/yarn/local/usercache/test/appcache/application_1427875242006_0037/container_1427875242006_0037_02_01/launch_container.sh: line 27: $PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure: bad substitution
Stack trace: ExitCodeException exitCode=1: /mnt/hdfs01/hadoop/yarn/local/usercache/test/appcache/application_1427875242006_0037/container_1427875242006_0037_02_01/launch_container.sh: line 27: $PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure: bad substitution
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1428918838408
final status: FAILED
tracking URL:
RE: Equi Join is taking for ever. 1 Task is Running while other 199 are complete
If it is really due to data skew, would the hanging task have a much bigger Shuffle Write Size in this case? Here, the shuffle write size for that task is 0, and the rest of this task's IO is not much larger than that of the tasks that finished quickly; is that normal? I am also interested in this case: from the statistics on the UI, what indicates that the task could have skewed data? Yong
Date: Mon, 13 Apr 2015 12:58:12 -0400
Subject: Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete
From: jcove...@gmail.com
To: deepuj...@gmail.com
CC: user@spark.apache.org
I can promise you that this is also a problem in the pig world :) not sure why it's not a problem for this data set, though... are you sure that the two are doing the exact same code? You should inspect your source data. Make a histogram for each and see what the data distribution looks like. If there is a value or bucket with a disproportionate set of values, you know you have an issue.
2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: You mean there is a tuple in either RDD that has itemID = 0 or null? And what is a catch-all? Does that imply it is a good idea to run a filter on each RDD first? We do not do this using Pig on M/R. Is it required in the Spark world? On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney jcove...@gmail.com wrote: My guess would be data skew. Do you know if there is some item id that is a catch-all? Can it be null? Item id 0?
Lots of data sets have this sort of value, and it always kills joins.
2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: Code:
val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents).map { case (itemId, (listing, viDetail)) =>
    val viSummary = new VISummary
    viSummary.leafCategoryId = listing.getLeafCategId().toInt
    viSummary.itemSiteId = listing.getItemSiteId().toInt
    viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
    viSummary.sellerCountryId = listing.getSlrCntryId().toInt
    viSummary.buyerSegment = 0
    viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
    val sellerId = listing.getSlrId.toLong
    (sellerId, (viDetail, viSummary, itemId))
  }
Running Tasks:
Index ID Attempt Status Locality Level Executor ID / Host Launch Time Duration GC Time Shuffle Read Size / Records Write Time Shuffle Write Size / Records Shuffle Spill (Memory) Shuffle Spill (Disk) Errors
0 216 0 RUNNING PROCESS_LOCAL 181 / phxaishdc9dn0474.phx.ebay.com 2015/04/13 06:43:53 1.7 h 13 min 3.0 GB / 56964921 0.0 B / 0 21.2 GB 1902.6 MB
2 218 0 SUCCESS PROCESS_LOCAL 582 / phxaishdc9dn0235.phx.ebay.com 2015/04/13 06:43:53 15 min 31 s 2.2 GB / 1666851 0.1 s 3.0 MB / 2062 54.8 GB 1924.5 MB
1 217 0 SUCCESS PROCESS_LOCAL 202 / phxdpehdc9dn2683.stratus.phx.ebay.com 2015/04/13 06:43:53 19 min 1.3 min 2.2 GB / 1687086 75 ms 3.9 MB / 2692 33.7 GB 1960.4 MB
4 220 0 SUCCESS PROCESS_LOCAL 218 / phxaishdc9dn0855.phx.ebay.com 2015/04/13 06:43:53 15 min 56 s 2.2 GB / 1675654 40 ms 3.3 MB / 2260 26.2 GB 1928.4 MB
Command:
./bin/spark-submit -v --master yarn-cluster --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
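Jonathan's histogram suggestion can be done cheaply before the join: count records per join key on each side and look at the heaviest keys. A hedged sketch against the lstgItem/viEvents RDDs above:

```scala
// Count rows per itemId on each side of the join.
val lstgHist = lstgItem.map { case (itemId, _) => (itemId, 1L) }.reduceByKey(_ + _)
val viHist   = viEvents.map { case (itemId, _) => (itemId, 1L) }.reduceByKey(_ + _)

// Swap to (count, itemId) so the default tuple ordering sorts by count,
// then print the 20 heaviest keys. A single dominant key (often 0 or null)
// explains one straggler task while the other 199 finish quickly.
lstgHist.map(_.swap).top(20).foreach(println)
viHist.map(_.swap).top(20).foreach(println)
```

If one key dominates, common remedies are filtering it out (if it is a catch-all) or salting that key before the join.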
Re: Need some guidance
The problem with using collect is that it will fail for large data sets, as you'll attempt to copy the entire RDD to the memory of your driver program. The following works (Scala syntax, but similar to Python):
scala> val i1 = input.distinct.groupByKey
scala> i1.foreach(println)
(1,CompactBuffer(beta, alpha, foo))
(3,CompactBuffer(foo))
(2,CompactBuffer(alpha, bar))
scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
scala> i2.foreach(println)
(1,3)
(3,1)
(2,2)
The i2 line passes a function that takes a tuple argument, then constructs a new output tuple with the first element and the size of the second (each CompactBuffer). An alternative pattern-match syntax would be:
scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }
This should work as long as none of the CompactBuffers is too large, which could happen for extremely large data sets. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw marco.s...@gmail.com wrote: **Learning the ropes** I'm trying to grasp the concept of using the pipeline in pySpark... Simplified example:
list = [(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]
Desired outcome: [(1,3),(2,2),(3,1)]
Basically, for each key I want the number of unique values. I've tried different approaches, but am I really using Spark effectively? I wondered if I should do something like:
input = sc.parallelize(list)
input.groupByKey().collect()
Then I wondered if I could do something like a foreach over each key value, and then map the actual values and reduce them. Pseudo-code:
input.groupbykey()
  .keys
  .foreach(_.values
    .map(lambda x: (x, 1))
    .reducebykey(lambda a, b: a + b)
    .count()
  )
I was somehow hoping that the key would get the current value of count, and thus be the count of the unique keys, which is exactly what I think I'm looking for. Am I way off base on how I could accomplish this? Marco
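A terser variant of Dean's approach that avoids building CompactBuffers entirely is to deduplicate first and then count per key. A sketch in Scala against the same `input` RDD:

```scala
// distinct drops duplicate (key, value) pairs, so each surviving pair
// contributes exactly one unique value to its key's count.
val uniqueCounts = input.distinct()
  .map { case (k, _) => (k, 1) }
  .reduceByKey(_ + _)
// For the sample data, collect() yields (1,3), (2,2), (3,1) in some order.
```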
Spark Cluster: RECEIVED SIGNAL 15: SIGTERM
Any idea what this means? Many thanks.
== logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1 ==
15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910 with 4 cores, 6.6 GB RAM
15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
15/04/13 07:07:22 INFO Worker: Spark home: /remote/users//work/tools/spark-1.3.0-bin-hadoop2.4
15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 07:07:22 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
15/04/13 07:07:22 INFO Utils: Successfully started service 'WorkerUI' on port 8081.
15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at http://09:8081
15/04/13 07:07:22 INFO Worker: Connecting to master akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
15/04/13 07:07:22 INFO Worker: Successfully registered with master spark://08:7077
15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM
Manning looking for a co-author for the GraphX in Action book
Hi all, Manning (the publisher) is looking for a co-author for the GraphX in Action book. The book currently has one author (Michael Malak), but they are looking for a co-author to work closely with Michael and improve the writings and make it more consumable. Early access page for the book: http://www.manning.com/malak/ Let me know if you are interested in that. Cheers.
RE: Kryo exception : Encountered unregistered class ID: 13994
Hello, Thank you for your answer. I'm already registering my classes as you're suggesting... Regards
From: tsingfu [via Apache Spark User List]
Sent: Monday, April 13, 2015 03:48
To: Mehdi Singer
Subject: Re: Kryo exception : Encountered unregistered class ID: 13994
Hi, the error message mentions: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994 So I think this is an issue with Kryo. You should use `kryo.register(classOf[your_class_name])` in your app code.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-exception-Encountered-unregistered-class-ID-13994-tp22437p22471.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Kryo exception : Encountered unregistered class ID: 13994
You need to do a few more things or you will eventually run into these issues:
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
-Deepak On Mon, Apr 13, 2015 at 1:19 PM, mehdisinger mehdi.sin...@lampiris.be wrote: Hello, Thank you for your answer. I'm already registering my classes as you're suggesting... Regards
From: tsingfu [via Apache Spark User List]
Sent: Monday, April 13, 2015 03:48
To: Mehdi Singer
Subject: Re: Kryo exception : Encountered unregistered class ID: 13994
Hi, the error message mentions: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994 So I think this is an issue with Kryo. You should use `kryo.register(classOf[your_class_name])` in your app code.
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-exception-Encountered-unregistered-class-ID-13994-tp22437p22471.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Deepak
Re: Is the disk space in SPARK_LOCAL_DIRS cleaned up?
I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example:
export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=seconds"
On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Does anybody have an answer for this? Thanks Ningjun From: Wang, Ningjun (LNG-NPV) Sent: Thursday, April 02, 2015 12:14 PM To: user@spark.apache.org Subject: Is the disk space in SPARK_LOCAL_DIRS cleaned up? I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are shuffled, spark writes to this folder. I found that the disk space of this folder keeps increasing quickly, and at a certain point I will run out of disk space. I wonder, does spark clean up the disk space in this folder once the shuffle operation is done? If not, I need to write a job to clean it up myself. But how do I know which sub-folders can be removed? Ningjun
Re: Spark Cluster: RECEIVED SIGNAL 15: SIGTERM
Very likely to be this: http://www.linuxdevcenter.com/pub/a/linux/2006/11/30/linux-out-of-memory.html?page=2 Your worker ran out of memory => maybe you're asking for too much memory for the JVM, or something else is running on the worker. Guillaume
Any idea what this means, many thanks
== logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1 ==
15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910 with 4 cores, 6.6 GB RAM
15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
15/04/13 07:07:22 INFO Worker: Spark home: /remote/users//work/tools/spark-1.3.0-bin-hadoop2.4
15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 07:07:22 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
15/04/13 07:07:22 INFO Utils: Successfully started service 'WorkerUI' on port 8081.
15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at http://09:8081
15/04/13 07:07:22 INFO Worker: Connecting to master akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
15/04/13 07:07:22 INFO Worker: Successfully registered with master spark://08:7077
15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM
-- eXenSa Guillaume PITEL, Président +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Re: regarding ZipWithIndex
How about using mapToPair and exchanging the two? Will it be efficient? Below is the code; will it be efficient to convert like this?
JavaPairRDD<Long, MatcherReleventData> RddForMarch = matchRdd.zipWithIndex().mapToPair(new PairFunction<Tuple2<VendorRecord, Long>, Long, MatcherReleventData>() {
  @Override
  public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long> t) throws Exception {
    MatcherReleventData matcherData = new MatcherReleventData();
    Tuple2<Long, MatcherReleventData> tuple = new Tuple2<Long, MatcherReleventData>(t._2, matcherData.convertVendorDataToMatcherData(t._1));
    return tuple;
  }
}).cache();
On 13 April 2015 at 03:11, Ted Yu yuzhih...@gmail.com wrote: Please also take a look at ZippedWithIndexRDDPartition, which is 72 lines long. You can create your own version which extends RDD[(Long, T)]. Cheers On Sun, Apr 12, 2015 at 1:29 PM, Ted Yu yuzhih...@gmail.com wrote: bq. will return something like JavaPairRDD<Object, Long> The Long component of the pair fits your description of an index. What other requirement does zipWithIndex not provide? Cheers On Sun, Apr 12, 2015 at 1:16 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All, I have a JavaRDD<Object> and I want to convert it to JavaPairRDD<Index, Object>. The index should be unique and it should maintain the order: for the first object it should have 1, and then for the second 2, and so on. I tried using zipWithIndex, but it returns something like JavaPairRDD<Object, Long>. I want to use this RDD for lookup and join operations later in my workflow, so ordering is important. Regards jeet
Re: Is the disk space in SPARK_LOCAL_DIRS cleaned up?
Does it also clean up spark local dirs? I thought it was only cleaning $SPARK_HOME/work/ Guillaume
I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example:
export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=seconds"
On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Does anybody have an answer for this? Thanks Ningjun From: Wang, Ningjun (LNG-NPV) Sent: Thursday, April 02, 2015 12:14 PM To: user@spark.apache.org Subject: Is the disk space in SPARK_LOCAL_DIRS cleaned up? I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are shuffled, spark writes to this folder. I found that the disk space of this folder keeps increasing quickly, and at a certain point I will run out of disk space. I wonder, does spark clean up the disk space in this folder once the shuffle operation is done? If not, I need to write a job to clean it up myself. But how do I know which sub-folders can be removed? Ningjun
-- eXenSa Guillaume PITEL, Président +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Exception Driver-Memory while running Spark job on Yarn-cluster
Hi, When I am submitting a spark job with --master yarn-cluster using the command/options below, I am getting a driver memory error:
spark-submit --jars ./libs/mysql-connector-java-5.1.17.jar,./libs/log4j-1.2.17.jar --files datasource.properties,log4j.properties --master yarn-cluster --num-executors 1 --driver-memory 2g --executor-memory 512m --class com.test.spark.jobs.AggregationJob sparkagg.jar
The exceptions in the yarn application ID log are as follows:
Container: container_1428938273236_0006_01_01 on mycom.hostname.com_8041
=
LogType: stderr
LogLength: 128
Log Contents:
Exception in thread "Driver" Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Driver"
LogType: stdout
LogLength: 40
Container: container_1428938273236_0006_02_01 on mycom.hostname.com_8041
=
LogType: stderr
LogLength: 1365
Log Contents:
java.io.IOException: Log directory hdfs://mycom.hostname.com:8020/user/spark/applicationHistory/application_1428938273236_0006 already exists!
at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:129)
at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:353)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
LogType: stdout
LogLength: 40
Please help; it is urgent for me. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-Driver-Memory-while-running-Spark-job-on-Yarn-cluster-tp22475.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: MLlib : Gradient Boosted Trees classification confidence
Thank you Peter. I just want to be sure: even if I use the classification setting, does the GBT use regression trees and not classification trees? I know the difference between the two (theoretically) is only in the loss and impurity functions; thus, in case it uses classification trees, doing what you proposed will result in the classification itself. Also, by looking in the Scala API I found that each Node holds a Predict object which contains the probability of the label (classification only) ( https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.mllib.tree.model.Predict ) ** This is what I called confidence. So to sum up my questions and confusion: 1. Does GBT use classification trees when setting it to classification, or does it always use regression trees? 2. In case it uses classification trees, how could I efficiently get to the confidence = Node.Predict.prob? Thanks again, Michael On Mon, Apr 13, 2015 at 10:13 AM, pprett [via Apache Spark User List] ml-node+s1001560n22470...@n3.nabble.com wrote: Hi Mike, Gradient Boosted Trees (or gradient boosted regression trees) don't store probabilities in each leaf node but rather model a continuous function which is then transformed via a logistic sigmoid (i.e. like in a Logistic Regression model). If you are just interested in a confidence, you can use this continuous function directly: it's just the (weighted) sum of the predictions of the individual regression trees. Use the absolute value for confidence and the sign to determine which class label. Here is an example: def score(features: Vector): Double = { val treePredictions = gbdt.trees.map(_.predict(features)) blas.ddot(gbdt.numTrees, treePredictions, 1, gbdt.treeWeights, 1) } If you are rather interested in probabilities, just pass the function value to a logistic sigmoid.
best, Peter -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Gradient-Boosted-Trees-classification-confidence-tp22466p22476.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
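To make Peter's recipe concrete, here is a small plain-Python sketch (no Spark needed) of the two steps he describes: a weighted sum of the per-tree predictions as a continuous margin, then a logistic sigmoid to turn that margin into a probability. The 2.0 scaling factor is an assumption borrowed from the usual log-loss convention, not necessarily the exact MLlib formula:

```python
import math

def gbt_margin(tree_predictions, tree_weights):
    # Weighted sum of the individual regression-tree outputs -- the
    # continuous score Peter computes with blas.ddot in his Scala snippet.
    return sum(p * w for p, w in zip(tree_predictions, tree_weights))

def gbt_probability(tree_predictions, tree_weights, scale=2.0):
    # Logistic sigmoid of the margin. `scale` is an assumption:
    # log-loss formulations often apply the sigmoid to 2 * margin.
    m = gbt_margin(tree_predictions, tree_weights)
    return 1.0 / (1.0 + math.exp(-scale * m))
```

A margin of 0 maps to probability 0.5; the sign of the margin picks the class, and its magnitude is the confidence.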
Re: Spark 1.3.0: Running Pi example on YARN fails
Hi Zhan, Alas setting: -Dhdp.version=2.2.0.0–2041 Does not help. Still get the same error: 15/04/13 09:53:59 INFO yarn.Client: client token: N/A diagnostics: N/A ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: default start time: 1428918838408 final status: UNDEFINED tracking URL: http://foo.bar.site:8088/proxy/application_1427875242006_0037/ user: test 15/04/13 09:54:00 INFO yarn.Client: Application report for application_1427875242006_0037 (state: ACCEPTED) 15/04/13 09:54:01 INFO yarn.Client: Application report for application_1427875242006_0037 (state: ACCEPTED) 15/04/13 09:54:02 INFO yarn.Client: Application report for application_1427875242006_0037 (state: ACCEPTED) 15/04/13 09:54:03 INFO yarn.Client: Application report for application_1427875242006_0037 (state: FAILED) 15/04/13 09:54:03 INFO yarn.Client: client token: N/A diagnostics: Application application_1427875242006_0037 failed 2 times due to AM Container for appattempt_1427875242006_0037_02 exited with exitCode: 1 For more detailed output, check application tracking page: http://foo.bar.site:8088/proxy/application_1427875242006_0037/Then, click on links to logs of each attempt. Diagnostics: Exception from container-launch. 
Container id: container_1427875242006_0037_02_01 Exit code: 1 Exception message: /mnt/hdfs01/hadoop/yarn/local/usercache/test/appcache/application_1427875242006_0037/container_1427875242006_0037_02_01/launch_container.sh: line 27: $PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure: bad substitution Stack trace: ExitCodeException exitCode=1: /mnt/hdfs01/hadoop/yarn/local/usercache/test/appcache/application_1427875242006_0037/container_1427875242006_0037_02_01/launch_container.sh: line 27: $PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure: bad substitution at 
org.apache.hadoop.util.Shell.runCommand(Shell.java:538) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Container exited with a non-zero exit code 1 Failing this attempt. Failing the application. ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: default start time: 1428918838408 final status: FAILED tracking URL: http://foo.bar.site:8088/cluster/app/application_1427875242006_0037 user: test Exception in thread main org.apache.spark.SparkException: Application finished with failed status at org.apache.spark.deploy.yarn.Client.run(Client.scala:622) at org.apache.spark.deploy.yarn.Client$.main(Client.scala:647) at org.apache.spark.deploy.yarn.Client.main(Client.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at
Reading files from http server
Hi, I want to play with the Criteo 1 TB dataset. Files are located on Azure storage. Here's a command to download them: curl -O http://azuremlsampleexperiments.blob.core.windows.net/criteo/day_{`seq -s ',' 0 23`}.gz Is there any way to read files through the http protocol with Spark, without downloading them first to HDFS? Something like this: sc.textFile("http://azuremlsampleexperiments.blob.core.windows.net/criteo/day_{0-23}.gz"), so it will have 24 partitions. Thanks, Peter Rudenko
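sc.textFile does not speak http://, so one workaround people use is to parallelize the list of URLs and fetch inside the tasks. A sketch of that idea follows; the fetch_lines helper is hypothetical and untested against Azure (only the URL construction is exercised here), and each gzipped day file is a single non-splittable unit either way, so 24 partitions is the natural granularity:

```python
def criteo_day_urls(base="http://azuremlsampleexperiments.blob.core.windows.net/criteo",
                    days=24):
    # One URL per day file, day_0.gz .. day_23.gz.
    return ["%s/day_%d.gz" % (base, i) for i in range(days)]

def fetch_lines(url):
    # Hypothetical per-task fetch: stream the HTTP response through a
    # gzip decoder and yield decoded lines lazily.
    import gzip
    import urllib.request
    with urllib.request.urlopen(url) as resp:
        with gzip.open(resp, "rt") as f:
            for line in f:
                yield line.rstrip("\n")

# In a Spark job (not run here), something like:
# rdd = sc.parallelize(criteo_day_urls(), numSlices=24).flatMap(fetch_lines)
```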
How to use multiple app jar files?
My app works fine with the single, uber jar file containing my app and all its dependencies. However, it takes about 20 minutes to copy the 65MB jar file up to the node on the cluster, so my code, compile, test cycle has become a code, compile, copy, test cycle. I'd like to have a single dependencies jar file on the node, and use a separate small jar for my app (which takes around 10 seconds to copy to the node). I've tried using --jars deps.jar, but that copies the deps.jar to the app-* folder but not the driver-* folder, so I get ClassNotFound errors on the driver. Various other combinations of flags, etc. have produced a fair bit of frustration but no progress. Any help with this would be greatly appreciated, as this problem is significantly stretching the length of my work day! Thanks.
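One commonly suggested split (the paths and class name below are placeholders, not from the original post): keep the fat dependency jar on the node and reference it both for the executors (--jars) and on the driver classpath, so the driver stops hitting ClassNotFound:

```shell
# deps.jar holds only dependencies and already lives on the node;
# app.jar is the small, frequently rebuilt application jar.
spark-submit \
  --class com.example.MyApp \
  --jars /opt/jars/deps.jar \
  --driver-class-path /opt/jars/deps.jar \
  /opt/jars/app.jar
```

With this layout, only the small app jar has to be re-copied on each code-compile-test iteration.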
Re: Parquet File Binary column statistics error when reuse byte[] among rows
Thanks Yijie! Also cc the user list. Cheng On 4/13/15 9:19 AM, Yijie Shen wrote: I opened a new Parquet JIRA ticket here: https://issues.apache.org/jira/browse/PARQUET-251 Yijie On April 12, 2015 at 11:48:57 PM, Cheng Lian (lian.cs@gmail.com mailto:lian.cs@gmail.com) wrote: Thanks for reporting this! Would you mind to open JIRA tickets for both Spark and Parquet? I'm not sure whether Parquet declares somewhere the user mustn't reuse byte arrays when using binary type. If it does, then it's a Spark bug. Anyway, this should be fixed. Cheng On 4/12/15 1:50 PM, Yijie Shen wrote: Hi, Suppose I create a dataRDD which extends RDD[Row], and each row is GenericMutableRow(Array(Int, Array[Byte])). A same Array[Byte] object is reused among rows but has different content each time. When I convert it to a dataFrame and save it as Parquet File, the file's row group statistic(max min) of Binary column would be wrong. Here is the reason: In Parquet, BinaryStatistic just keep max min as parquet.io.api.Binary references, Spark sql would generate a new Binary backed by the same Array[Byte] passed from row. reference backed max: Binary--ByteArrayBackedBinary-- Array[Byte] Therefore, each time parquet updating row group's statistic, max min would always refer to the same Array[Byte], which has new content each time. When parquet decides to save it into file, the last row's content would be saved as both max min. It seems it is a parquet bug because it's parquet's responsibility to update statistics correctly. But not quite sure. Should I report it as a bug in parquet JIRA? The spark JIRA is https://issues.apache.org/jira/browse/SPARK-6859
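Here is a minimal plain-Python sketch (hypothetical class, not Parquet's actual code) of the aliasing Yijie describes: a statistics object that keeps references to a reused buffer ends up reporting the last row's bytes as both min and max, while a defensive copy gives correct extremes:

```python
class BinaryStats:
    # Mimics the failure mode: it stores *references* to the byte
    # buffers it is given, not copies.
    def __init__(self):
        self.min = None
        self.max = None

    def update(self, buf):
        if self.min is None or bytes(buf) < bytes(self.min):
            self.min = buf            # reference, not a copy
        if self.max is None or bytes(buf) > bytes(self.max):
            self.max = buf            # reference, not a copy

stats = BinaryStats()
shared = bytearray(b"aaa")            # one buffer reused across rows
for content in (b"zzz", b"mmm", b"aaa"):
    shared[:] = content               # overwrite in place, like the reused row
    stats.update(shared)
# stats.min and stats.max now alias the same buffer, so both read as
# the last row's content.

fixed = BinaryStats()
shared2 = bytearray(b"aaa")
for content in (b"zzz", b"mmm", b"aaa"):
    shared2[:] = content
    fixed.update(bytes(shared2))      # defensive copy -> correct extremes
```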
Re: Problem getting program to run on 15TB input
Sometimes a large number of partitions leads to memory problems. Something like val rdd1 = sc.textFile(file1).coalesce(500). ... val rdd2 = sc.textFile(file2).coalesce(500). ... may help. On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra arun.lut...@gmail.com wrote: Everything works smoothly if I do the 99%-removal filter in Hive first. So, all the baggage from garbage collection was breaking it. Is there a way to filter() out 99% of the data without having to garbage collect 99% of the RDD? On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote: I tried a shorter simper version of the program, with just 1 RDD, essentially it is: sc.textFile(..., N).map().filter().map( blah = (id, 1L)).reduceByKey().saveAsTextFile(...) Here is a typical GC log trace from one of the yarn container logs: 54.040: [GC [PSYoungGen: 9176064K-28206K(10704896K)] 9176064K-28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01, real=0.02 secs] 77.864: [GC [PSYoungGen: 9204270K-150553K(10704896K)] 9204342K-150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26, real=0.04 secs] 79.485: [GC [PSYoungGen: 9326617K-333519K(10704896K)] 9326705K-333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28, real=0.08 secs] 92.974: [GC [PSYoungGen: 9509583K-193370K(10704896K)] 9509679K-193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11, real=0.02 secs] 114.842: [GC [PSYoungGen: 9369434K-123577K(10704896K)] 9369538K-123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00, real=0.02 secs] 117.277: [GC [PSYoungGen: 9299641K-135459K(11918336K)] 9299753K-135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25, real=0.02 secs] So ~9GB is getting GC'ed every few seconds. Which seems like a lot. Question: The filter() is removing 99% of the data. Does this 99% of the data get GC'ed? 
Now, I was able to finally get to reduceByKey() by reducing the number of executor-cores (to 2), based on suggestions at http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html . This makes everything before reduceByKey() run pretty smoothly. I ran this with more executor-memory and less executors (most important thing was fewer executor-cores): --num-executors 150 \ --driver-memory 15g \ --executor-memory 110g \ --executor-cores 32 \ But then, reduceByKey() fails with: java.lang.OutOfMemoryError: Java heap space On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com wrote: The Spark UI names the line number and name of the operation (repartition in this case) that it is performing. Only if this information is wrong (just a possibility), could it have started groupByKey already. I will try to analyze the amount of skew in the data by using reduceByKey (or simply countByKey) which is relatively inexpensive. For the purposes of this algorithm I can simply log and remove keys with huge counts, before doing groupByKey. On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com wrote: All stated symptoms are consistent with GC pressure (other nodes timeout trying to connect because of a long stop-the-world), quite possibly due to groupByKey. groupByKey is a very expensive operation as it may bring all the data for a particular partition into memory (in particular, it cannot spill values for a single key, so if you have a single very skewed key you can get behavior like this). On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: But groupbykey will repartition according to numer of keys as I understand how it works. How do you know that you haven't reached the groupbykey phase? Are you using a profiler or do yoi base that assumption only on logs? 
sob., 28 lut 2015, 8:12 PM Arun Luthra użytkownik arun.lut...@gmail.com napisał: A correction to my first post: There is also a repartition right before groupByKey to help avoid too-many-open-files error: rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile() On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote: The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like: 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out and 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] and some of these are followed by: 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...] disassociated! Shutting down. 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in stage 1.0 (TID 336601) java.io.FileNotFoundException:
Re: How to use multiple app jar files?
I faced the exact same issue. The way I solved it was: 1. Copy the entire project. 2. Delete all the source, keeping only the dependencies in pom.xml. This will create a fat jar, without source, with deps only. 3. Keep the original project as is; building it will create a JAR (no deps, by default). Now add your deps jar to Spark with --jars. This requires two projects, but that's better than before. On Mon, Apr 13, 2015 at 4:45 PM, Michael Weir michael.weir@gmail.com wrote: My app works fine with the single, uber jar file containing my app and all its dependencies. However, it takes about 20 minutes to copy the 65MB jar file up to the node on the cluster, so my code, compile, test cycle has become a code, compile, copy, test cycle. I'd like to have a single dependencies jar file on the node, and use a separate small jar for my app (which takes around 10 seconds to copy to the node). I've tried using --jars deps.jar, but that copies the deps.jar to the app-* folder but not the driver-* folder, so I get ClassNotFound errors on the driver. Various other combinations of flags, etc. have produced a fair bit of frustration but no progress. Any help with this would be greatly appreciated, as this problem is significantly stretching the length of my work day! Thanks. -- Deepak
Re: Packaging Java + Python library
Refer this post http://blog.prabeeshk.com/blog/2015/04/07/self-contained-pyspark-application/ On 13 April 2015 at 17:41, Punya Biswal pbis...@palantir.com wrote: Dear Spark users, My team is working on a small library that builds on PySpark and is organized like PySpark as well -- it has a JVM component (that runs in the Spark driver and executor) and a Python component (that runs in the PySpark driver and executor processes). What's a good approach for packaging such a library? Some ideas we've considered: - Package up the JVM component as a Jar and the Python component as a binary egg. This is reasonable but it means that there are two separate artifacts that people have to manage and keep in sync. - Include Python files in the Jar and add it to the PYTHONPATH. This follows the example of the Spark assembly jar, but deviates from the Python community's standards. We'd really appreciate hearing experiences from other people who have built libraries on top of PySpark. Punya
Re: Spark TeraSort source request
Thank you for your response Ewan. I quickly looked yesterday and it was there, but today at work I tried to open it again to start working on it, but it appears to be removed. Is this correct? Thanks, Tom On 12 April 2015 at 06:58, Ewan Higgs ewan.hi...@ugent.be wrote: Hi all. The code is linked from my repo: https://github.com/ehiggs/spark-terasort This is an example Spark program for running TeraSort benchmarks. It is based on work from Reynold Xin's branch https://github.com/rxin/spark/tree/terasort, but it is not the same TeraSort program that currently holds the record http://sortbenchmark.org/. That program is here: https://github.com/rxin/spark/tree/sort-benchmark/core/src/main/scala/org/apache/spark/sort I've been working on other projects at the moment so I haven't returned to the spark-terasort stuff. If you have any pull requests, I would be very grateful. Yours, Ewan On 08/04/15 03:26, Pramod Biligiri wrote: +1. I would love to have the code for this as well. Pramod On Fri, Apr 3, 2015 at 12:47 PM, Tom thubregt...@gmail.com wrote: Hi all, As we all know, Spark has set the record for sorting data, as published on: https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html. Here at our group, we would love to verify these results, and compare machines using this benchmark. We've spent quite some time trying to find the terasort source code that was used, but can not find it anywhere. We did find two candidates: A version posted by Reynold [1], the poster of the message above. This version is stuck at // TODO: Add partition-local (external) sorting using TeraSortRecordOrdering, only generating data. Here, Ewan noticed that it didn't appear to be similar to Hadoop TeraSort. [2] After this he created a version on his own [3].
With this version, we noticed problems with TeraValidate with datasets above ~10G (as mentioned by others at [4]). When examining the raw input and output files, it actually appears that the input data is sorted and the output data unsorted in both cases. Because of this, we believe we did not yet find the actual used source code. I've tried to search the Spark User forum archives, seeing requests of people, indicating a demand, but did not succeed in finding the actual source code. My question: Could you guys please make the source code of the used TeraSort program, preferably with settings, available? If not, what are the reasons that this seems to be withheld? Thanks for any help, Tom Hubregtsen [1] https://github.com/rxin/spark/commit/adcae69145905162fa3b6932f70be2c932f95f87 [2] http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/%3c5462092c.1060...@ugent.be%3E [3] https://github.com/ehiggs/spark-terasort [4] http://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3CCAPszQwgap4o1inZkTwcwV=7scwoqtr5yxfnsqo5p2kgp1bn...@mail.gmail.com%3E -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-TeraSort-source-request-tp22371.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Packaging Java + Python library
Dear Spark users, My team is working on a small library that builds on PySpark and is organized like PySpark as well -- it has a JVM component (that runs in the Spark driver and executor) and a Python component (that runs in the PySpark driver and executor processes). What's a good approach for packaging such a library? Some ideas we've considered: Package up the JVM component as a Jar and the Python component as a binary egg. This is reasonable but it means that there are two separate artifacts that people have to manage and keep in sync. Include Python files in the Jar and add it to the PYTHONPATH. This follows the example of the Spark assembly jar, but deviates from the Python community's standards. We'd really appreciate hearing experiences from other people who have built libraries on top of PySpark. Punya smime.p7s Description: S/MIME cryptographic signature
Sqoop parquet file not working in spark
Hi I imported a table from mssql server with Sqoop 1.4.5 in parquet format. But when I try to load it from Spark shell, it throws error like : scala val df1 = sqlContext.load(/home/bipin/Customer2) scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.NullPointerException parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:249) parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:543) parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:520) parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:426) org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:298) org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:297) scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) . . . 
at scala.collection.parallel.package$$anon$1.alongWith(package.scala:87) at scala.collection.parallel.Task$class.mergeThrowables(Tasks.scala:86) at scala.collection.parallel.mutable.ParArray$Map.mergeThrowables(ParArray.scala:650) at scala.collection.parallel.Task$class.tryMerge(Tasks.scala:72) at scala.collection.parallel.mutable.ParArray$Map.tryMerge(ParArray.scala:650) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:190) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514) at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) I looked at the Sqoop parquet folder and its structure is different than the one that I created in Spark. How can I make the parquet file work? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sqoop-parquet-file-not-working-in-spark-tp22477.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Exception: Driver-Memory while running Spark job on Yarn-cluster
Try this ./bin/spark-submit -v --master yarn-cluster --jars ./libs/mysql-connector-java-5.1.17.jar,./libs/log4j-1.2.17.jar --files datasource.properties,log4j.properties --num-executors 1 --driver-memory 4g *--driver-java-options -XX:MaxPermSize=1G* --executor-memory 2g --executor-cores 1 --class com.test.spark.jobs.AggregationJob sparkagg.jar I noticed that your using mysql-connector-java-5.1.17.jar. Are you running Spark-SQL (Hive queries from Spark) ? On Mon, Apr 13, 2015 at 3:53 PM, sachin Singh sachin.sha...@gmail.com wrote: Hi , When I am submitting spark job as --master yarn-cluster with below command/options getting driver memory error- spark-submit --jars ./libs/mysql-connector-java-5.1.17.jar,./libs/log4j-1.2.17.jar --files datasource.properties,log4j.properties --master yarn-cluster --num-executors 1 --driver-memory 2g --executor-memory 512m --class com.test.spark.jobs.AggregationJob sparkagg.jar Exceptions as per yarn application ID log as under - Container: container_1428938273236_0006_01_01 on mycom.hostname.com_8041 = LogType: stderr LogLength: 128 Log Contents: Exception in thread Driver Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread Driver LogType: stdout LogLength: 40 Container: container_1428938273236_0006_02_01 on mycom.hostname.com_8041 = LogType: stderr LogLength: 1365 Log Contents: java.io.IOException: Log directory hdfs:// mycom.hostname.com:8020/user/spark/applicationHistory/application_1428938273236_0006 already exists! 
at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:129) at org.apache.spark.util.FileLogger.start(FileLogger.scala:115) at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74) at org.apache.spark.SparkContext.init(SparkContext.scala:353) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61) LogType: stdout LogLength: 40 Please help, it's urgent for me. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-Driver-Memory-while-running-Spark-job-on-Yarn-cluster-tp22475.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Deepak
Need some guidance
**Learning the ropes** I'm trying to grasp the concept of using the pipeline in pySpark... Simplified example: list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")] Desired outcome: [(1,3),(2,2),(3,1)] Basically, for each key I want the number of unique values. I've tried different approaches, but am I really using Spark effectively? I wondered if I would do something like: input=sc.parallelize(list) input.groupByKey().collect() Then I wondered if I could do something like a foreach over each key value, and then map the actual values and reduce them. Pseudo-code: input.groupbykey() .keys .foreach(_.values .map(lambda x: x,1) .reducebykey(lambda a,b:a+b) .count() ) I was somehow hoping that the key would get the current value of count, and thus be the count of the unique keys, which is exactly what I think I'm looking for. Am I way off base on how I could accomplish this? Marco
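One way to see the required logic without a running SparkContext is a plain-Python equivalent: deduplicate (key, value) pairs, then count per key. The commented PySpark line at the end is the hypothetical translation of the same steps and avoids groupByKey entirely:

```python
def distinct_counts(pairs):
    # Count distinct values per key -- the plain-Python version of
    # distinct() followed by a per-key count.
    seen = {}
    for k, v in pairs:
        seen.setdefault(k, set()).add(v)
    return sorted((k, len(vs)) for k, vs in seen.items())

pairs = [(1, "alpha"), (1, "beta"), (1, "foo"), (1, "alpha"),
         (2, "alpha"), (2, "alpha"), (2, "bar"), (3, "foo")]

# Hypothetical PySpark equivalent of the same pipeline:
# sc.parallelize(pairs).distinct() \
#   .map(lambda kv: (kv[0], 1)) \
#   .reduceByKey(lambda a, b: a + b) \
#   .collect()
```

On the sample data this yields [(1, 3), (2, 2), (3, 1)], the desired outcome.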
Multipart upload to S3 fails with Bad Digest Exceptions
Hi, I am not sure my problem is relevant to spark, but perhaps someone else had the same error. When I try to write files that need multipart upload to S3 from a job on EMR I always get this error: com.amazonaws.services.s3.model.AmazonS3Exception: The Content-MD5 you specified did not match what we received. If I disable multipart upload via fs.s3n.multipart.uploads.enabled (or output smaller files that don't require multi part upload), then everything works fine. I've seen an old thread on the ML where someone has the same error, but in my case I don't have any other errors on the worker nodes. I am using spark 1.2.1 and hadoop 2.4.0. Thanks, Eugen
Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete
You mean there is a tuple in either RDD, that has itemID = 0 or null ? And what is catch all ? That implies is it a good idea to run a filter on each RDD first ? We do not do this using Pig on M/R. Is it required in Spark world ? On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney jcove...@gmail.com wrote: My guess would be data skew. Do you know if there is some item id that is a catch all? can it be null? item id 0? lots of data sets have this sort of value and it always kills joins 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: Code: val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = lstgItem.join(viEvents).map { case (itemId, (listing, viDetail)) = val viSummary = new VISummary viSummary.leafCategoryId = listing.getLeafCategId().toInt viSummary.itemSiteId = listing.getItemSiteId().toInt viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt viSummary.sellerCountryId = listing.getSlrCntryId().toInt viSummary.buyerSegment = 0 viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() 0) 1 else 0) val sellerId = listing.getSlrId.toLong (sellerId, (viDetail, viSummary, itemId)) } Running Tasks: Tasks IndexIDAttemptStatus ▾Locality LevelExecutor ID / HostLaunch Time DurationGC TimeShuffle Read Size / RecordsWrite TimeShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle Spill (Disk)Errors 0 216 0 RUNNING PROCESS_LOCAL 181 / phxaishdc9dn0474.phx.ebay.com 2015/04/13 06:43:53 1.7 h 13 min 3.0 GB / 56964921 0.0 B / 0 21.2 GB 1902.6 MB 2 218 0 SUCCESS PROCESS_LOCAL 582 / phxaishdc9dn0235.phx.ebay.com 2015/04/13 06:43:53 15 min 31 s 2.2 GB / 1666851 0.1 s 3.0 MB / 2062 54.8 GB 1924.5 MB 1 217 0 SUCCESS PROCESS_LOCAL 202 / phxdpehdc9dn2683.stratus.phx.ebay.com 2015/04/13 06:43:53 19 min 1.3 min 2.2 GB / 1687086 75 ms 3.9 MB / 2692 33.7 GB 1960.4 MB 4 220 0 SUCCESS PROCESS_LOCAL 218 / phxaishdc9dn0855.phx.ebay.com 2015/04/13 06:43:53 15 min 56 s 2.2 GB / 1675654 40 ms 3.3 MB / 2260 26.2 GB 1928.4 MB Command: 
./bin/spark-submit -v --master yarn-cluster --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar --num-executors 3000 --driver-memory 12g --driver-java-options -XX:MaxPermSize=6G --executor-memory 12g --executor-cores 1 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar startDate=2015-04-6 endDate=2015-04-7 input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem output=/user/dvasthimal/epdatasets/viewItem buffersize=128 maxbuffersize=1068 maxResultSize=2G What do i do ? I killed the job twice and its stuck again. Where is it stuck ? -- Deepak -- Deepak
Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete
I can promise you that this is also a problem in the pig world :) Not sure why it's not a problem for this data set, though... Are you sure that the two are running exactly the same code? You should inspect your source data. Make a histogram for each side and see what the data distribution looks like. If there is a value or bucket with a disproportionate share of the values, you know you have an issue.

2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:
You mean there is a tuple in either RDD that has itemId = 0 or null? And what is a "catch-all"? Does that imply it is a good idea to run a filter on each RDD first? We do not do this using Pig on M/R. Is it required in the Spark world?

On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney jcove...@gmail.com wrote:
My guess would be data skew. Do you know if there is some item id that is a catch-all? Can it be null? Item id 0? Lots of data sets have this sort of value, and it always kills joins.

2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:
Code:

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      lstgItem.join(viEvents).map {
        case (itemId, (listing, viDetail)) =>
          val viSummary = new VISummary
          viSummary.leafCategoryId = listing.getLeafCategId().toInt
          viSummary.itemSiteId = listing.getItemSiteId().toInt
          viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
          viSummary.sellerCountryId = listing.getSlrCntryId().toInt
          viSummary.buyerSegment = 0
          viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
          val sellerId = listing.getSlrId.toLong
          (sellerId, (viDetail, viSummary, itemId))
      }

Running tasks:

    Index  ID   Attempt  Status   Locality       Executor ID / Host                           Launch Time          Duration  GC Time  Shuffle Read / Records  Write Time  Shuffle Write / Records  Spill (Memory)  Spill (Disk)
    0      216  0        RUNNING  PROCESS_LOCAL  181 / phxaishdc9dn0474.phx.ebay.com          2015/04/13 06:43:53  1.7 h     13 min   3.0 GB / 56964921       -           0.0 B / 0                21.2 GB         1902.6 MB
    2      218  0        SUCCESS  PROCESS_LOCAL  582 / phxaishdc9dn0235.phx.ebay.com          2015/04/13 06:43:53  15 min    31 s     2.2 GB / 1666851        0.1 s       3.0 MB / 2062            54.8 GB         1924.5 MB
    1      217  0        SUCCESS  PROCESS_LOCAL  202 / phxdpehdc9dn2683.stratus.phx.ebay.com  2015/04/13 06:43:53  19 min    1.3 min  2.2 GB / 1687086        75 ms       3.9 MB / 2692            33.7 GB         1960.4 MB
    4      220  0        SUCCESS  PROCESS_LOCAL  218 / phxaishdc9dn0855.phx.ebay.com          2015/04/13 06:43:53  15 min    56 s     2.2 GB / 1675654        40 ms       3.3 MB / 2260            26.2 GB         1928.4 MB

Command:

    ./bin/spark-submit -v --master yarn-cluster \
      --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar \
      --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar \
      --num-executors 3000 --driver-memory 12g --driver-java-options -XX:MaxPermSize=6G \
      --executor-memory 12g --executor-cores 1 --queue hdmi-express \
      --class com.ebay.ep.poc.spark.reporting.SparkApp \
      /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar \
      startDate=2015-04-6 endDate=2015-04-7 input=/user/dvasthimal/epdatasets_small/exptsession \
      subcommand=viewItem output=/user/dvasthimal/epdatasets/viewItem \
      buffersize=128 maxbuffersize=1068 maxResultSize=2G

What do i do ? I killed the job twice and its stuck again. Where is it stuck ?
-- Deepak
-- Deepak
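The histogram check suggested above takes only a few lines. A minimal sketch in plain Python rather than Spark Scala (the `keys` sample here is fabricated; on the real data you would count `lstgItem.map(_._1)` and `viEvents.map(_._1)` the same way):

```python
from collections import Counter

def key_histogram(keys, top=5):
    """Count how often each join key occurs and return the heaviest keys.
    A single key holding a disproportionate share of the rows is the
    classic signature of join skew."""
    counts = Counter(keys)
    total = len(keys)
    return [(k, c, c / total) for k, c in counts.most_common(top)]

# Skewed sample: key 0 (a "catch-all" id) dominates the data set.
sample = [0] * 9000 + list(range(1, 1001))
for key, count, share in key_histogram(sample, top=3):
    print(key, count, round(share, 2))
```

If one key (often 0, -1, or null) accounts for most of the rows, the task that receives that key's partition will run far longer than the other 199.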
feature scaling in GeneralizedLinearAlgorithm.scala
Hi, In GeneralizedLinearAlgorithm, which Logistic Regression relies on, it says that if useFeatureScaling is enabled, "we will standardize the training features" and train the model in the scaled space, and then "we transform the coefficients from the scaled space to the original space". My understanding, then, is that we do not need to scale the test data, since the coefficients are already in the original space. Is this correct? Thanks, Jianguo
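That reading matches what the transformation does mathematically: once the scaled-space coefficients are divided by the per-feature standard deviations (and the means folded into the intercept), raw test features plug in directly. A minimal NumPy sketch of the idea, not Spark's actual implementation:

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 3)) * [1.0, 10.0, 100.0]  # wildly different feature scales
w_true = np.array([2.0, -0.5, 0.03])
y = X @ w_true

# Standardize features and fit by least squares in the scaled space.
mu, sigma = X.mean(axis=0), X.std(axis=0)
Xs = (X - mu) / sigma
w_scaled, *_ = np.linalg.lstsq(np.c_[Xs, np.ones(len(Xs))], y, rcond=None)

# Map coefficients back: divide by sigma, fold the means into the intercept.
w_orig = w_scaled[:3] / sigma
b_orig = w_scaled[3] - (mu / sigma) @ w_scaled[:3]

# Predictions on RAW (unscaled) test data now match the true model — no
# standardization of the test set is needed.
X_test = rng.normal(size=(5, 3)) * [1.0, 10.0, 100.0]
print(np.allclose(X_test @ w_orig + b_orig, X_test @ w_true))  # True
```

Scaling the test data as well would effectively apply the standardization twice and give wrong predictions.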
Re: Spark Cluster: RECEIVED SIGNAL 15: SIGTERM
Linux OOM throws SIGTERM, but if I remember correctly the JVM handles heap memory limits differently: it throws OutOfMemoryError and eventually sends SIGINT. Not sure what happened here, but the worker simply received a SIGTERM signal, so perhaps the daemon was terminated by someone or by a parent process. Just my guess. Tim

On Mon, Apr 13, 2015 at 2:28 AM, Guillaume Pitel guillaume.pi...@exensa.com wrote:
Very likely to be this: http://www.linuxdevcenter.com/pub/a/linux/2006/11/30/linux-out-of-memory.html?page=2 Your worker ran out of memory => maybe you're asking for too much memory for the JVM, or something else is running on the worker. Guillaume

Any idea what this means, many thanks

    == logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1 ==
    15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910 with 4 cores, 6.6 GB RAM
    15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
    15/04/13 07:07:22 INFO Worker: Spark home: /remote/users//work/tools/spark-1.3.0-bin-hadoop2.4
    15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
    15/04/13 07:07:22 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
    15/04/13 07:07:22 INFO Utils: Successfully started service 'WorkerUI' on port 8081.
    15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at http://09:8081
    15/04/13 07:07:22 INFO Worker: Connecting to master akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
    15/04/13 07:07:22 INFO Worker: Successfully registered with master spark://08:7077
    15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM

-- Guillaume PITEL, Président +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
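One quick way to test the OOM-killer theory is to search the kernel log for its signature line. A small sketch of that check in Python (the log lines below are fabricated examples; on a real worker you would read /var/log/kern.log or the output of dmesg):

```python
import re

def find_oom_kills(log_text):
    """Return (pid, process) pairs for OOM-killer victims found in a kernel log."""
    pattern = re.compile(r"Out of memory: Kill process (\d+) \((\S+)\)")
    return pattern.findall(log_text)

sample_log = """\
Apr 13 08:35:01 node9 kernel: java invoked oom-killer: gfp_mask=0x201da, order=0
Apr 13 08:35:07 node9 kernel: Out of memory: Kill process 4242 (java) score 912 or sacrifice child
"""
print(find_oom_kills(sample_log))  # [('4242', 'java')]
```

If a java process shows up there around 08:35, the OOM killer (not a human operator) sent the signal.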
Spark Streaming Kafka Consumer, Confluent Platform, Avro StorageLevel
Hello, I'm trying to use a Spark Streaming (1.2.0-cdh5.3.2) consumer (via the spark-streaming-kafka lib of the same version) with Kafka's Confluent Platform 1.0. I managed to write a Producer that produces my data, and I can check it via the avro-console-consumer:

    ./bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181 --from-beginning

which displays:

    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    {"group": "", "domainType": "job", "action": "apply_freely", "entity": "14564564132", "user": {"string": "user"}, "session": {"string": "session"}, "date": "20150326T154052.000+0100", "ip": "192.168.0.1", "userAgent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.101 Safari/537.36", "referer": "http://www.kb.com/offre/controleur-de-gestion-h-f-9775029", "os": "Linux", "deviceType": "Computer", "browser": "Chrome", "browserVersion": "41.0.2272.101", "browserRenderer": "WEBKIT", "physicalServerOrigin": "01.front.local"}
    {"group": "", "domainType": "job", "action": "apply_freely", "entity": "14564564132", "user": {"string": "user"}, "session": {"string": "session"}, "date": "20150326T154052.000+0100", "ip": "192.168.0.1", "userAgent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.101 Safari/537.36", "referer": "http://www.kb.com/offre/controleur-de-gestion-h-f-9775029", "os": "Linux", "deviceType": "Computer", "browser": "Chrome", "browserVersion": "41.0.2272.101", "browserRenderer": "WEBKIT", "physicalServerOrigin": "01.front.local"}

So far, so good!

Now here's the weird thing. When I'm using the following with spark-streaming-kafka:

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> "consumer",
      "zookeeper.connect" -> zookeeper,
      "auto.offset.reset" -> "smallest",
      "schema.registry.url" -> schemaRegistryUrl)
    val topicMap = topics.split(",").map((_, 2)).toMap
    val messages = KafkaUtils.createStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    val events = messages.map(_._2)
    val dStream = events.map { avroMessage =>
      println("Avro content: " + avroMessage)
      avroMessage
    }
    dStream.print()

the Spark Streaming micro-batch shows me this:

    15/04/10 16:11:40 INFO storage.BlockManager: Found block input-0-1428675099000 locally
    Avro content: {"group": "", "domainType": "", "action": "", "entity": "", "user": "", "session": "", "date": "", "ip": "", "userAgent": "", "referer": "", "os": "", "deviceType": "", "browser": "", "browserVersion": "", "browserRenderer": "", "physicalServerOrigin": ""}
    Avro content: {"group": "", "domainType": "", "action": "", "entity": "", "user": "", "session": "", "date": "", "ip": "", "userAgent": "", "referer": "", "os": "", "deviceType": "", "browser": "", "browserVersion": "", "browserRenderer": "", "physicalServerOrigin": ""}

When I change the snippet to a non-_SER StorageLevel, like this:

    val messages = KafkaUtils.createStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK)

it works as expected:

    15/04/10 16:29:02 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 2)
    15/04/10 16:29:02 INFO storage.BlockManager: Found block input-0-1428676140400 locally
    Avro content: {"group": "", "domainType": "job", "action": "apply_freely", "entity": "14564564132", "user": "user", "session": "session", "date": "20150326T154052.000+0100", "ip": "192.168.0.1", "userAgent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.101 Safari/537.36", "referer": "http://www.kb.com/offre/controleur-de-gestion-h-f-9775029", "os": "Linux", "deviceType": "Computer", "browser": "Chrome", "browserVersion": "41.0.2272.101", "browserRenderer": "WEBKIT", "physicalServerOrigin": "01.front.local"}
    Avro content: {"group": "", "domainType": "job", "action": "apply_freely", "entity": "14564564132", "user": "user", "session": "session", "date": "20150326T154052.000+0100", "ip": "192.168.0.1", "userAgent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.101 Safari/537.36", "referer": "http://www.kb.com/offre/controleur-de-gestion-h-f-9775029", "os": "Linux", "deviceType": "Computer", "browser": "Chrome", "browserVersion": "41.0.2272.101", "browserRenderer": "WEBKIT", "physicalServerOrigin": "01.front.local"}

If Kryo is disabled, the Java serialization seems able to read the record as expected with both _SER and non-_SER StorageLevels. I've tried several things with Kryo serialization with no good results (even just activating Kryo without registering any class manually). Maybe I'm doing something wrong with the StorageLevel in Spark? People in Confluent Platform mailing list
Help in transforming the RDD
Hi All, I have a JavaPairRDD<Long, String> where each Long key has 4 String values associated with it. I want to fire an HBase query to look up each String part of the RDD. Each look-up will give a result of around 7K integers, so for each key I will have ~7K values. My input RDD is already more than a GB, and after getting these results it will grow to around 50 GB, which I want to avoid. My problem:

    1, Test1
    1, test2
    1, test3
    1, test4
    ...

Now I will query HBase for Test1, test2, test3, test4 in parallel; each query will give a result of around 2K, so 8K integers in total. Then for each record I will have 1 * 8000 entries in my RDD, and if I have 1 million records it will become 1 million * 8000, which is huge to process even using groupBy.
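One common way to avoid the 1 million × 8000 blow-up is to query each distinct string only once, keep the results as a small side table, and attach (or broadcast) them to records lazily instead of materializing the expanded RDD. A plain-Python sketch of the idea (`hbase_lookup` is a hypothetical stand-in for the real HBase call):

```python
def expand_via_distinct(pairs, lookup):
    """pairs: iterable of (key, string) records; lookup: string -> list of ints.
    Issue one query per DISTINCT string instead of one per record, then
    attach the shared result list to each record without copying it."""
    distinct = {s for _, s in pairs}
    side_table = {s: lookup(s) for s in distinct}   # ~#distinct queries, not #records
    for key, s in pairs:
        yield key, side_table[s]                    # shared reference, not a copy

def hbase_lookup(s):
    # Hypothetical stand-in: the real implementation would scan HBase for s.
    return list(range(3))

pairs = [(1, "Test1"), (1, "test2"), (2, "Test1")]
out = list(expand_via_distinct(pairs, hbase_lookup))
print(len(out))  # 3 records produced, but only 2 HBase queries issued
```

In Spark terms this corresponds to `rdd.values().distinct()` + a `mapPartitions` that queries HBase once per partition, then joining (or broadcasting, if the distinct set is small) the results back.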
Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete
My guess would be data skew. Do you know if there is some item id that is a catch-all? Can it be null? Item id 0? Lots of data sets have this sort of value, and it always kills joins.

2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:
Code:

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      lstgItem.join(viEvents).map {
        case (itemId, (listing, viDetail)) =>
          val viSummary = new VISummary
          viSummary.leafCategoryId = listing.getLeafCategId().toInt
          viSummary.itemSiteId = listing.getItemSiteId().toInt
          viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
          viSummary.sellerCountryId = listing.getSlrCntryId().toInt
          viSummary.buyerSegment = 0
          viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
          val sellerId = listing.getSlrId.toLong
          (sellerId, (viDetail, viSummary, itemId))
      }

Running tasks:

    Index  ID   Attempt  Status   Locality       Executor ID / Host                           Launch Time          Duration  GC Time  Shuffle Read / Records  Write Time  Shuffle Write / Records  Spill (Memory)  Spill (Disk)
    0      216  0        RUNNING  PROCESS_LOCAL  181 / phxaishdc9dn0474.phx.ebay.com          2015/04/13 06:43:53  1.7 h     13 min   3.0 GB / 56964921       -           0.0 B / 0                21.2 GB         1902.6 MB
    2      218  0        SUCCESS  PROCESS_LOCAL  582 / phxaishdc9dn0235.phx.ebay.com          2015/04/13 06:43:53  15 min    31 s     2.2 GB / 1666851        0.1 s       3.0 MB / 2062            54.8 GB         1924.5 MB
    1      217  0        SUCCESS  PROCESS_LOCAL  202 / phxdpehdc9dn2683.stratus.phx.ebay.com  2015/04/13 06:43:53  19 min    1.3 min  2.2 GB / 1687086        75 ms       3.9 MB / 2692            33.7 GB         1960.4 MB
    4      220  0        SUCCESS  PROCESS_LOCAL  218 / phxaishdc9dn0855.phx.ebay.com          2015/04/13 06:43:53  15 min    56 s     2.2 GB / 1675654        40 ms       3.3 MB / 2260            26.2 GB         1928.4 MB

Command:

    ./bin/spark-submit -v --master yarn-cluster \
      --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar \
      --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar \
      --num-executors 3000 --driver-memory 12g --driver-java-options -XX:MaxPermSize=6G \
      --executor-memory 12g --executor-cores 1 --queue hdmi-express \
      --class com.ebay.ep.poc.spark.reporting.SparkApp \
      /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar \
      startDate=2015-04-6 endDate=2015-04-7 input=/user/dvasthimal/epdatasets_small/exptsession \
      subcommand=viewItem output=/user/dvasthimal/epdatasets/viewItem \
      buffersize=128 maxbuffersize=1068 maxResultSize=2G

What do i do ? I killed the job twice and its stuck again. Where is it stuck ?
-- Deepak
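If one item id does turn out to dominate, the standard mitigation is to "salt" the hot key: append a random shard number on the big side and replicate the small side once per shard, so a single hot key's work is spread over several tasks. A plain-Python sketch of the salting idea (not actual Spark code; in Spark the same maps run on the two RDDs before `join`):

```python
import random

NUM_SALTS = 4  # how many shards each hot key is split into

def salt_left(records):
    """Big, skewed side: tag each row with a random salt in [0, NUM_SALTS)."""
    return [((key, random.randrange(NUM_SALTS)), value) for key, value in records]

def salt_right(records):
    """Small side: replicate each row once per salt so every shard can match."""
    return [((key, s), value) for key, value in records for s in range(NUM_SALTS)]

def join(left, right):
    """Toy hash join on the salted composite keys (stands in for rdd.join)."""
    index = {}
    for k, v in right:
        index.setdefault(k, []).append(v)
    return [(k[0], (lv, rv)) for k, lv in left for rv in index.get(k, [])]

left = [(0, "evt%d" % i) for i in range(8)] + [(1, "evt8")]   # key 0 is hot
right = [(0, "listing0"), (1, "listing1")]
joined = join(salt_left(left), salt_right(right))
print(len(joined))  # 9: same join output, but key 0's rows land in 4 shards
```

The cost is replicating the smaller side NUM_SALTS times, which is usually cheap compared to one straggler task grinding through the hot key alone.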
Re: Could not compute split, block not found in Spark Streaming Simple Application
Whether I use 1 or 2 machines, the results are the same... Here follow the results I got using 1 and 2 receivers with 2 machines.

2 machines, 1 receiver:

    sbt/sbt "run-main Benchmark 1 machine1 1000" 2>&1 | grep -i "Total delay\|record"

    15/04/13 16:41:34 INFO JobScheduler: Total delay: 0.156 s for time 1428939694000 ms (execution: 0.142 s)
    Received 92910 records
    15/04/13 16:41:35 INFO JobScheduler: Total delay: 0.155 s for time 1428939695000 ms (execution: 0.142 s)
    Received 92910 records
    15/04/13 16:41:36 INFO JobScheduler: Total delay: 0.132 s for time 1428939696000 ms (execution: 0.119 s)
    Received 92910 records
    15/04/13 16:41:37 INFO JobScheduler: Total delay: 0.172 s for time 1428939697000 ms (execution: 0.161 s)
    Received 92910 records
    15/04/13 16:41:38 INFO JobScheduler: Total delay: 0.152 s for time 1428939698000 ms (execution: 0.140 s)
    Received 92910 records
    15/04/13 16:41:39 INFO JobScheduler: Total delay: 0.162 s for time 1428939699000 ms (execution: 0.149 s)
    Received 92910 records
    15/04/13 16:41:40 INFO JobScheduler: Total delay: 0.156 s for time 142893970 ms (execution: 0.143 s)
    Received 92910 records
    15/04/13 16:41:41 INFO JobScheduler: Total delay: 0.148 s for time 1428939701000 ms (execution: 0.135 s)
    Received 92910 records
    15/04/13 16:41:42 INFO JobScheduler: Total delay: 0.149 s for time 1428939702000 ms (execution: 0.135 s)
    Received 92910 records
    15/04/13 16:41:43 INFO JobScheduler: Total delay: 0.153 s for time 1428939703000 ms (execution: 0.136 s)
    Received 92910 records
    15/04/13 16:41:44 INFO JobScheduler: Total delay: 0.118 s for time 1428939704000 ms (execution: 0.111 s)
    Received 92910 records
    15/04/13 16:41:45 INFO JobScheduler: Total delay: 0.155 s for time 1428939705000 ms (execution: 0.143 s)
    Received 92910 records
    15/04/13 16:41:46 INFO JobScheduler: Total delay: 0.138 s for time 1428939706000 ms (execution: 0.126 s)
    Received 92910 records
    15/04/13 16:41:47 INFO JobScheduler: Total delay: 0.154 s for time 1428939707000 ms (execution: 0.142 s)
    Received 92910 records
    15/04/13 16:41:48 INFO JobScheduler: Total delay: 0.172 s for time 1428939708000 ms (execution: 0.160 s)
    Received 92910 records
    15/04/13 16:41:49 INFO JobScheduler: Total delay: 0.144 s for time 1428939709000 ms (execution: 0.133 s)

Receiver statistics [2015/04/13 16:53:54] (Receiver / Status / Location / Records in last batch / Minimum rate [records/sec] / Median rate [records/sec] / Maximum rate [records/sec] / Last Error):

    Receiver-0 / - / - / - / 10 / 10 / 100 / -

2 machines, 2 receivers:

    sbt/sbt "run-main Benchmark 2 machine1 1000" 2>&1 | grep -i "Total delay\|record"

    Received 92910 records
    15/04/13 16:43:13 INFO JobScheduler: Total delay: 0.153 s for time 1428939793000 ms (execution: 0.142 s)
    Received 92910 records
    15/04/13 16:43:14 INFO JobScheduler: Total delay: 0.144 s for time 1428939794000 ms (execution: 0.136 s)
    Received 92910 records
    15/04/13 16:43:15 INFO JobScheduler: Total delay: 0.145 s for time 1428939795000 ms (execution: 0.132 s)
    Received 92910 records
    15/04/13 16:43:16 INFO JobScheduler: Total delay: 0.144 s for time 1428939796000 ms (execution: 0.134 s)
    Received 92910 records
    15/04/13 16:43:17 INFO JobScheduler: Total delay: 0.148 s for time 1428939797000 ms (execution: 0.142 s)
    Received 92910 records
    15/04/13 16:43:18 INFO JobScheduler: Total delay: 0.136 s for time 1428939798000 ms (execution: 0.123 s)
    Received 92910 records
    15/04/13 16:43:19 INFO JobScheduler: Total delay: 0.155 s for time 1428939799000 ms (execution: 0.145 s)
    Received 92910 records
    15/04/13 16:43:20 INFO JobScheduler: Total delay: 0.160 s for time 142893980 ms (execution: 0.152 s)
    Received 83619 records
    15/04/13 16:43:21 INFO JobScheduler: Total delay: 0.141 s for time 1428939801000 ms (execution: 0.131 s)
    Received 102201 records
    15/04/13 16:43:22 INFO JobScheduler: Total delay: 0.208 s for time 1428939802000 ms (execution: 0.197 s)
    Received 83619 records
    15/04/13 16:43:23 INFO JobScheduler: Total delay: 0.160 s for time 1428939803000 ms (execution: 0.147 s)
    Received 92910 records
    15/04/13 16:43:24 INFO JobScheduler: Total delay: 0.197 s for time 1428939804000 ms (execution: 0.185 s)
    Received 92910 records
    15/04/13 16:43:25 INFO JobScheduler: Total delay: 0.200 s for time 1428939805000 ms (execution: 0.189 s)
    Received 92910 records
    15/04/13 16:43:26 INFO JobScheduler: Total delay: 0.181 s for time 1428939806000 ms (execution: 0.173 s)
    Received 92910 records
    15/04/13 16:43:27 INFO JobScheduler: Total delay: 0.189 s for time 1428939807000 ms (execution: 0.178 s)

Receiver statistics [2015/04/13 16:49:36] (same columns as above):

    Receiver-0 / - / - / 10 / 10 / 10 / 9 / -
    Receiver-1 / - / - / - / - / - / - / -

On Thu, Apr 9, 2015 at 7:55 PM, Tathagata Das t...@databricks.com wrote:
Are you running # of receivers = # machines?
Equi Join is taking for ever. 1 Task is Running while other 199 are complete
Code:

    val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
      lstgItem.join(viEvents).map {
        case (itemId, (listing, viDetail)) =>
          val viSummary = new VISummary
          viSummary.leafCategoryId = listing.getLeafCategId().toInt
          viSummary.itemSiteId = listing.getItemSiteId().toInt
          viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
          viSummary.sellerCountryId = listing.getSlrCntryId().toInt
          viSummary.buyerSegment = 0
          viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
          val sellerId = listing.getSlrId.toLong
          (sellerId, (viDetail, viSummary, itemId))
      }

Running tasks:

    Index  ID   Attempt  Status   Locality       Executor ID / Host                           Launch Time          Duration  GC Time  Shuffle Read / Records  Write Time  Shuffle Write / Records  Spill (Memory)  Spill (Disk)
    0      216  0        RUNNING  PROCESS_LOCAL  181 / phxaishdc9dn0474.phx.ebay.com          2015/04/13 06:43:53  1.7 h     13 min   3.0 GB / 56964921       -           0.0 B / 0                21.2 GB         1902.6 MB
    2      218  0        SUCCESS  PROCESS_LOCAL  582 / phxaishdc9dn0235.phx.ebay.com          2015/04/13 06:43:53  15 min    31 s     2.2 GB / 1666851        0.1 s       3.0 MB / 2062            54.8 GB         1924.5 MB
    1      217  0        SUCCESS  PROCESS_LOCAL  202 / phxdpehdc9dn2683.stratus.phx.ebay.com  2015/04/13 06:43:53  19 min    1.3 min  2.2 GB / 1687086        75 ms       3.9 MB / 2692            33.7 GB         1960.4 MB
    4      220  0        SUCCESS  PROCESS_LOCAL  218 / phxaishdc9dn0855.phx.ebay.com          2015/04/13 06:43:53  15 min    56 s     2.2 GB / 1675654        40 ms       3.3 MB / 2260            26.2 GB         1928.4 MB

Command:

    ./bin/spark-submit -v --master yarn-cluster \
      --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar \
      --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar \
      --num-executors 3000 --driver-memory 12g --driver-java-options -XX:MaxPermSize=6G \
      --executor-memory 12g --executor-cores 1 --queue hdmi-express \
      --class com.ebay.ep.poc.spark.reporting.SparkApp \
      /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar \
      startDate=2015-04-6 endDate=2015-04-7 input=/user/dvasthimal/epdatasets_small/exptsession \
      subcommand=viewItem output=/user/dvasthimal/epdatasets/viewItem \
      buffersize=128 maxbuffersize=1068 maxResultSize=2G

What do i do ? I killed the job twice and its stuck again. Where is it stuck ?
-- Deepak
Configuring amount of disk space available to spark executors in mesos?
I'm surprised that I haven't been able to find this via Google... What is the setting that requests some amount of disk space for the executors? Maybe I'm misunderstanding how this is configured... Thanks for any help!
What's the cleanest way to make spark aware of my custom scheduler?
I need my own scheduler to point Spark at a proprietary remote execution framework. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L2152 I'm looking at where SparkContext decides on the scheduler backend, and it doesn't look like there is a hook. Of course I can extend SparkContext and add my own, but that seems sort of lame. Wondering if people know of a better way (or maybe I'm just missing something obvious).