Re: counters in spark

2015-04-13 Thread Grandl Robert
Guys,
Do you have any thoughts on this?

Thanks,
Robert


 On Sunday, April 12, 2015 5:35 PM, Grandl Robert 
rgra...@yahoo.com.INVALID wrote:
   

 Hi guys,
I was trying to find counters in Spark for the amount of CPU or memory used (in some metric) by a task/stage/job, but I could not find any.
Is there any such counter available?
Thank you,
Robert
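
For anyone looking for a starting point, here is a rough sketch (not an official counter API; it assumes the TaskMetrics fields available in Spark 1.3) that surfaces per-task run time and spill sizes through a listener:

  import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

  sc.addSparkListener(new SparkListener {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      val m = taskEnd.taskMetrics
      if (m != null) {
        // run time is in milliseconds; spill sizes are in bytes
        println(s"stage=${taskEnd.stageId} runTimeMs=${m.executorRunTime} " +
          s"memSpilled=${m.memoryBytesSpilled} diskSpilled=${m.diskBytesSpilled}")
      }
    }
  })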





  

Re: Multiple Kafka Recievers

2015-04-13 Thread Cody Koeninger
As far as I know, createStream doesn't let you specify where receivers are
run.

createDirectStream in 1.3 doesn't use long-running receivers, so it is
likely to give you more even distribution of consumers across your workers.
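
For reference, a minimal sketch of the direct approach (the broker list and topic name are placeholders; ssc is the StreamingContext from the code below):

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
  val topics = Set("mytopic")
  val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics).map(_._2)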

On Mon, Apr 13, 2015 at 11:31 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid
wrote:

 Hi,

 I have 4 workers and I am trying to have parallel Kafka receivers, 1 on
 each worker, with the following code.

 val kafkaStreams = (0 to args.length - 1).map { i =>
   KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(i) -> 1),
     StorageLevel.MEMORY_ONLY).map(_._2) }
 val unifiedStream = ssc.union(kafkaStreams)
 unifiedStream.print()

 But I am getting receivers mostly on two workers (two on each), and sometimes
 on three workers. What's wrong with the code?

 Regards,
 Laeeq





 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: Configuring amount of disk space available to spark executors in mesos?

2015-04-13 Thread Patrick Wendell
Hey Jonathan,

Are you referring to disk space used for storing persisted RDD's? For
that, Spark does not bound the amount of data persisted to disk. It's
a similar story to how Spark's shuffle disk output works (and also
Hadoop and other frameworks make this assumption as well for their
shuffle data, AFAIK).

We could (in theory) add a storage level that bounds the amount of
data persisted to disk and forces re-computation if the partition did
not fit. I'd be interested to hear more about a workload where that's
relevant though, before going that route. Maybe if people are using
SSD's that would make sense.

- Patrick

On Mon, Apr 13, 2015 at 8:19 AM, Jonathan Coveney jcove...@gmail.com wrote:
 I'm surprised that I haven't been able to find this via google, but I
 haven't...

 What is the setting that requests some amount of disk space for the
 executors? Maybe I'm misunderstanding how this is configured...

 Thanks for any help!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: feature scaling in GeneralizedLinearAlgorithm.scala

2015-04-13 Thread Xiangrui Meng
Correct. Prediction doesn't touch that code path. -Xiangrui
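
In other words, something like the following sketch is fine (training and test are assumed to be RDD[LabeledPoint]; no manual scaling is applied to the test features):

  import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

  // the fitted coefficients are already transformed back to the original feature
  // space, so test features can be passed in unscaled
  val model = new LogisticRegressionWithLBFGS().run(training)
  val scoresAndLabels = test.map(p => (model.predict(p.features), p.label))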

On Mon, Apr 13, 2015 at 9:58 AM, Jianguo Li flyingfromch...@gmail.com
wrote:

 Hi,

 In GeneralizedLinearAlgorithm, which Logistic Regression relies on, it
 says that if userFeatureScaling is enabled, the training features are standardized
 and the model is trained in the scaled space; the coefficients are then transformed
 from the scaled space back to the original space.

 My understanding then is we do not need to scale the test data since the
 coefficients are already in the original space, is this correct?

 Thanks

 Jianguo






Re: Configuring amount of disk space available to spark executors in mesos?

2015-04-13 Thread Jonathan Coveney
Nothing so complicated... we are seeing mesos kill off our executors
immediately. When I reroute logging to an NFS directory we have available,
the executors survive fine. As such I am wondering if the spark workers are
getting killed by mesos for exceeding their disk quota (which atm is 0).
This could be a red herring, however.

2015-04-13 15:41 GMT-04:00 Patrick Wendell pwend...@gmail.com:

 Hey Jonathan,

 Are you referring to disk space used for storing persisted RDD's? For
 that, Spark does not bound the amount of data persisted to disk. It's
 a similar story to how Spark's shuffle disk output works (and also
 Hadoop and other frameworks make this assumption as well for their
 shuffle data, AFAIK).

 We could (in theory) add a storage level that bounds the amount of
 data persisted to disk and forces re-computation if the partition did
 not fit. I'd be interested to hear more about a workload where that's
 relevant though, before going that route. Maybe if people are using
 SSD's that would make sense.

 - Patrick

 On Mon, Apr 13, 2015 at 8:19 AM, Jonathan Coveney jcove...@gmail.com
 wrote:
  I'm surprised that I haven't been able to find this via google, but I
  haven't...
 
  What is the setting that requests some amount of disk space for the
  executors? Maybe I'm misunderstanding how this is configured...
 
  Thanks for any help!



Re: Opening many Parquet files = slow

2015-04-13 Thread Eric Eijkelenboom
Hi guys

Does anyone know how to stop Spark from opening all Parquet files before 
starting a job? This is quite a show stopper for me, since I have 5000 Parquet 
files on S3.

Recap of what I tried: 

1. Disable schema merging with: sqlContext.load("parquet", Map("mergeSchema" ->
"false", "path" -> "s3://path/to/folder"))
This opens most files in the folder (17 out of 21 in my small example). For 
5000 files on S3, sqlContext.load() takes 30 minutes to complete. 

2. Use the old api with: 
sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
Now sqlContext.parquetFile() only opens a few files and prints the schema: 
so far so good! However, as soon as I run e.g. a count() on the dataframe, 
Spark still opens all files _before_ starting a job/stage. Effectively this 
moves the delay from load() to count() (or any other action I presume).

3. Run Spark 1.3.1-rc2.
sqlContext.load() took about 30 minutes for 5000 Parquet files on S3, the 
same as 1.3.0.

Any help would be greatly appreciated!

Thanks a lot. 

Eric




 On 10 Apr 2015, at 16:46, Eric Eijkelenboom eric.eijkelenb...@gmail.com 
 wrote:
 
 Hi Ted
 
 Ah, I guess the term ‘source’ confused me :)
 
 Doing:
 
  sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "path to a
  single day of logs"))
 
 for 1 directory with 21 files, Spark opens 17 files: 
 
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 
 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72'
  
 s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72'
  for reading
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 
 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for 
 reading at position '261573524'
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 
 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74'
  
 s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74'
  for reading
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 
 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77'
  
 s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77'
  for reading
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 
 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62'
  
 s3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62'
  for reading
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 
 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for 
 reading at position '259256807'
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 
 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for 
 reading at position '260002042'
 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 
 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for 
 reading at position ‘260875275'
 etc.
 
 I can’t seem to pass a comma-separated list of directories to load(), so in 
 order to load multiple days of logs, I have to point to the root folder and 
 depend on auto-partition discovery (unless there’s a smarter way). 
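
  A possible alternative, assuming the varargs overload of parquetFile is available in this 1.3 build (the paths below are placeholders):

    val days = sqlContext.parquetFile("s3n://mylogs/logs/day1", "s3n://mylogs/logs/day2")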
 
 Doing: 
 
  sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "path to
  root log dir"))
 
 starts opening what seems like all files (I killed the process after a couple 
 of minutes).
 
 Thanks for helping out. 
 Eric



Re: Spark TeraSort source request

2015-04-13 Thread Ewan Higgs

Tom,
According to Github's public activity log, Reynold Xin (in CC) deleted 
his sort-benchmark branch yesterday. I didn't have a local copy aside 
from the Daytona Partitioner (attached).


Reynold, is it possible to reinstate your branch?

-Ewan

On 13/04/15 16:41, Tom Hubregtsen wrote:
Thank you for your response Ewan. I quickly looked yesterday and it 
was there, but today at work I tried to open it again to start working 
on it, but it appears to be removed. Is this correct?


Thanks,

Tom

On 12 April 2015 at 06:58, Ewan Higgs ewan.hi...@ugent.be 
mailto:ewan.hi...@ugent.be wrote:


Hi all.
The code is linked from my repo:

https://github.com/ehiggs/spark-terasort

This is an example Spark program for running TeraSort benchmarks.
It is based on work from Reynold Xin's branch
https://github.com/rxin/spark/tree/terasort, but it is not the
same TeraSort program that currently holds the record
http://sortbenchmark.org/. That program is here

https://github.com/rxin/spark/tree/sort-benchmark/core/src/main/scala/org/apache/spark/sort.


"That program is here" links to:

https://github.com/rxin/spark/tree/sort-benchmark/core/src/main/scala/org/apache/spark/sort

I've been working on other projects at the moment so I haven't
returned to the spark-terasort stuff. If you have any pull
requests, I would be very grateful.

Yours,
Ewan


On 08/04/15 03:26, Pramod Biligiri wrote:

+1. I would love to have the code for this as well.

Pramod

On Fri, Apr 3, 2015 at 12:47 PM, Tom thubregt...@gmail.com
mailto:thubregt...@gmail.com wrote:

Hi all,

As we all know, Spark has set the record for sorting data, as
published on:
https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html.

Here at our group, we would love to verify these results, and
compare machines using this benchmark. We've spent quite some time
trying to find the TeraSort source code that was used, but cannot find it anywhere.

We did find two candidates:

A version posted by Reynold [1], the poster of the message
above. This version is stuck at "// TODO: Add partition-local
(external) sorting using TeraSortRecordOrdering", only generating data.

Here, Ewan noticed that it didn't appear to be similar to
Hadoop TeraSort [2]. After this he created a version of his own [3]. With this
version, we noticed problems with TeraValidate on datasets above ~10G
(as mentioned by others at [4]). When examining the raw input and output files,
it actually appears that the input data is sorted and the output data
unsorted in both cases.

Because of this, we believe we have not yet found the source code that
was actually used.
I've searched the Spark user forum archives and saw requests from other
people, indicating a demand, but did not succeed in finding the actual
source code.

My question:
Could you guys please make the source code of the used
TeraSort program,
preferably with settings, available? If not, what are the
reasons that this
seems to be withheld?

Thanks for any help,

Tom Hubregtsen

[1]

https://github.com/rxin/spark/commit/adcae69145905162fa3b6932f70be2c932f95f87
[2]

http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/%3c5462092c.1060...@ugent.be%3E
[3] https://github.com/ehiggs/spark-terasort
[4]

http://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3CCAPszQwgap4o1inZkTwcwV=7scwoqtr5yxfnsqo5p2kgp1bn...@mail.gmail.com%3E



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-TeraSort-source-request-tp22371.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
mailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
mailto:user-h...@spark.apache.org







/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 

Re: Spark Cluster: RECEIVED SIGNAL 15: SIGTERM

2015-04-13 Thread Guillaume Pitel
That's why I think it's the OOM killer. There are several cases of
memory overuse / errors:

1 - The application tries to allocate more than the heap limit and GC
cannot free more memory => OutOfMemoryError: Java heap space exception from the JVM.
2 - The JVM is configured with a max heap size larger than the available
memory. At some point the application needs to allocate memory in the JVM,
the JVM tries to extend its heap and allocate real memory (or maybe the
OS is configured with overcommitted virtual memory), but fails => "Kill
process or sacrifice child" (or others, depending on various factors:
https://plumbr.eu/outofmemoryerror).
3 - The JVM has allocated its memory from the beginning and it has been
served, but other processes start starving from memory shortage, the
pressure on memory grows beyond the threshold configured in the OOM
killer, and boom, the java process is selected for sacrifice because
it is the main culprit of memory consumption.


Guillaume
Linux OOM throws SIGTERM, but if I remember correctly JVM handles heap 
memory limits differently and throws OutOfMemoryError and eventually 
sends SIGINT.


Not sure what happened but the worker simply received a SIGTERM 
signal, so perhaps the daemon was terminated by someone or a parent 
process. Just my guess.


Tim

On Mon, Apr 13, 2015 at 2:28 AM, Guillaume Pitel 
guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com wrote:


Very likely to be this :

http://www.linuxdevcenter.com/pub/a/linux/2006/11/30/linux-out-of-memory.html?page=2

Your worker ran out of memory => maybe you're asking for too much
memory for the JVM, or something else is running on the worker

Guillaume

Any idea what this means? Many thanks.

==
logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1
==
15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910
with 4 cores, 6.6 GB RAM
15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
15/04/13 07:07:22 INFO Worker: Spark home:
/remote/users//work/tools/spark-1.3.0-bin-hadoop2.4
15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 07:07:22 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:8081
http://SelectChannelConnector@0.0.0.0:8081
15/04/13 07:07:22 INFO Utils: Successfully started service
'WorkerUI' on port 8081.
15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at
http://09:8081
15/04/13 07:07:22 INFO Worker: Connecting to master
akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
15/04/13 07:07:22 INFO Worker: Successfully registered with
master spark://08:7077
15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM




-- 
eXenSa



*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705





--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread Jonathan Coveney
I'm not 100% sure of Spark's implementation, but in the MR frameworks it
would have a much larger shuffle write size because that node is dealing
with a lot more data and as a result has a lot more to shuffle.

2015-04-13 13:20 GMT-04:00 java8964 java8...@hotmail.com:

 If it is really due to data skew, would the hanging task have a much bigger
 Shuffle Write Size in this case?

 In this case, the shuffle write size for that task is 0, and the rest of the
 I/O for this task is not much larger than that of the fast-finished tasks;
 is that normal?

 I am also interested in this case: from the statistics on the UI, how can one
 tell that a task has skewed data?

 Yong

 --
 Date: Mon, 13 Apr 2015 12:58:12 -0400
 Subject: Re: Equi Join is taking for ever. 1 Task is Running while other
 199 are complete
 From: jcove...@gmail.com
 To: deepuj...@gmail.com
 CC: user@spark.apache.org


 I can promise you that this is also a problem in the pig world :) not sure
 why it's not a problem for this data set, though... are you sure that the
 two are doing the exact same code?

 you should inspect your source data. Make a histogram for each and see
 what the data distribution looks like. If there is a value or bucket with a
 disproportionate set of values you know you have an issue
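
 A rough sketch of that histogram check (assuming pair RDDs keyed by itemId, as in the code further down the thread):

   // count rows per join key on one side and look at the heaviest keys
   val viKeyCounts = viEvents.map { case (itemId, _) => (itemId, 1L) }.reduceByKey(_ + _)
   viKeyCounts.map(_.swap).sortByKey(ascending = false).take(20).foreach(println)
   // repeat for lstgItem; a single huge count (often a null/0 catch-all key) is the usual culprit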

 2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 You mean there is a tuple in either RDD that has itemId = 0 or null?
 And what is a catch-all?

 Does that imply it is a good idea to run a filter on each RDD first? We do
 not do this using Pig on M/R. Is it required in the Spark world?

 On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney jcove...@gmail.com
 wrote:

 My guess would be data skew. Do you know if there is some item id that is
 a catch all? can it be null? item id 0? lots of data sets have this sort of
 value and it always kills joins

 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 Code:

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
   lstgItem.join(viEvents).map {
     case (itemId, (listing, viDetail)) =>
       val viSummary = new VISummary
       viSummary.leafCategoryId = listing.getLeafCategId().toInt
       viSummary.itemSiteId = listing.getItemSiteId().toInt
       viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
       viSummary.sellerCountryId = listing.getSlrCntryId().toInt
       viSummary.buyerSegment = 0
       viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
       val sellerId = listing.getSlrId.toLong
       (sellerId, (viDetail, viSummary, itemId))
   }

 Running Tasks:
 Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
 0 | 216 | 0 | RUNNING | PROCESS_LOCAL | 181 / phxaishdc9dn0474.phx.ebay.com | 2015/04/13 06:43:53 | 1.7 h | 13 min | 3.0 GB / 56964921 | - | 0.0 B / 0 | 21.2 GB | 1902.6 MB
 2 | 218 | 0 | SUCCESS | PROCESS_LOCAL | 582 / phxaishdc9dn0235.phx.ebay.com | 2015/04/13 06:43:53 | 15 min | 31 s | 2.2 GB / 1666851 | 0.1 s | 3.0 MB / 2062 | 54.8 GB | 1924.5 MB
 1 | 217 | 0 | SUCCESS | PROCESS_LOCAL | 202 / phxdpehdc9dn2683.stratus.phx.ebay.com | 2015/04/13 06:43:53 | 19 min | 1.3 min | 2.2 GB / 1687086 | 75 ms | 3.9 MB / 2692 | 33.7 GB | 1960.4 MB
 4 | 220 | 0 | SUCCESS | PROCESS_LOCAL | 218 / phxaishdc9dn0855.phx.ebay.com | 2015/04/13 06:43:53 | 15 min | 56 s | 2.2 GB / 1675654 | 40 ms | 3.3 MB / 2260 | 26.2 GB | 1928.4 MB



 Command:
 ./bin/spark-submit -v --master yarn-cluster --driver-class-path
 /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
 --jars
 /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
  --num-executors 3000 --driver-memory 12g --driver-java-options
 -XX:MaxPermSize=6G --executor-memory 12g --executor-cores 1 --queue
 hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
 /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
 startDate=2015-04-6 endDate=2015-04-7
 input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
 output=/user/dvasthimal/epdatasets/viewItem buffersize=128
 maxbuffersize=1068 maxResultSize=2G


 What do I do? I killed the job twice and it's stuck again. Where is it
 stuck?

 --
 Deepak





 --
 Deepak





Re: Multipart upload to S3 fails with Bad Digest Exceptions

2015-04-13 Thread Eugen Cepoi
I think I found where the problem comes from.

I am writing LZO-compressed Thrift records using elephant-bird; my guess is
that perhaps one side is computing the checksum based on the uncompressed
data and the other on the compressed data, hence the mismatch.

When writing the data as strings using a plain TextOutputFormat, the multipart
upload works, which confirms that the LZO compression is probably the
problem... but it is not a solution :(
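
For anyone who just needs the workaround, the property mentioned below can be set programmatically (a sketch, assuming the s3n property name quoted in the original message):

  sc.hadoopConfiguration.set("fs.s3n.multipart.uploads.enabled", "false")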

2015-04-13 18:46 GMT+02:00 Eugen Cepoi cepoi.eu...@gmail.com:

 Hi,

 I am not sure my problem is relevant to spark, but perhaps someone else
 had the same error. When I try to write files that need multipart upload to
 S3 from a job on EMR I always get this error:

 com.amazonaws.services.s3.model.AmazonS3Exception: The Content-MD5 you
 specified did not match what we received.

 If I disable multipart upload via fs.s3n.multipart.uploads.enabled (or
 output smaller files that don't require multi part upload), then everything
 works fine.

 I've seen an old thread on the ML where someone has the same error, but in
 my case I don't have any other errors on the worker nodes.

 I am using spark 1.2.1 and hadoop 2.4.0.

 Thanks,
 Eugen



Re: Spark support for Hadoop Formats (Avro)

2015-04-13 Thread Michael Armbrust
The problem is likely that the underlying avro library is reusing objects
for speed.  You probably need to explicitly copy the values out of the
reused record before the collect.
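
A minimal sketch of that copy-out step, using the field names from the example below (plain values are extracted inside the map, before any collect or shuffle, so record reuse can no longer bite):

  val safeEvents = viEvents.map { case (itemId, event) =>
    // copy the field values out of the (possibly reused) Avro record
    (itemId, event.get("guid").toString, event.get("itemId").toString, event.get("siteId").toString)
  }
  safeEvents.collect().foreach(println)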

On Sat, Apr 11, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 The read seems to be successful, as the values for each field in a record
 are different and correct. The problem is when I collect it or trigger the next
 processing step (a join with another table); each of these probably triggers
 serialization, and that's when all the fields in the record get the value of
 the first field (or element).



 On Sun, Apr 12, 2015 at 9:14 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 We have very large processing being done on Hadoop (400 M/R jobs, 1 day
 duration, 100s of TB of data, 100s of joins). We are exploring Spark as an
 alternative to speed up our processing time. We use Scala + Scoobie today,
 and Avro is the data format across steps.


 I observed a strange behavior: I read sample data (Avro format, 10
 records), collect it, and print each record. All the data for each
 element within a record is wiped out, and I only see the data of the first
 element being copied for everything.

 Is this a problem with Spark ? Or with using Avro ?


 Example:

 I took that RDD, ran through it, and printed 4 elements from it; they all
 printed correctly.


 val x = viEvents.map {
   case (itemId, event) =>
     println(event.get("guid"), itemId, event.get("itemId"), event.get("siteId"))
     (itemId, event)
 }

 The above code prints

 (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
 (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
 (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
 (340da8c014a46272c0c8c830011c3bf0,221733319941,221733319941,77)
 (340da8c014a46272c0c8c830011c3bf0,181704048554,181704048554,77)
 (340da8c014a46272c0c8c830011c3bf0,231524481696,231524481696,77)
 (340da8c014a46272c0c8c830011c3bf0,271830464992,271830464992,77)
 (393938d71480a2aaf8e440d1fff709f4,141586046141,141586046141,0)
 (3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)
 (3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)

 viEvents.collect.foreach(a => println(a._2.get("guid"), a._1,
 a._2.get("itemId"), a._2.get("siteId")))

 Now, I collected it; this might have led to serialization of the RDD. When
 I print the same 4 elements, I only get guid values for everything. Has
 this got something to do with serialization?


 (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)

 (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)

 (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)

 (340da8c014a46272c0c8c830011c3bf0,221733319941,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)

 (340da8c014a46272c0c8c830011c3bf0,181704048554,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)

 (340da8c014a46272c0c8c830011c3bf0,231524481696,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)

 (340da8c014a46272c0c8c830011c3bf0,271830464992,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)

 (393938d71480a2aaf8e440d1fff709f4,141586046141,393938d71480a2aaf8e440d1fff709f4,393938d71480a2aaf8e440d1fff709f4)

 (3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)

 (3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)



 The RDD is of type org.apache.spark.rdd.RDD[(Long,
  com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord)]

 At the time of context creation I did this:
 val conf = new SparkConf()
   .setAppName("detail")
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
   .set("spark.yarn.maxAppAttempts", "1")
   .registerKryoClasses(Array(
     classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum],
     classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord],
     classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.InputRecord],
     classOf[com.ebay.ep.poc.spark.reporting.process.model.SessionRecord],
     classOf[com.ebay.ep.poc.spark.reporting.process.model.DataRecord],
     classOf[com.ebay.ep.poc.spark.reporting.process.model.ExperimentationRecord]))

 The class hierarchy is

 DetailInputRecord extends InputRecord extends SessionRecord extends
 ExperimentationRecord extends
 org.apache.avro.generic.GenericRecord.Record(schema: Schema)


 Please suggest.



 --
 Deepak




 --
 Deepak




Re: Need some guidance

2015-04-13 Thread Dean Wampler
That appears to work, with a few changes to get the types correct:

input.distinct().combineByKey((s: String) => 1, (agg: Int, s: String) =>
agg + 1, (agg1: Int, agg2: Int) => agg1 + agg2)
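
An equivalent sketch without combineByKey, assuming input is an RDD[(Int, String)] as in the original question:

  input.distinct().mapValues(_ => 1).reduceByKey(_ + _).collect()
  // Array((1,3), (2,2), (3,1)) for the sample data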

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Apr 13, 2015 at 3:24 PM, Victor Tso-Guillen v...@paxata.com wrote:

 How about this?

  input.distinct().combineByKey((v: V) => 1, (agg: Int, x: Int) => agg + 1,
  (agg1: Int, agg2: Int) => agg1 + agg2).collect()

 On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler deanwamp...@gmail.com
 wrote:

 The problem with using collect is that it will fail for large data sets,
 as you'll attempt to copy the entire RDD to the memory of your driver
 program. The following works (Scala syntax, but similar to Python):

  scala> val i1 = input.distinct.groupByKey
  scala> i1.foreach(println)
  (1,CompactBuffer(beta, alpha, foo))
  (3,CompactBuffer(foo))
  (2,CompactBuffer(alpha, bar))

  scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
  scala> i2.foreach(println)
  (1,3)
  (3,1)
  (2,2)

  The i2 line passes a function that takes a tuple argument, then
  constructs a new output tuple with the first element and the size of the
  second (each CompactBuffer). An alternative pattern-match syntax would be:

  scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }

 This should work as long as none of the CompactBuffers are too large,
 which could happen for extremely large data sets.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw marco.s...@gmail.com
 wrote:

 **Learning the ropes**

 I'm trying to grasp the concept of using the pipeline in pySpark...

 Simplified example:
 
 list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]

 Desired outcome:
 [(1,3),(2,2),(3,1)]

 Basically for each key, I want the number of unique values.

 I've tried different approaches, but am I really using Spark
 effectively?  I wondered if I would do something like:
  input=sc.parallelize(list)
  input.groupByKey().collect()

 Then I wondered if I could do something like a foreach over each key
 value, and then map the actual values and reduce them.  Pseudo-code:

 input.groupbykey()
 .keys
 .foreach(_.values
 .map(lambda x: x,1)
 .reducebykey(lambda a,b:a+b)
 .count()
 )

 I was somehow hoping that the key would get the current value of count,
 and thus be the count of the unique keys, which is exactly what I think I'm
 looking for.

 Am I way off base on how I could accomplish this?

 Marco






Re: Need some guidance

2015-04-13 Thread Victor Tso-Guillen
How about this?

input.distinct().combineByKey((v: V) => 1, (agg: Int, x: Int) => agg + 1,
(agg1: Int, agg2: Int) => agg1 + agg2).collect()

On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler deanwamp...@gmail.com
wrote:

 The problem with using collect is that it will fail for large data sets,
 as you'll attempt to copy the entire RDD to the memory of your driver
 program. The following works (Scala syntax, but similar to Python):

  scala> val i1 = input.distinct.groupByKey
  scala> i1.foreach(println)
  (1,CompactBuffer(beta, alpha, foo))
  (3,CompactBuffer(foo))
  (2,CompactBuffer(alpha, bar))

  scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
  scala> i2.foreach(println)
  (1,3)
  (3,1)
  (2,2)

  The i2 line passes a function that takes a tuple argument, then
  constructs a new output tuple with the first element and the size of the
  second (each CompactBuffer). An alternative pattern-match syntax would be:

  scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }

 This should work as long as none of the CompactBuffers are too large,
 which could happen for extremely large data sets.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw marco.s...@gmail.com wrote:

 **Learning the ropes**

 I'm trying to grasp the concept of using the pipeline in pySpark...

 Simplified example:
 
 list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]

 Desired outcome:
 [(1,3),(2,2),(3,1)]

 Basically for each key, I want the number of unique values.

 I've tried different approaches, but am I really using Spark
 effectively?  I wondered if I would do something like:
  input=sc.parallelize(list)
  input.groupByKey().collect()

 Then I wondered if I could do something like a foreach over each key
 value, and then map the actual values and reduce them.  Pseudo-code:

 input.groupbykey()
 .keys
 .foreach(_.values
 .map(lambda x: x,1)
 .reducebykey(lambda a,b:a+b)
 .count()
 )

 I was somehow hoping that the key would get the current value of count,
 and thus be the count of the unique keys, which is exactly what I think I'm
 looking for.

 Am I way off base on how I could accomplish this?

 Marco





Rack locality

2015-04-13 Thread rcharaya
I want to use Rack locality feature of Apache Spark in my application.

Is YARN the only resource manager which supports it as of now?

After going through the source code, I found that the default implementation of
the getRackForHost() method returns None in TaskSchedulerImpl, which (I suppose)
would be used in standalone mode.

On the other hand, it is overridden in YarnScheduler.scala to fetch the rack
information by invoking the RackResolver API of Hadoop, which would be used when
it is run on YARN.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Rack-locality-tp22483.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL Parquet as External table - 1.3.x HiveMetastoreType now hidden

2015-04-13 Thread Michael Armbrust

  Here is the stack trace. The first part shows the log when the session is
  started in Tableau. It is using the init sql option on the data
  connection to create the TEMPORARY table myNodeTable.


Ah, I see. Thanks for providing the error.  The problem here is that
temporary tables do not exist in a database; they are visible no matter
what the current database is.  Tableau is asking for
default.temporaryTable, which does not exist.


Re: How to load avro file into spark not on Hadoop in pyspark?

2015-04-13 Thread sa
Note that I am running pyspark in local mode (I do not have a hadoop cluster
connected) as I want to be able to work with the avro file outside of
hadoop.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-load-avro-file-into-spark-not-on-Hadoop-in-pyspark-tp22480p22481.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Rack locality

2015-04-13 Thread Sandy Ryza
Hi Riya,

As far as I know, that is correct, unless Mesos fine-grained mode handles
this in some mysterious way.

-Sandy

On Mon, Apr 13, 2015 at 2:09 PM, rcharaya riya.char...@gmail.com wrote:

 I want to use Rack locality feature of Apache Spark in my application.

 Is YARN the only resource manager which supports it as of now?

 After going through the source code, I found that default implementation of
 getRackForHost() method returns NONE in TaskSchedulerImpl which (I suppose)
 would be used by standalone mode.

 On the other hand, it is overriden in YarnScheduler.scala to fetch the rack
 information by invoking RackResolver api of hadoop which would be used when
 its run on YARN.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Rack-locality-tp22483.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Task result in Spark Worker Node

2015-04-13 Thread Imran Rashid
On the worker side, it all happens in Executor.  The task result is
computed here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210

then its serialized along with some other goodies, and finally sent back to
the driver here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255

What happens on the driver is quite a bit more complicated, and involves a
number of spots in the code, but at least to get you started, the results
are received here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L328

though perhaps a more interesting spot is where they are handled in
DagScheduler#handleTaskCompletion:
https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1001


also, I think I know what you mean, but just to make sure: I wouldn't say
the results from the worker are broadcast back to the driver.  (a) in
spark, broadcast tends to refer to a particular api for sharing immutable
data from the driver to the workers (only one direction) and (b) it doesn't
really fit a more general meaning of broadcast either, since the results
are sent only to the driver, not to all nodes.

On Sun, Mar 29, 2015 at 8:34 PM, raggy raghav0110...@gmail.com wrote:

 I am a PhD student working on a research project related to Apache Spark. I
 am trying to modify some of the spark source code such that instead of
 sending the final result RDD from the worker nodes to a master node, I want
 to send the final result RDDs to some different node. In order to do this,
 I
 have been trying to identify at which point the Spark worker nodes
 broadcast
 the results of a job back to the master.

 So far, I understand that in Spark, the master serializes the RDD and the
 functions to be applied on them and sends them over to the worker nodes. In
 the context of reduce, it serializes the RDD partition and the reduce
 function and sends them to the worker nodes. However, my understanding of
 how things happen at the worker node is very limited and I would appreciate
 it if someone could help me identify where the process of broadcasting the
 results of local worker computations back to the master node takes place.

 This is some of the limited knowledge that I have about the worker nodes:

 Each job gets divided into smaller sets of tasks called stages. Each Stage
 is either a Shuffle Map Stage or Result Stage. In a Shuffle Map Stage, the
 task results are used as input for another stage. The result stage uses the
 RDD to compute the action that initiated the job. So, this result stage
 executes the last task for the job on the worker node. I would assume after
 this is done, it gets the result and broadcasts it to the driver
 application(the master).

  In ResultTask.scala (spark-core src/main/scala org.apache.spark.scheduler)
  it states "A task that sends back the output to the driver application."
  However, I don't see when or where this happens in the source code. I would
  very much appreciate it if someone could help me identify where this
  happens in the Spark source code.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Task-result-in-Spark-Worker-Node-tp22283.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Registering classes with KryoSerializer

2015-04-13 Thread Imran Rashid
Those funny class names come from Scala's specialization -- it's compiling a
different version of OpenHashMap for each primitive you stick in the type
parameter.  Here's a super simple example:

➜ ~  more Foo.scala
class Foo[@specialized X]

➜ ~  scalac Foo.scala

➜ ~  ls Foo*.class
Foo$mcB$sp.class Foo$mcC$sp.class Foo$mcD$sp.class Foo$mcF$sp.class
Foo$mcI$sp.class Foo$mcJ$sp.class Foo$mcS$sp.class Foo$mcV$sp.class
Foo$mcZ$sp.class Foo.class

Sadly, I'm not sure of a foolproof way of getting all those specialized
versions registered except for registering them under these strange names.
Here's an example of how it's done by chill for Tuples (which is what Spark
relies on for its own registration of tuples):

https://github.com/twitter/chill/blob/6d03f6976f33f6e2e16b8e254fead1625720c281/chill-scala/src/main/scala/com/twitter/chill/TupleSerializers.scala#L861
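
If registering by class object is awkward, a hedged workaround is to register the mangled name from the error message directly (the class name below is the one reported in this thread; adjust it to whatever your error prints):

  sparkConf.registerKryoClasses(Array(
    Class.forName("com.comp.common.base.OpenHashMap$mcI$sp")
  ))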

On Mon, Mar 30, 2015 at 3:59 PM, Arun Lists lists.a...@gmail.com wrote:

 I am trying to register classes with KryoSerializer. I get the following
 error message:

  How do I find out what class is being referred to by OpenHashMap$mcI$sp?

  com.esotericsoftware.kryo.KryoException:
  java.lang.IllegalArgumentException: Class is not registered:
  com.comp.common.base.OpenHashMap$mcI$sp

  Note: To register this class use:
  kryo.register(com.dtex.common.base.OpenHashMap$mcI$sp.class);

 I have registered other classes with it by using:

 sparkConf.registerKryoClasses(Array(

   classOf[MyClass]

 ))


 Thanks,

 arun





Spark Worker IP Error

2015-04-13 Thread DStrip
I tried to start the Spark Worker using the registered IP but this error
occurred:

15/04/13 21:35:59 INFO Worker: Registered signal handlers for [TERM, HUP,
INT]
Exception in thread main java.net.UnknownHostException: 10.240.92.75/:
Name or service not known
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:901)
at 
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1293)
at java.net.InetAddress.getAllByName0(InetAddress.java:1246)
at java.net.InetAddress.getAllByName(InetAddress.java:1162)
at java.net.InetAddress.getAllByName(InetAddress.java:1098)
at java.net.InetAddress.getByName(InetAddress.java:1048)
at org.apache.spark.util.Utils$.getAddressHostName(Utils.scala:819)
at
org.apache.spark.util.Utils$.localIpAddressHostname$lzycompute(Utils.scala:763)
at org.apache.spark.util.Utils$.localIpAddressHostname(Utils.scala:763)
at
org.apache.spark.util.Utils$$anonfun$localHostName$1.apply(Utils.scala:815)
at
org.apache.spark.util.Utils$$anonfun$localHostName$1.apply(Utils.scala:815)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.util.Utils$.localHostName(Utils.scala:815)
at
org.apache.spark.deploy.worker.WorkerArguments.init(WorkerArguments.scala:29)
at org.apache.spark.deploy.worker.Worker$.main(Worker.scala:528)
at org.apache.spark.deploy.worker.Worker.main(Worker.scala)

My concern is that I have deployed the Spark Master using the same process
(assigning the corresponding IP) and everything works fine. I have also
looked at the /etc/hosts file and the 10.240.92.75 is up and working fine.
Moreover, I have switched from IPv6 to IPv4 using _JAVA_OPTIONS:
-Djava.net.preferIPv4Stack=true, but the error kept coming up. I am
a little bit confused; I would be grateful for any suggestions/help.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Worker-IP-Error-tp22484.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Registering classes with KryoSerializer

2015-04-13 Thread Arun Lists
Hi,

I am trying to register classes with KryoSerializer. This has worked with
other programs. Usually the error messages are helpful in indicating which
classes need to be registered. But with my current program, I get the
following cryptic error message:

Caused by: java.lang.IllegalArgumentException: Class is not registered:
scala.reflect.ClassTag$$anon$1

Note: To register this class use:
kryo.register(scala.reflect.ClassTag$$anon$1.class);

How do I find out which class needs to be registered? I looked at my
program and registered all classes used in RDDs. But clearly more classes
remain to be registered if I can figure out which classes.

Thanks for your help!

arun


Re: How to get rdd count() without double evaluation of the RDD?

2015-04-13 Thread Imran Rashid
yes, it sounds like a good use of an accumulator to me

val counts = sc.accumulator(0L)
rdd.map { x =>
  counts += 1
  x
}.saveAsObjectFile("file2")
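
Once the save action has run, the accumulated total can be read back on the driver (keeping in mind that re-executed tasks can inflate accumulator values):

  println(counts.value)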


On Mon, Mar 30, 2015 at 12:08 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  Sean



  Yes, I know that I can use persist() to persist to disk, but it is still a
  big extra cost to persist a huge RDD to disk. I hope that I can do one pass
  to get the count as well as rdd.saveAsObjectFile("file2"), but I don't know
  how.



  Maybe use an accumulator to count the total?



 Ningjun



 *From:* Mark Hamstra [mailto:m...@clearstorydata.com]
 *Sent:* Thursday, March 26, 2015 12:37 PM
 *To:* Sean Owen
 *Cc:* Wang, Ningjun (LNG-NPV); user@spark.apache.org
 *Subject:* Re: How to get rdd count() without double evaluation of the
 RDD?



 You can also always take the more extreme approach of using
 SparkContext#runJob (or submitJob) to write a custom Action that does what
 you want in one pass.  Usually that's not worth the extra effort.



 On Thu, Mar 26, 2015 at 9:27 AM, Sean Owen so...@cloudera.com wrote:

 To avoid computing twice you need to persist the RDD but that need not be
 in memory. You can persist to disk with persist().

 On Mar 26, 2015 4:11 PM, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:

 I have a rdd that is expensive to compute. I want to save it as object
 file and also print the count. How can I avoid double computation of the
 RDD?



  val rdd = sc.textFile("someFile").map(line => expensiveCalculation(line))



  val count = rdd.count()  // this forces computation of the rdd

 println(count)

  rdd.saveAsObjectFile("file2") // this computes the RDD again



 I can avoid double computation by using cache



  val rdd = sc.textFile("someFile").map(line => expensiveCalculation(line))

 rdd.cache()

 val count = rdd.count()

 println(count)

  rdd.saveAsObjectFile("file2")



  This only computes the rdd once. However, the rdd has millions of items and
  will cause an out-of-memory error.



 Question: how can I avoid double computation without using cache?





 Ningjun





Re: org.apache.spark.ml.recommendation.ALS

2015-04-13 Thread Jay Katukuri

Hi Xiangrui,

Here is the class:


object ALSNew {

  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("TrainingDataPurchase")
      .set("spark.executor.memory", "4g")

    conf.set("spark.shuffle.memoryFraction", "0.65") // default is 0.2
    conf.set("spark.storage.memoryFraction", "0.3")  // default is 0.6

    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val pfile = args(0)
    val purchase = sc.textFile(pfile)

    val ratings = purchase.map(line =>
      line.split(',') match { case Array(user, item, rate) =>
        (user.toInt, item.toInt, rate.toFloat)
      }).toDF()

    val rank = args(1).toInt
    val numIterations = args(2).toInt
    val regParam: Double = 0.01
    val implicitPrefs: Boolean = true
    val numUserBlocks: Int = 100
    val numItemBlocks: Int = 100
    val nonnegative: Boolean = true

    // val paramMap = ParamMap(regParam = 0.01)
    // paramMap.put(numUserBlocks = 100, numItemBlocks = 100)
    val als = new ALS()
      .setRank(rank)
      .setRegParam(regParam)
      .setImplicitPrefs(implicitPrefs)
      .setNumUserBlocks(numUserBlocks)
      .setNumItemBlocks(numItemBlocks)

    val alpha = als.getAlpha

    val model = als.fit(ratings)

    val predictions = model.transform(ratings)
      .select("rating", "prediction")
      .map { case Row(rating: Float, prediction: Float) =>
        (rating.toDouble, prediction.toDouble)
      }
    val rmse =
      if (implicitPrefs) {
        // TODO: Use a better (rank-based?) evaluation metric for implicit feedback.
        // We limit the ratings and the predictions to interval [0, 1] and compute
        // the weighted RMSE with the confidence scores as weights.
        val (totalWeight, weightedSumSq) = predictions.map { case (rating, prediction) =>
          val confidence = 1.0 + alpha * math.abs(rating)
          val rating01 = math.max(math.min(rating, 1.0), 0.0)
          val prediction01 = math.max(math.min(prediction, 1.0), 0.0)
          val err = prediction01 - rating01
          (confidence, confidence * err * err)
        }.reduce { case ((c0, e0), (c1, e1)) =>
          (c0 + c1, e0 + e1)
        }
        math.sqrt(weightedSumSq / totalWeight)
      } else {
        val mse = predictions.map { case (rating, prediction) =>
          val err = rating - prediction
          err * err
        }.mean()
        math.sqrt(mse)
      }

    println("Mean Squared Error = " + rmse)
  }
}




I am using the following in my maven build (pom.xml): 


<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>1.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>1.3.0</version>
  </dependency>
</dependencies>


I am using Scala version 2.11.2.

Could it be that spark-1.3.0-bin-hadoop2.4.tgz requires a different version
of Scala?

Thanks,
Jay



On Apr 9, 2015, at 4:38 PM, Xiangrui Meng men...@gmail.com wrote:

 Could you share ALSNew.scala? Which Scala version did you use? -Xiangrui
 
 On Wed, Apr 8, 2015 at 4:09 PM, Jay Katukuri jkatuk...@apple.com wrote:
 Hi Xiangrui,
 
 I tried running this on my local machine  (laptop) and got the same error:
 
 Here is what I did:
 
 1. downloaded spark 1.30 release version (prebuilt for hadoop 2.4 and later)
 spark-1.3.0-bin-hadoop2.4.tgz.
 2. Ran the following command:
 
 spark-submit --class ALSNew  --master local[8] ALSNew.jar  /input_path
 
 
 The stack trace is exactly same.
 
 Thanks,
 Jay
 
 
 
 On Apr 8, 2015, at 10:47 AM, Jay Katukuri jkatuk...@apple.com wrote:
 
 some additional context:
 
 Since, I am using features of spark 1.3.0, I have downloaded spark 1.3.0 and
 used spark-submit from there.
 The cluster is still on spark-1.2.0.
 
  So, it looks to me like, at runtime, the executors could not find some
  libraries of spark-1.3.0, even though I ran spark-submit from my downloaded
  spark-1.3.0.
 
 
 
 On Apr 6, 2015, at 1:37 PM, Jay Katukuri jkatuk...@apple.com wrote:
 
 Here is the command that I have used :
 
  spark-submit --class packagename.ALSNew --num-executors 100 --master yarn
  ALSNew.jar -jar spark-sql_2.11-1.3.0.jar hdfs://input_path
 
 Btw - I could run the old ALS in mllib package.
 
 
 
 
 
 On Apr 6, 2015, at 12:32 PM, Xiangrui Meng men...@gmail.com wrote:
 
  So ALSNew.scala is your own application; did you add it with
  spark-submit or spark-shell? The correct command should look like:
 
 spark-submit --class your.package.name.ALSNew 

sbt-assembly spark-streaming-kinesis-asl error

2015-04-13 Thread Mike Trienis
Hi All,

I am having trouble building a fat jar file through sbt-assembly.

[warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
[warn] Merging 'META-INF/NOTICE' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Merging
'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with
strategy 'discard'
[warn] Merging
'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy
'discard'
[warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties'
with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with
strategy 'discard'
[warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties'
with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with
strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with
strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy
'discard'
[warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy
'discard'
[warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with
strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with strategy
'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with
strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy
'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties' with
strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with
strategy 'discard'
[warn] Merging 'META-INF/services/java.sql.Driver' with strategy
'filterDistinctLines'
[warn] Merging 'rootdoc.txt' with strategy 'concat'
[warn] Strategy 'concat' was applied to a file
[warn] Strategy 'discard' was applied to 17 files
[warn] Strategy 'filterDistinctLines' was applied to a file
[warn] Strategy 'rename' was applied to 4 files

When submitting the spark application through the command

sh /usr/lib/spark/bin/spark-submit --class com.xxx.ExampleClassName
target/scala-2.10/-snapshot.jar

I end up with the following error,

Exception in thread main java.lang.NoClassDefFoundError:
org/joda/time/format/DateTimeFormat
at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)
at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
at
com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
at
com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
at
com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
at
com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(AmazonKinesisClient.java:2102)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:216)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:202)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:175)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:155)
at com.quickstatsengine.aws.AwsProvider$.init(AwsProvider.scala:20)
at com.quickstatsengine.aws.AwsProvider$.clinit(AwsProvider.scala)

The snippet from my build.sbt file is:

"org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0-alpha1" % "provided",
"org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.2.0" % "provided",

And the error is originating from:

val kinesisClient = new AmazonKinesisClient(new
DefaultAWSCredentialsProviderChain())

Am I correct to set spark-streaming-kinesis-asl as a "provided" dependency?
Also, is there a merge strategy I need to apply?

Any help would be appreciated, Mike.
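
For what it's worth, a hedged build.sbt sketch that bundles the Kinesis connector (and therefore the AWS SDK and joda-time classes it pulls in) into the assembly instead of marking it provided:

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"                  % "1.2.0" % "provided",
    "org.apache.spark" %% "spark-streaming"             % "1.2.0" % "provided",
    "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.2.0"   // bundled into the fat jar
  )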


Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-13 Thread Mike Trienis
Thanks Vadim, I can certainly consume data from a Kinesis stream when
running locally. I'm currently in the process of extending my work to a
proper cluster (i.e. submitting via spark-submit with an uber jar). Feel free to
add me on Gmail chat and maybe we can help each other.

On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 I don't believe the Kinesis asl should be provided. I used mergeStrategy
 successfully to produce an uber jar.

 Fyi, I've been having trouble consuming data out of Kinesis with Spark
 with no success :(
 Would be curious to know if you got it working.

 Vadim

 On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote:

 Hi All,

 I have having trouble building a fat jar file through sbt-assembly.

 [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
 [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
 [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
 [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
 [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
 [warn] Merging
 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with
 strategy 'discard'
 [warn] Merging
 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties'
 with strategy 'discard'
 [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties'
 with strategy 'discard'
 [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard'
 [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties'
 with strategy 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/services/java.sql.Driver' with strategy
 'filterDistinctLines'
 [warn] Merging 'rootdoc.txt' with strategy 'concat'
 [warn] Strategy 'concat' was applied to a file
 [warn] Strategy 'discard' was applied to 17 files
 [warn] Strategy 'filterDistinctLines' was applied to a file
 [warn] Strategy 'rename' was applied to 4 files

 When submitting the spark application through the command

 sh /usr/lib/spark/bin/spark-submit --class com.xxx.ExampleClassName
 target/scala-2.10/-snapshot.jar

 I end up with the following error,

 Exception in thread main java.lang.NoClassDefFoundError:
 org/joda/time/format/DateTimeFormat
 at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at java.lang.Class.newInstance(Class.java:379)
 at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
 at
 com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
 at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
 at
 com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
 at
 com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
 at
 com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(AmazonKinesisClient.java:2102)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:216)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:202)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:175)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:155)
 at com.quickstatsengine.aws.AwsProvider$.init(AwsProvider.scala:20)
 at com.quickstatsengine.aws.AwsProvider$.clinit(AwsProvider.scala)

 The snippet from my build.sbt file is:

 "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
 "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
 "com.datastax.spark" %% 

Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-13 Thread Vadim Bichutskiy
I don't believe the Kinesis asl should be provided. I used mergeStrategy 
successfully to produce an uber jar.

Fyi, I've been having trouble consuming data out of Kinesis with Spark with no 
success :( 
Would be curious to know if you got it working.

Vadim
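
For reference, a minimal build.sbt sketch of the uber-jar approach described above. It assumes sbt-assembly 0.13+ (older plugin versions use different setting keys), keeps spark-core and spark-streaming provided, and leaves spark-streaming-kinesis-asl out of the provided scope so that its transitive dependencies (the AWS SDK, joda-time, ...) land in the assembled jar. The merge rules are only an example; choose strategies per path as your own warnings require:

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"      % "1.2.0" % "provided",
    "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
    // not marked provided, so joda-time and the AWS SDK end up in the uber jar
    "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.2.0"
  )

  assemblyMergeStrategy in assembly := {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x =>
      val oldStrategy = (assemblyMergeStrategy in assembly).value
      oldStrategy(x)
  }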


How to access postgresql on Spark SQL

2015-04-13 Thread doovsaid
Hi all,
Does anyone know how to access PostgreSQL from Spark SQL? Do I need to add the postgresql
dependency in build.sbt and set the classpath for it?
Thanks.
Regards,
Yi
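
For what it's worth, a minimal sketch using the JDBC data source that ships with Spark SQL 1.3; the PostgreSQL JDBC driver jar still has to be on the driver and executor classpath (for example via --jars and --driver-class-path), and the driver version, URL and table name below are placeholders:

  // build.sbt -- the driver version is only an example
  libraryDependencies += "org.postgresql" % "postgresql" % "9.4-1201-jdbc41"

  import org.apache.spark.sql.SQLContext

  val sqlContext = new SQLContext(sc)

  // schema is resolved eagerly from the table, data is read lazily
  val df = sqlContext.load("jdbc", Map(
    "url"     -> "jdbc:postgresql://dbhost:5432/mydb?user=me&password=secret",
    "dbtable" -> "public.my_table"))

  df.registerTempTable("my_table")
  sqlContext.sql("SELECT count(*) FROM my_table").show()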

Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-13 Thread Vadim Bichutskiy
Thanks Mike. I was having trouble on EC2.

 On Apr 13, 2015, at 10:25 PM, Mike Trienis mike.trie...@orcsol.com wrote:
 
 Thanks Vadim, I can certainly consume data from a Kinesis stream when running 
 locally. I'm currently in the processes of extending my work to a proper 
 cluster (i.e. using a spark-submit job via uber jar). Feel free to add me to 
 gmail chat and maybe we can help each other. 
 
 On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:
 I don't believe the Kinesis asl should be provided. I used mergeStrategy 
 successfully to produce an uber jar.
 
 Fyi, I've been having trouble consuming data out of Kinesis with Spark with 
 no success :( 
 Would be curious to know if you got it working.
 
 Vadim
 

Re: [GraphX] aggregateMessages with active set

2015-04-13 Thread James
Hello,

Great thanks for your reply. From the code I found that the reason my
program scans all the edges is that the EdgeDirection I passed
in is EdgeDirection.Either.

However, I still have the problem that the time taken by each iteration does
not decrease over time. Thus I have two questions:

1. What is the meaning of activeFraction in [1]?
2. As my edge RDD is too large to cache in memory, I used
StorageLevel.MEMORY_AND_DISK_SER as the persistence level. If the program uses
aggregateMessagesIndexScan, will it still have to load the whole edge
list into memory?

[1]
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L237-266

Alcaid


2015-04-10 2:47 GMT+08:00 Ankur Dave ankurd...@gmail.com:

 Actually, GraphX doesn't need to scan all the edges, because it
 maintains a clustered index on the source vertex id (that is, it sorts
 the edges by source vertex id and stores the offsets in a hash table).
 If the activeDirection is appropriately set, it can then jump only to
 the clusters with active source vertices.

 See the EdgePartition#index field [1], which stores the offsets, and
 the logic in GraphImpl#aggregateMessagesWithActiveSet [2], which
 decides whether to do a full scan or use the index.

 [1]
 https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala#L60
 [2].
 https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L237-266

 Ankur


 On Thu, Apr 9, 2015 at 3:21 AM, James alcaid1...@gmail.com wrote:
  In aggregateMessagesWithActiveSet, Spark still has to read all edges. It
  means that a fixed cost which scales with graph size is unavoidable in a
  pregel-like iteration.
 
  But what if I have to run nearly 100 iterations and in the last 50
  iterations only 0.1% of the nodes need to be updated? That fixed cost
  makes the program finish in an unacceptable amount of time.
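
To make the point about the active direction concrete, here is a small sketch of how it is set when going through the Pregel API (graph and the vertex/send/merge functions are placeholders, assuming Double vertex attributes). With EdgeDirection.Out, only edge clusters whose source vertex received a message in the previous round are scanned via the clustered index, instead of every edge:

  import org.apache.spark.graphx._

  // placeholder program: propagate the maximum vertex value along out-edges
  val result = Pregel(graph, initialMsg = 0.0, activeDirection = EdgeDirection.Out)(
    vprog = (id, attr, msg) => math.max(attr, msg),
    sendMsg = triplet =>
      if (triplet.srcAttr > triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
      else Iterator.empty,
    mergeMsg = (a, b) => math.max(a, b))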



How can I add my custom Rule to spark sql?

2015-04-13 Thread Andy Zhao
Hi guys,

I want to add my own custom Rules (whatever the rule is) when the SQL logical
plan is being analysed.
Is there a way to do that in the Spark application code?


Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-add-my-custom-Rule-to-spark-sql-tp22485.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
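
One way this has been done against 1.3 is to subclass SQLContext and plug extra rules into the analyzer's extendedResolutionRules. These are internal developer APIs (org.apache.spark.sql.catalyst), so the sketch below is only illustrative and may need adjusting to your exact Spark version; MyRule is a do-nothing placeholder, and overriding the analyzer this way replaces the extended rules the stock SQLContext installs:

  import org.apache.spark.SparkContext
  import org.apache.spark.sql.SQLContext
  import org.apache.spark.sql.catalyst.analysis.Analyzer
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.spark.sql.catalyst.rules.Rule

  // placeholder rule: returns the plan unchanged
  object MyRule extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan
  }

  class MySQLContext(sc: SparkContext) extends SQLContext(sc) {
    @transient
    override lazy val analyzer: Analyzer =
      new Analyzer(catalog, functionRegistry, caseSensitive = true) {
        override val extendedResolutionRules = MyRule :: Nil
      }
  }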



Re: Understanding Spark Memory distribution

2015-04-13 Thread Imran Rashid
broadcast variables count towards spark.storage.memoryFraction, so they
use the same pool of memory as cached RDDs.

That being said, I'm really not sure why you are running into problems; it
seems like you have plenty of memory available.  Most likely it's got
nothing to do with broadcast variables or caching -- it's just whatever
logic you are applying in your transformations that is causing lots of GC
to occur during the computation.  Hard to say without knowing more details.

You could try increasing the timeout for the failed askWithReply by
increasing spark.akka.lookupTimeout (defaults to 30), but that would most
likely be treating a symptom, not the root cause.

On Fri, Mar 27, 2015 at 4:52 PM, Ankur Srivastava 
ankur.srivast...@gmail.com wrote:

 Hi All,

 I am running a spark cluster on EC2 instances of type: m3.2xlarge. I have
 given 26gb of memory with all 8 cores to my executors. I can see that in
 the logs too:

 *15/03/27 21:31:06 INFO AppClient$ClientActor: Executor added:
 app-20150327213106-/0 on worker-20150327212934-10.x.y.z-40128
 (10.x.y.z:40128) with 8 cores*

 I am not caching any RDD so I have set spark.storage.memoryFraction to
 0.2. I can see on SparkUI under executors tab Memory used is 0.0/4.5 GB.

 I am now confused with these logs?

 *15/03/27 21:31:08 INFO BlockManagerMasterActor: Registering block manager
 10.77.100.196:58407 with 4.5 GB RAM,
 BlockManagerId(4, 10.x.y.z, 58407)*

 I am broadcasting a large object of 3 gb and after that when I am creating
 an RDD, I see logs which show this 4.5 GB memory getting full and then I
 get OOM.

 How can I make block manager use more memory?

 Is there any other fine tuning I need to do for broadcasting large objects?

 And does broadcast variable use cache memory or rest of the heap?


 Thanks

 Ankur
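
For reference, both knobs mentioned above are plain SparkConf settings (the values here are only illustrative):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.storage.memoryFraction", "0.2") // pool shared by cached RDDs and broadcast blocks
    .set("spark.akka.lookupTimeout", "120")     // seconds; the timeout behind the failed askWithReply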



Re: Spark sql failed in yarn-cluster mode when connecting to non-default hive database

2015-04-13 Thread sachin Singh
Hi Linlin,
Have you found a solution for this issue? If yes, what needs to be corrected?
I am also getting the same error when submitting a Spark job in cluster mode;
the error is as follows:
2015-04-14 18:16:43 DEBUG Transaction - Transaction rolled back in 0 ms
2015-04-14 18:16:43 ERROR DDLTask -
org.apache.hadoop.hive.ql.metadata.HiveException: Database does not exist:
my_database
at 
org.apache.hadoop.hive.ql.exec.DDLTask.switchDatabase(DDLTask.java:4054)
at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:269)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
at 
org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java
...


Please advise. I have copied hive-site.xml into spark/conf; in standalone mode it
works fine.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-sql-failed-in-yarn-cluster-mode-when-connecting-to-non-default-hive-database-tp11811p22486.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Fwd: Expected behavior for DataFrame.unionAll

2015-04-13 Thread Justin Yip
Hello,

I am experimenting with DataFrame. I tried to construct two DataFrames with:
1. case class A(a: Int, b: String)
scala> adf.printSchema()
root
 |-- a: integer (nullable = false)
 |-- b: string (nullable = true)

2. case class B(a: String, c: Int)
scala> bdf.printSchema()
root
 |-- a: string (nullable = true)
 |-- c: integer (nullable = false)


Then I unioned these two DataFrames with the unionAll function, and I
get the following schema. It is kind of a mixture of A and B.

scala> val udf = adf.unionAll(bdf)
scala> udf.printSchema()
root
 |-- a: string (nullable = false)
 |-- b: string (nullable = true)

The unionAll documentation says it behaves like the SQL UNION ALL function.
However, unioning incompatible types is not well defined for SQL. Is there
any expected behavior for unioning incompatible data frames?

Thanks.

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Expected-behavior-for-DataFrame-unionAll-tp22487.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
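
A common way to make the union well defined is to align the two schemas explicitly before calling unionAll. A sketch against the example above (casting the columns that should line up to one agreed type, strings here, is just one possible choice):

  import org.apache.spark.sql.functions.col

  val adfNorm = adf.select(col("a").cast("string").as("a"), col("b").as("b"))
  val bdfNorm = bdf.select(col("a").as("a"), col("c").cast("string").as("b"))
  val unioned = adfNorm.unionAll(bdfNorm)
  unioned.printSchema()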

Re: [Spark1.3] UDF registration issue

2015-04-13 Thread Takeshi Yamamuro
Hi,

It's a syntax error in Spark 1.3.
The next release of Spark supports this kind of UDF call on DataFrames.
See the link below.

https://issues.apache.org/jira/browse/SPARK-6379


On Sat, Apr 11, 2015 at 3:30 AM, Yana Kadiyska yana.kadiy...@gmail.com
wrote:

 Hi, I'm running into some trouble trying to register a UDF:

 scala> sqlContext.udf.register("strLen", (s: String) => s.length())
 res22: org.apache.spark.sql.UserDefinedFunction =
 UserDefinedFunction(<function1>,IntegerType)

 scala> cleanProcessDF.withColumn("dii", strLen(col("di")))
 <console>:33: error: not found: value strLen
   cleanProcessDF.withColumn("dii", strLen(col("di")))

 ​

 Where cleanProcessDF is a dataframe
 Is my syntax wrong? Or am I missing an import of some sort?




-- 
---
Takeshi Yamamuro
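
One pattern that does work against the 1.3 DataFrame API is to build the UDF as a value with functions.udf and apply that value to a Column, rather than relying on the name registered for SQL (this assumes di is a string column of cleanProcessDF):

  import org.apache.spark.sql.functions.{col, udf}

  val strLen = udf((s: String) => s.length)
  val withLen = cleanProcessDF.withColumn("dii", strLen(col("di")))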


Re: Spark 1.3.0: Running Pi example on YARN fails

2015-04-13 Thread Zhan Zhang
Hi Zork,

From the exception, it is still caused by hdp.version not being propagated 
correctly.  Can you check whether there is any typo?

[root@c6402 conf]# more java-opts -Dhdp.version=2.2.0.0–2041

[root@c6402 conf]# more spark-defaults.conf
spark.driver.extraJavaOptions  -Dhdp.version=2.2.0.0–2041
spark.yarn.am.extraJavaOptions  -Dhdp.version=2.2.0.0–2041

This is HDP specific question, and you can move the topic to HDP forum.


Thanks.

Zhan Zhang



RE: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread java8964
If it is really due to data skew, wouldn't the hanging task have a much bigger shuffle
write size in this case?
Here, the shuffle write size for that task is 0, and the rest of the IO of
this task is not much larger than that of the tasks that finished quickly; is that normal?
I am also interested in this case: from the statistics on the UI, how can one
tell that a task might have skewed data?
Yong

Date: Mon, 13 Apr 2015 12:58:12 -0400
Subject: Re: Equi Join is taking for ever. 1 Task is Running while other 199 
are complete
From: jcove...@gmail.com
To: deepuj...@gmail.com
CC: user@spark.apache.org

I can promise you that this is also a problem in the Pig world :) Not sure why
it's not a problem for this data set, though... are you sure that the two are
running the exact same code?
You should inspect your source data. Make a histogram for each side and see what the
data distribution looks like. If there is a value or bucket with a
disproportionate set of values, you know you have an issue.
2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:
You mean there is a tuple in either RDD that has itemID = 0 or null? And what
is a catch-all?
Does that imply it is a good idea to run a filter on each RDD first? We do not do
this using Pig on M/R. Is it required in the Spark world?
On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney jcove...@gmail.com wrote:
My guess would be data skew. Do you know if there is some item id that is a
catch-all? Can it be null? Item id 0? Lots of data sets have this sort of value,
and it always kills joins.
2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:
Code:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents).map { case (itemId, (listing, viDetail)) =>
    val viSummary = new VISummary
    viSummary.leafCategoryId = listing.getLeafCategId().toInt
    viSummary.itemSiteId = listing.getItemSiteId().toInt
    viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
    viSummary.sellerCountryId = listing.getSlrCntryId().toInt
    viSummary.buyerSegment = 0
    viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
    val sellerId = listing.getSlrId.toLong
    (sellerId, (viDetail, viSummary, itemId))
  }
Running Tasks:

Index | ID  | Attempt | Status  | Locality Level | Executor ID / Host                          | Launch Time         | Duration | GC Time | Shuffle Read Size / Records | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) | Errors
0     | 216 | 0       | RUNNING | PROCESS_LOCAL  | 181 / phxaishdc9dn0474.phx.ebay.com         | 2015/04/13 06:43:53 | 1.7 h    | 13 min  | 3.0 GB / 56964921           |            | 0.0 B / 0                    | 21.2 GB                | 1902.6 MB            |
2     | 218 | 0       | SUCCESS | PROCESS_LOCAL  | 582 / phxaishdc9dn0235.phx.ebay.com         | 2015/04/13 06:43:53 | 15 min   | 31 s    | 2.2 GB / 1666851            | 0.1 s      | 3.0 MB / 2062                | 54.8 GB                | 1924.5 MB            |
1     | 217 | 0       | SUCCESS | PROCESS_LOCAL  | 202 / phxdpehdc9dn2683.stratus.phx.ebay.com | 2015/04/13 06:43:53 | 19 min   | 1.3 min | 2.2 GB / 1687086            | 75 ms      | 3.9 MB / 2692                | 33.7 GB                | 1960.4 MB            |
4     | 220 | 0       | SUCCESS | PROCESS_LOCAL  | 218 / phxaishdc9dn0855.phx.ebay.com         | 2015/04/13 06:43:53 | 15 min   | 56 s    | 2.2 GB / 1675654            | 40 ms      | 3.3 MB / 2260                | 26.2 GB                | 1928.4 MB            |




Command:./bin/spark-submit -v --master yarn-cluster --driver-class-path 
/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
 --jars 
/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
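
To make the histogram suggestion from earlier in the thread concrete, a cheap skew check on each side of the join (this assumes both RDDs are keyed by itemId, as in the code above):

  val viEventCounts = viEvents.map { case (itemId, _) => (itemId, 1L) }.reduceByKey(_ + _)
  viEventCounts.top(20)(Ordering.by(_._2)).foreach(println)   // heaviest keys on the events side

  val lstgItemCounts = lstgItem.map { case (itemId, _) => (itemId, 1L) }.reduceByKey(_ + _)
  lstgItemCounts.top(20)(Ordering.by(_._2)).foreach(println)  // heaviest keys on the listings side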
  

Re: Need some guidance

2015-04-13 Thread Dean Wampler
The problem with using collect is that it will fail for large data sets, as
you'll attempt to copy the entire RDD to the memory of your driver program.
The following works (Scala syntax, but similar to Python):

scala> val i1 = input.distinct.groupByKey
scala> i1.foreach(println)
(1,CompactBuffer(beta, alpha, foo))
(3,CompactBuffer(foo))
(2,CompactBuffer(alpha, bar))

scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
scala> i2.foreach(println)
(1,3)
(3,1)
(2,2)

The i2 line passes a function that takes a tuple argument, then
constructs a new output tuple with the first element and the size of the
second (each CompactBuffer). An alternative pattern match syntax would be.

scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }

This should work as long as none of the CompactBuffers are too large, which
could happen for extremely large data sets.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw marco.s...@gmail.com wrote:

 **Learning the ropes**

 I'm trying to grasp the concept of using the pipeline in pySpark...

 Simplified example:
 
 list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]

 Desired outcome:
 [(1,3),(2,2),(3,1)]

 Basically for each key, I want the number of unique values.

 I've tried different approaches, but am I really using Spark effectively?
 I wondered if I would do something like:
  input=sc.parallelize(list)
  input.groupByKey().collect()

 Then I wondered if I could do something like a foreach over each key
 value, and then map the actual values and reduce them.  Pseudo-code:

 input.groupbykey()
 .keys
 .foreach(_.values
 .map(lambda x: (x, 1))
 .reducebykey(lambda a,b:a+b)
 .count()
 )

 I was somehow hoping that the key would get the current value of count,
 and thus be the count of the unique keys, which is exactly what I think I'm
 looking for.

 Am I way off base on how I could accomplish this?

 Marco
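
A sketch of the distinct-then-count approach (Scala here, but the same chain works in pySpark); it gives the per-key unique-value counts without holding the groupByKey value buffers in memory:

  // input holds (1,"alpha"), (1,"beta"), ... as above
  val uniqueCounts = input.distinct().map { case (k, _) => (k, 1L) }.reduceByKey(_ + _)
  uniqueCounts.collect().foreach(println)   // (1,3), (2,2), (3,1) for the sample data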



Spark Cluster: RECEIVED SIGNAL 15: SIGTERM

2015-04-13 Thread James King
Any idea what this means, many thanks

==
logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1
==
15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910 with 4
cores, 6.6 GB RAM
15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
15/04/13 07:07:22 INFO Worker: Spark home:
/remote/users//work/tools/spark-1.3.0-bin-hadoop2.4
15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 07:07:22 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:8081
15/04/13 07:07:22 INFO Utils: Successfully started service 'WorkerUI' on
port 8081.
15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at
http://09:8081
15/04/13 07:07:22 INFO Worker: Connecting to master
akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
15/04/13 07:07:22 INFO Worker: Successfully registered with master
spark://08:7077
*15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM*


Manning looking for a co-author for the GraphX in Action book

2015-04-13 Thread Reynold Xin
Hi all,

Manning (the publisher) is looking for a co-author for the GraphX in Action
book. The book currently has one author (Michael Malak), but they are
looking for a co-author to work closely with Michael and improve the
writings and make it more consumable.

Early access page for the book: http://www.manning.com/malak/

Let me know if you are interested in that. Cheers.


RE: Kryo exception : Encountered unregistered class ID: 13994

2015-04-13 Thread mehdisinger
Hello,

Thank you for your answer.

I'm already registering my classes as you're suggesting...

Regards

From: tsingfu [via Apache Spark User List]
[mailto:ml-node+s1001560n22468...@n3.nabble.com]
Sent: Monday, April 13, 2015 03:48
To: Mehdi Singer
Subject: Re: Kryo exception : Encountered unregistered class ID: 13994

Hi,
The error message mentions:
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
13994

So I think this is an issue with Kryo. You should use
`kryo.register(classOf[your_class_name])` in your app code.


If you reply to this email, your message will be added to the discussion below:
http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-exception-Encountered-unregistered-class-ID-13994-tp22437p22468.html




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-exception-Encountered-unregistered-class-ID-13994-tp22437p22471.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Kryo exception : Encountered unregistered class ID: 13994

2015-04-13 Thread ๏̯͡๏
You need to do a few more things or you will eventually run into these issues:

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))

-Deepak

On Mon, Apr 13, 2015 at 1:19 PM, mehdisinger mehdi.sin...@lampiris.be
wrote:

  Hello,



 Thank you for your answer.



 I’m already registering my classes as you’re suggesting…



 Regards



 *From:* tsingfu [via Apache Spark User List] [mailto:ml-node+[hidden email]]
 *Sent:* Monday, April 13, 2015 03:48
 *To:* Mehdi Singer
 *Subject:* Re: Kryo exception : Encountered unregistered class ID: 13994



 Hi,
 error message is mentioned:
 com.esotericsoftware.kryo.KryoException: Encountered unregistered class
 ID: 13994

 So I think this is issue with kryo, You should use
 `kryo.register(classOf[your_class_name])` in your app code.

  --

 *If you reply to this email, your message will be added to the discussion
 below:*


 http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-exception-Encountered-unregistered-class-ID-13994-tp22437p22468.html


 --
 View this message in context: RE: Kryo exception : Encountered
 unregistered class ID: 13994
 http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-exception-Encountered-unregistered-class-ID-13994-tp22437p22471.html

 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.




-- 
Deepak


Re: Is the disk space in SPARK_LOCAL_DIRS cleanned up?

2015-04-13 Thread Marius Soutier
I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example:

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=<seconds>"

 On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:
 
 Does anybody have an answer for this?
  
 Thanks
 Ningjun
  
 From: Wang, Ningjun (LNG-NPV) 
 Sent: Thursday, April 02, 2015 12:14 PM
 To: user@spark.apache.org mailto:user@spark.apache.org
 Subject: Is the disk space in SPARK_LOCAL_DIRS cleanned up?
  
  I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are shuffled,
  Spark writes to this folder. I found that the disk space used by this folder keeps
  increasing quickly, and at a certain point I will run out of disk space.
   
  I wonder, does Spark clean up the disk space in this folder once the shuffle
  operation is done? If not, I need to write a job to clean it up myself. But
  how do I know which subfolders can be removed?
  
 Ningjun



Re: Spark Cluster: RECEIVED SIGNAL 15: SIGTERM

2015-04-13 Thread Guillaume Pitel

Very likely to be this :
http://www.linuxdevcenter.com/pub/a/linux/2006/11/30/linux-out-of-memory.html?page=2

Your worker ran out of memory => maybe you're asking for too much memory
for the JVM, or something else is running on the worker.


Guillaume

Any idea what this means, many thanks

== 
logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1 
==
15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910 with 
4 cores, 6.6 GB RAM

15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
15/04/13 07:07:22 INFO Worker: Spark home: 
/remote/users//work/tools/spark-1.3.0-bin-hadoop2.4

15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 07:07:22 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:8081
15/04/13 07:07:22 INFO Utils: Successfully started service 'WorkerUI' 
on port 8081.
15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at 
http://09:8081
15/04/13 07:07:22 INFO Worker: Connecting to master 
akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
15/04/13 07:07:22 INFO Worker: Successfully registered with master 
spark://08:7077

*15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM*




--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: regarding ZipWithIndex

2015-04-13 Thread Jeetendra Gangele
How about using mapToPair and exchanging the two? Will it be efficient?
Below is the code; will it be efficient to convert it like this?


JavaPairRDD<Long, MatcherReleventData> RddForMarch =
    matchRdd.zipWithIndex().mapToPair(new PairFunction<Tuple2<VendorRecord, Long>, Long, MatcherReleventData>() {

        @Override
        public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long> t) throws Exception {
            MatcherReleventData matcherData = new MatcherReleventData();
            Tuple2<Long, MatcherReleventData> tuple = new Tuple2<Long, MatcherReleventData>(t._2,
                    matcherData.convertVendorDataToMatcherData(t._1));
            return tuple;
        }

    }).cache();

On 13 April 2015 at 03:11, Ted Yu yuzhih...@gmail.com wrote:

 Please also take a look at ZippedWithIndexRDDPartition which is 72 lines
 long.

 You can create your own version which extends RDD[(Long, T)]

 Cheers

 On Sun, Apr 12, 2015 at 1:29 PM, Ted Yu yuzhih...@gmail.com wrote:

 bq. will return something like JavaPairRDD<Object, Long>

 The Long component of the pair fits your description of an index. What other
 requirement does zipWithIndex not provide you?

 Cheers

 On Sun, Apr 12, 2015 at 1:16 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Hi All, I have an RDD (JavaRDD<Object>) and I want to convert it to
 JavaPairRDD<Index, Object>. The index should be unique and it should maintain
 the order: the first object should get 1, the second 2, and so
 on.

 I tried using zipWithIndex but it returns something like
 JavaPairRDD<Object, Long>.
 I want to use this RDD for lookup and join operations later in my
 workflow, so ordering is important.


 Regards
 jeet
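
For reference, the Scala shape of the same idea (rdd stands for the original RDD): zipWithIndex preserves the ordering, and a map swaps the pair so the index becomes the key; the + 1 is only there because the numbering above was meant to start at 1 rather than 0:

  val indexed = rdd.zipWithIndex().map { case (value, idx) => (idx + 1, value) }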






Re: Is the disk space in SPARK_LOCAL_DIRS cleanned up?

2015-04-13 Thread Guillaume Pitel
Does it also clean up the Spark local dirs? I thought it was only cleaning
$SPARK_HOME/work/


Guillaume

I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example:

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=<seconds>"


On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV)
ningjun.w...@lexisnexis.com wrote:


Does anybody have an answer for this?
Thanks
Ningjun
*From:* Wang, Ningjun (LNG-NPV)
*Sent:* Thursday, April 02, 2015 12:14 PM
*To:* user@spark.apache.org
*Subject:* Is the disk space in SPARK_LOCAL_DIRS cleanned up?
I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are
shuffled, Spark writes to this folder. I found that the disk space used by
this folder keeps increasing quickly and at a certain point I will run
out of disk space.
I wonder, does Spark clean up the disk space in this folder once the
shuffle operation is done? If not, I need to write a job to clean it
up myself. But how do I know which subfolders can be removed?

Ningjun





--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



ExceptionDriver-Memory while running Spark job on Yarn-cluster

2015-04-13 Thread sachin Singh
Hi,
When I submit a Spark job with --master yarn-cluster using the command/options
below, I get a driver
memory error:

spark-submit --jars
./libs/mysql-connector-java-5.1.17.jar,./libs/log4j-1.2.17.jar --files
datasource.properties,log4j.properties --master yarn-cluster --num-executors
1 --driver-memory 2g --executor-memory 512m --class
com.test.spark.jobs.AggregationJob sparkagg.jar 

Exceptions as per yarn application ID log as under -
Container: container_1428938273236_0006_01_01 on 
mycom.hostname.com_8041
=
LogType: stderr
LogLength: 128
Log Contents:
Exception in thread Driver
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread Driver

LogType: stdout
LogLength: 40


Container: container_1428938273236_0006_02_01 on mycom.hostname.com_8041
=
LogType: stderr
LogLength: 1365
Log Contents:
java.io.IOException: Log directory
hdfs://mycom.hostname.com:8020/user/spark/applicationHistory/application_1428938273236_0006
already exists!
at
org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:129)
at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
at
org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
at org.apache.spark.SparkContext.init(SparkContext.scala:353)
at
org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61)
  
LogType: stdout
LogLength: 40


Please help; this is urgent for me.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Exception-Driver-Memory-while-running-Spark-job-on-Yarn-cluster-tp22475.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLlib : Gradient Boosted Trees classification confidence

2015-04-13 Thread mike
Thank you Peter.

I just want to be sure:
even if I use the classification setting, does GBT use regression trees
and not classification trees?

I know the difference between the two (theoretically) is only in the loss
and impurity functions.
Thus, in case it uses classification trees, doing what you proposed would
result in the classification itself.

Also, by looking at the Scala API
I found that each Node holds a Predict object which contains the probability
of the label (classification only) (
https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.mllib.tree.model.Predict
)
** This is what I called confidence.


So to sum up my questions and confusion:
1. Does GBT use classification trees when set to classification, or
does it always use regression trees?
2. In case it uses classification trees, how could I efficiently get to
the confidence = Node.Predict.prob?

Thanks again,
Michael



On Mon, Apr 13, 2015 at 10:13 AM, pprett [via Apache Spark User List] 
ml-node+s1001560n22470...@n3.nabble.com wrote:

 Hi Mike,

 Gradient Boosted Trees (or gradient boosted regression trees) don't store
 probabilities in each leaf node but rather model a continuous function
 which is then transformed via a logistic sigmoid (i.e. like in a Logistic
 Regression model).
 If you are just interested in a confidence, you can use this continuous
 function directly: it's just the (weighted) sum of the predictions of the
 individual regression trees. Use the absolute value for confidence and the
 sign to determine the class label.
 Here is an example:

 def score(features: Vector): Double = {
 val treePredictions = gbdt.trees.map(_.predict(features))
 blas.ddot(gbdt.numTrees, treePredictions, 1, gbdt.treeWeights, 1)
 }

 If you are rather interested in probabilities, just pass the function
 value to a logistic sigmoid.

 best,
  Peter

 --
  If you reply to this email, your message will be added to the discussion
 below:

 http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Gradient-Boosted-Trees-classification-confidence-tp22466p22470.html





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Gradient-Boosted-Trees-classification-confidence-tp22466p22476.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
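
To make the earlier suggestion concrete, a sketch that exposes the ensemble's raw additive score and squashes it through a logistic sigmoid. The exact scaling of the sigmoid (for example a factor of 2 under log loss) depends on the loss formulation, so treat the result as a confidence score rather than a calibrated probability:

  import org.apache.spark.mllib.linalg.Vector
  import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel

  // weighted sum of the individual regression-tree predictions (the margin)
  def margin(model: GradientBoostedTreesModel, features: Vector): Double =
    model.trees.zip(model.treeWeights).map { case (tree, w) => w * tree.predict(features) }.sum

  // map the margin into (0, 1); the sign of the margin still decides the class label
  def confidence(model: GradientBoostedTreesModel, features: Vector): Double =
    1.0 / (1.0 + math.exp(-margin(model, features)))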

Re: Spark 1.3.0: Running Pi example on YARN fails

2015-04-13 Thread Zork Sail
Hi Zhan,
Alas setting:

-Dhdp.version=2.2.0.0–2041

Does not help. Still get the same error:
15/04/13 09:53:59 INFO yarn.Client:
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1428918838408
 final status: UNDEFINED
 tracking URL:
http://foo.bar.site:8088/proxy/application_1427875242006_0037/
 user: test
15/04/13 09:54:00 INFO yarn.Client: Application report for
application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:01 INFO yarn.Client: Application report for
application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:02 INFO yarn.Client: Application report for
application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:03 INFO yarn.Client: Application report for
application_1427875242006_0037 (state: FAILED)
15/04/13 09:54:03 INFO yarn.Client:
 client token: N/A
 diagnostics: Application application_1427875242006_0037 failed 2 times
due to AM Container for appattempt_1427875242006_0037_02 exited with
exitCode: 1
For more detailed output, check application tracking page:
http://foo.bar.site:8088/proxy/application_1427875242006_0037/Then, click
on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1427875242006_0037_02_01
Exit code: 1
Exception message:
/mnt/hdfs01/hadoop/yarn/local/usercache/test/appcache/application_1427875242006_0037/container_1427875242006_0037_02_01/launch_container.sh:
line 27:
$PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
bad substitution

Stack trace: ExitCodeException exitCode=1:
/mnt/hdfs01/hadoop/yarn/local/usercache/test/appcache/application_1427875242006_0037/container_1427875242006_0037_02_01/launch_container.sh:
line 27:
$PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
bad substitution

at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1428918838408
 final status: FAILED
 tracking URL:
http://foo.bar.site:8088/cluster/app/application_1427875242006_0037
 user: test
Exception in thread main org.apache.spark.SparkException: Application
finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:622)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:647)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at

Reading files from http server

2015-04-13 Thread Peter Rudenko
Hi, I want to play with the Criteo 1 TB dataset. The files are located on Azure
storage. Here's a command to download them:
curl -O http://azuremlsampleexperiments.blob.core.windows.net/criteo/day_{`seq -s ',' 0 23`}.gz
Is there any way to read the files over HTTP with Spark, without
downloading them to HDFS first? Something like this:
sc.textFile("http://azuremlsampleexperiments.blob.core.windows.net/criteo/day_{0-23}.gz"),
so it will have 24 partitions.


Thanks,
Peter Rudenko
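
One sketch of the idea without going through HDFS: parallelize the 24 URLs (one partition each) and let every task stream and gunzip its own file over HTTP. There are no retries and no explicit stream close on failure here, so treat it only as a starting point:

  import java.net.URL
  import java.util.zip.GZIPInputStream
  import scala.io.Source

  val urls = (0 to 23).map(i => s"http://azuremlsampleexperiments.blob.core.windows.net/criteo/day_$i.gz")
  val lines = sc.parallelize(urls, numSlices = urls.size).flatMap { u =>
    Source.fromInputStream(new GZIPInputStream(new URL(u).openStream()), "UTF-8").getLines()
  }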



How to use multiple app jar files?

2015-04-13 Thread Michael Weir
My app works fine with the single, uber jar file containing my app and
all its dependencies. However, it takes about 20 minutes to copy the 65MB
jar file up to the node on the cluster, so my code, compile, test cycle
has become a code, compile, coooppy, test cycle.

I'd like to have a single dependencies jar file on the node, and use a
separate small jar for my app (which takes around 10 seconds to copy to the
node).

I've tried using --jars deps.jar, but that copies the deps.jar to the
app-* folder but not the driver-* folder, so I get classNotFound errors on
the driver. Various other combinations of flags, etc. have produced a fair
bit of frustration but no progress.

Any help with this would be greatly appreciated, as this problem is
significantly stretching the length of my work day!

Thanks.
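
One approach that avoids re-shipping the big jar: copy deps.jar to the node(s) once and point both the driver and the executors at that path through the extraClassPath settings. These normally go into spark-defaults.conf or onto the spark-submit command line (--driver-class-path / --conf), since the driver's classpath cannot be changed from inside an already-running driver; the SparkConf form below is only to show the key names, and the path is a placeholder:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.driver.extraClassPath", "/opt/myapp/deps.jar")
    .set("spark.executor.extraClassPath", "/opt/myapp/deps.jar")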


Re: Parquet File Binary column statistics error when reuse byte[] among rows

2015-04-13 Thread Cheng Lian

Thanks Yijie! Also cc the user list.

Cheng

On 4/13/15 9:19 AM, Yijie Shen wrote:
I opened a new Parquet JIRA ticket here: 
https://issues.apache.org/jira/browse/PARQUET-251


Yijie

On April 12, 2015 at 11:48:57 PM, Cheng Lian (lian.cs@gmail.com) wrote:



Thanks for reporting this! Would you mind opening JIRA tickets for both
Spark and Parquet?

I'm not sure whether Parquet declares somewhere the user mustn't reuse
byte arrays when using binary type. If it does, then it's a Spark bug.
Anyway, this should be fixed.

Cheng

On 4/12/15 1:50 PM, Yijie Shen wrote:
 Hi,

 Suppose I create a dataRDD which extends RDD[Row], and each row is
 GenericMutableRow(Array(Int, Array[Byte])). A same Array[Byte] 
object is
 reused among rows but has different content each time. When I 
convert it to
 a dataFrame and save it as Parquet File, the file's row group 
statistic(max

  min) of Binary column would be wrong.



 Here is the reason: In Parquet, BinaryStatistic just keep max  min as
 parquet.io.api.Binary references, Spark sql would generate a new 
Binary

 backed by the same Array[Byte] passed from row.
 reference backed max: 
Binary--ByteArrayBackedBinary--

 Array[Byte]

 Therefore, each time parquet updating row group's statistic, max  min
 would always refer to the same Array[Byte], which has new content each
 time. When parquet decides to save it into file, the last row's 
content

 would be saved as both max  min.



 It seems it is a parquet bug because it's parquet's responsibility to
 update statistics correctly.
 But not quite sure. Should I report it as a bug in parquet JIRA?


 The spark JIRA is https://issues.apache.org/jira/browse/SPARK-6859






Re: Problem getting program to run on 15TB input

2015-04-13 Thread Daniel Mahler
Sometimes a large number of partitions leads to memory problems.
Something like

val rdd1 = sc.textFile(file1).coalesce(500). ...
val rdd2 = sc.textFile(file2).coalesce(500). ...

may help.


On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra arun.lut...@gmail.com wrote:

 Everything works smoothly if I do the 99%-removal filter in Hive first.
 So, all the baggage from garbage collection was breaking it.

 Is there a way to filter() out 99% of the data without having to garbage
 collect 99% of the RDD?

 On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote:

 I tried a shorter simper version of the program, with just 1 RDD,
  essentially it is:

 sc.textFile(..., N).map().filter().map( blah => (id,
 1L)).reduceByKey().saveAsTextFile(...)

 Here is a typical GC log trace from one of the yarn container logs:

 54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)]
 9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
 real=0.02 secs]
 77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)]
 9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
 real=0.04 secs]
 79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)]
 9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
 real=0.08 secs]
 92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)]
 9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
 real=0.02 secs]
 114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)]
 9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
 real=0.02 secs]
 117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)]
 9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
 real=0.02 secs]

 So ~9GB is getting GC'ed every few seconds. Which seems like a lot.

 Question: The filter() is removing 99% of the data. Does this 99% of the
 data get GC'ed?

 Now, I was able to finally get to reduceByKey() by reducing the number of
 executor-cores (to 2), based on suggestions at
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
 . This makes everything before reduceByKey() run pretty smoothly.

 I ran this with more executor-memory and less executors (most important
 thing was fewer executor-cores):

 --num-executors 150 \
 --driver-memory 15g \
 --executor-memory 110g \
 --executor-cores 32 \

 But then, reduceByKey() fails with:

 java.lang.OutOfMemoryError: Java heap space




 On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The Spark UI names the line number and name of the operation
 (repartition in this case) that it is performing. Only if this information
 is wrong (just a possibility), could it have started groupByKey already.

 I will try to analyze the amount of skew in the data by using
 reduceByKey (or simply countByKey) which is relatively inexpensive. For the
 purposes of this algorithm I can simply log and remove keys with huge
 counts, before doing groupByKey.
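(For illustration, a minimal sketch of that log-and-drop idea, assuming a pair RDD
named keyedRdd and a made-up count threshold:)

// Count records per key cheaply (reduceByKey uses map-side combine), then pull
// back only the heavy hitters to the driver.
val keyCounts = keyedRdd.mapValues(_ => 1L).reduceByKey(_ + _)
val hotKeys = keyCounts.filter { case (_, n) => n > 1000000L }.keys.collect().toSet
hotKeys.foreach(k => println(s"Dropping skewed key: $k"))

// Drop the skewed keys before the expensive groupByKey.
val hotKeysBc = sc.broadcast(hotKeys)
val grouped = keyedRdd
  .filter { case (k, _) => !hotKeysBc.value.contains(k) }
  .groupByKey()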

 On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 All stated symptoms are consistent with GC pressure (other nodes
 timeout trying to connect because of a long stop-the-world), quite possibly
 due to groupByKey. groupByKey is a very expensive operation as it may bring
 all the data for a particular partition into memory (in particular, it
 cannot spill values for a single key, so if you have a single very skewed
 key you can get behavior like this).

 On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 But groupByKey will repartition according to the number of keys as I
 understand how it works. How do you know that you haven't reached the
 groupByKey phase? Are you using a profiler or do you base that assumption
 only on logs?

 Sat, 28 Feb 2015, 8:12 PM, Arun Luthra
 arun.lut...@gmail.com wrote:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend:
 Driver Disassociated [akka.tcp://sparkExecutor@...] ->
 [akka.tcp://sparkDriver@...] disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task
 421027.0 in stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 

Re: How to use multiple app jar files?

2015-04-13 Thread ๏̯͡๏
I faced the exact same issue. The way I solved it was:

1. Copy the entire project.
2. Delete all the sources, keeping only the dependencies in pom.xml. Building this
will create a fat jar with no sources, deps only.
3. Keep the original project as is and build it. This will create a JAR
(no deps, by default).

Now add your deps jar to Spark with --jars.

This requires two projects, but that's better than before.

On Mon, Apr 13, 2015 at 4:45 PM, Michael Weir michael.weir@gmail.com
wrote:

 My app works fine with the single, uber jar file containing my app and
 all its dependencies. However, it takes about 20 minutes to copy the 65MB
 jar file up to the node on the cluster, so my code, compile, test cycle
  has become a code, compile, copy, test cycle.

 I'd like to have a single dependencies jar file on the node, and use a
 separate small jar for my app (which takes around 10 seconds to copy to the
 node).

 I've tried using --jars deps.jar, but that copies the deps.jar to the
 app-* folder but not the driver-* folder, so I get classNotFound errors on
 the driver. Various other combinations of flags, etc. have produced a fair
 bit of frustration but no progress.

 Any help with this would be greatly appreciated, as this problem is
 significantly stretching the length of my work day!

 Thanks.




-- 
Deepak


Re: Packaging Java + Python library

2015-04-13 Thread prabeesh k
Refer to this post:
http://blog.prabeeshk.com/blog/2015/04/07/self-contained-pyspark-application/

On 13 April 2015 at 17:41, Punya Biswal pbis...@palantir.com wrote:

 Dear Spark users,

 My team is working on a small library that builds on PySpark and is
 organized like PySpark as well -- it has a JVM component (that runs in the
 Spark driver and executor) and a Python component (that runs in the PySpark
 driver and executor processes). What's a good approach for packaging such a
 library?

 Some ideas we've considered:

- Package up the JVM component as a Jar and the Python component as a
binary egg. This is reasonable but it means that there are two separate
artifacts that people have to manage and keep in sync.
- Include Python files in the Jar and add it to the PYTHONPATH. This
follows the example of the Spark assembly jar, but deviates from the Python
community's standards.

 We'd really appreciate hearing experiences from other people who have
 built libraries on top of PySpark.

 Punya



Re: Spark TeraSort source request

2015-04-13 Thread Tom Hubregtsen
Thank you for your response Ewan. I quickly looked yesterday and it was
there, but today at work I tried to open it again to start working on it,
but it appears to be removed. Is this correct?

Thanks,

Tom

On 12 April 2015 at 06:58, Ewan Higgs ewan.hi...@ugent.be wrote:

  Hi all.
 The code is linked from my repo:

 https://github.com/ehiggs/spark-terasort
 
 This is an example Spark program for running TeraSort benchmarks. It is
 based on work from Reynold Xin's branch
 https://github.com/rxin/spark/tree/terasort, but it is not the same
 TeraSort program that currently holds the record
 http://sortbenchmark.org/. That program is here
 https://github.com/rxin/spark/tree/sort-benchmark/core/src/main/scala/org/apache/spark/sort
 .
 

 That program is here links to:

 https://github.com/rxin/spark/tree/sort-benchmark/core/src/main/scala/org/apache/spark/sort

 I've been working on other projects at the moment so I haven't returned to
 the spark-terasort stuff. If you have any pull requests, I would be very
 grateful.

 Yours,
 Ewan


 On 08/04/15 03:26, Pramod Biligiri wrote:

 +1. I would love to have the code for this as well.

  Pramod

 On Fri, Apr 3, 2015 at 12:47 PM, Tom thubregt...@gmail.com wrote:

 Hi all,

 As we all know, Spark has set the record for sorting data, as published
 on:
 https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html.

 Here at our group, we would love to verify these results, and compare
 machine using this benchmark. We've spend quite some time trying to find
 the
 terasort source code that was used, but can not find it anywhere.

 We did find two candidates:

 A version posted by Reynold [1], the poster of the message above. This
 version is stuck at // TODO: Add partition-local (external) sorting
 using TeraSortRecordOrdering, only generating data.

 Here, Ewan noticed that it didn't appear to be similar to Hadoop
 TeraSort.
 [2] After this he created a version on his own [3]. With this version, we
 noticed problems with TeraValidate with datasets above ~10G (as mentioned
 by
 others at [4]). When examining the raw input and output files, it actually
 appears that the input data is sorted and the output data unsorted in both
 cases.

 Because of this, we believe we did not yet find the actual used source
 code.
 I've tried to search in the Spark User forum archive's, seeing request of
 people, indicating a demand, but did not succeed in finding the actual
 source code.

 My question:
 Could you guys please make the source code of the used TeraSort program,
 preferably with settings, available? If not, what are the reasons that
 this
 seems to be withheld?

 Thanks for any help,

 Tom Hubregtsen

 [1]

 https://github.com/rxin/spark/commit/adcae69145905162fa3b6932f70be2c932f95f87
 [2]

 http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/%3c5462092c.1060...@ugent.be%3E
 [3] https://github.com/ehiggs/spark-terasort
 [4]

 http://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3CCAPszQwgap4o1inZkTwcwV=7scwoqtr5yxfnsqo5p2kgp1bn...@mail.gmail.com%3E



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-TeraSort-source-request-tp22371.html






Packaging Java + Python library

2015-04-13 Thread Punya Biswal
Dear Spark users,

My team is working on a small library that builds on PySpark and is organized 
like PySpark as well -- it has a JVM component (that runs in the Spark driver 
and executor) and a Python component (that runs in the PySpark driver and 
executor processes). What's a good approach for packaging such a library?

Some ideas we've considered:

- Package up the JVM component as a Jar and the Python component as a binary egg.
  This is reasonable but it means that there are two separate artifacts that
  people have to manage and keep in sync.
- Include Python files in the Jar and add it to the PYTHONPATH. This follows the
  example of the Spark assembly jar, but deviates from the Python community's
  standards.
We'd really appreciate hearing experiences from other people who have built 
libraries on top of PySpark.

Punya





Sqoop parquet file not working in spark

2015-04-13 Thread bipin
Hi I imported a table from mssql server with Sqoop 1.4.5 in parquet format.
But when I try to load it from Spark shell, it throws error like :

scala> val df1 = sqlContext.load("/home/bipin/Customer2")
scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown
during a parallel computation: java.lang.NullPointerException
parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:249)
parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:543)
parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:520)
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:426)
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:298)
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:297)
scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
.
.
.
at scala.collection.parallel.package$$anon$1.alongWith(package.scala:87)
at scala.collection.parallel.Task$class.mergeThrowables(Tasks.scala:86)
at
scala.collection.parallel.mutable.ParArray$Map.mergeThrowables(ParArray.scala:650)
at scala.collection.parallel.Task$class.tryMerge(Tasks.scala:72)
at
scala.collection.parallel.mutable.ParArray$Map.tryMerge(ParArray.scala:650)
at
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:190)
at
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
at
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
at
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at 
scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I looked at the Sqoop parquet folder and its structure is different from
the one that I created in Spark. How can I make the parquet file work?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Sqoop-parquet-file-not-working-in-spark-tp22477.html



Re: Exception Driver-Memory while running Spark job on Yarn-cluster

2015-04-13 Thread ๏̯͡๏
Try this

./bin/spark-submit -v --master yarn-cluster --jars
./libs/mysql-connector-java-5.1.17.jar,./libs/log4j-1.2.17.jar --files
datasource.properties,log4j.properties  --num-executors 1 --driver-memory
4g *--driver-java-options -XX:MaxPermSize=1G* --executor-memory 2g
--executor-cores 1 --class com.test.spark.jobs.AggregationJob sparkagg.jar


I noticed that you're using mysql-connector-java-5.1.17.jar. Are you running
Spark SQL (Hive queries from Spark)?


On Mon, Apr 13, 2015 at 3:53 PM, sachin Singh sachin.sha...@gmail.com
wrote:

 Hi,
 When I am submitting a Spark job as --master yarn-cluster with the below
 command/options, I am getting a driver
 memory error:

 spark-submit --jars
 ./libs/mysql-connector-java-5.1.17.jar,./libs/log4j-1.2.17.jar --files
 datasource.properties,log4j.properties --master yarn-cluster
 --num-executors
 1 --driver-memory 2g --executor-memory 512m --class
 com.test.spark.jobs.AggregationJob sparkagg.jar

 Exceptions as per yarn application ID log as under -
 Container: container_1428938273236_0006_01_01 on
 mycom.hostname.com_8041

 =
 LogType: stderr
 LogLength: 128
 Log Contents:
 Exception in thread Driver
 Exception: java.lang.OutOfMemoryError thrown from the
 UncaughtExceptionHandler in thread Driver

 LogType: stdout
 LogLength: 40


 Container: container_1428938273236_0006_02_01 on
 mycom.hostname.com_8041

 =
 LogType: stderr
 LogLength: 1365
 Log Contents:
 java.io.IOException: Log directory
 hdfs://
 mycom.hostname.com:8020/user/spark/applicationHistory/application_1428938273236_0006
 already exists!
 at
 org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:129)
 at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
 at

 org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
         at org.apache.spark.SparkContext.<init>(SparkContext.scala:353)
 at

 org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)

 LogType: stdout
 LogLength: 40


 Please help, it's urgent for me.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Exception-Driver-Memory-while-running-Spark-job-on-Yarn-cluster-tp22475.html




-- 
Deepak


Need some guidance

2015-04-13 Thread Marco Shaw
**Learning the ropes**

I'm trying to grasp the concept of using the pipeline in pySpark...

Simplified example:

list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]

Desired outcome:
[(1,3),(2,2),(3,1)]

Basically for each key, I want the number of unique values.

I've tried different approaches, but am I really using Spark effectively?
I wondered if I would do something like:
>>> input=sc.parallelize(list)
>>> input.groupByKey().collect()

Then I wondered if I could do something like a foreach over each key value,
and then map the actual values and reduce them.  Pseudo-code:

input.groupbykey()
.keys
.foreach(_.values
.map(lambda x: x,1)
.reducebykey(lambda a,b:a+b)
.count()
)

I was somehow hoping that the key would get the current value of count, and
thus be the count of the unique keys, which is exactly what I think I'm
looking for.

Am I way off base on how I could accomplish this?

Marco
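(Not an authoritative answer, but for reference, one straightforward way to express
"number of distinct values per key" with plain RDD operations — sketched in Scala
here; PySpark RDDs offer the same distinct/map/reduceByKey chain:)

val data = sc.parallelize(Seq(
  (1, "alpha"), (1, "beta"), (1, "foo"), (1, "alpha"),
  (2, "alpha"), (2, "alpha"), (2, "bar"), (3, "foo")))

// Deduplicate the (key, value) pairs first, then count what is left per key.
val uniquePerKey = data.distinct()
  .map { case (k, _) => (k, 1) }
  .reduceByKey(_ + _)

uniquePerKey.collect().sorted.foreach(println)   // (1,3), (2,2), (3,1)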


Multipart upload to S3 fails with Bad Digest Exceptions

2015-04-13 Thread Eugen Cepoi
Hi,

I am not sure my problem is relevant to spark, but perhaps someone else had
the same error. When I try to write files that need multipart upload to S3
from a job on EMR I always get this error:

com.amazonaws.services.s3.model.AmazonS3Exception: The Content-MD5 you
specified did not match what we received.

If I disable multipart upload via fs.s3n.multipart.uploads.enabled (or
output smaller files that don't require multi part upload), then everything
works fine.
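(For reference, a sketch of applying that workaround per job from the driver,
assuming the fs.s3n property named above and a made-up output path:)

// Disable multipart upload for this job only.
sc.hadoopConfiguration.set("fs.s3n.multipart.uploads.enabled", "false")
rdd.saveAsTextFile("s3n://some-bucket/some/output/path")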

I've seen an old thread on the ML where someone has the same error, but in
my case I don't have any other errors on the worker nodes.

I am using spark 1.2.1 and hadoop 2.4.0.

Thanks,
Eugen


Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread ๏̯͡๏
You mean there is a tuple in either RDD that has itemId = 0 or null?
And what is a catch-all?

Does that imply it is a good idea to run a filter on each RDD first? We do
not do this using Pig on M/R. Is it required in the Spark world?

On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney jcove...@gmail.com
wrote:

 My guess would be data skew. Do you know if there is some item id that is
 a catch all? can it be null? item id 0? lots of data sets have this sort of
 value and it always kills joins

 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 Code:

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.join(viEvents).map {
    case (itemId, (listing, viDetail)) =>
 val viSummary = new VISummary
 viSummary.leafCategoryId = listing.getLeafCategId().toInt
 viSummary.itemSiteId = listing.getItemSiteId().toInt
 viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
 viSummary.sellerCountryId = listing.getSlrCntryId().toInt
 viSummary.buyerSegment = 0
 viSummary.isBin = (if
  (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
 val sellerId = listing.getSlrId.toLong
 (sellerId, (viDetail, viSummary, itemId))
 }

 Running Tasks:
 Tasks IndexIDAttemptStatus ▾Locality LevelExecutor ID / HostLaunch Time
 DurationGC TimeShuffle Read Size / RecordsWrite TimeShuffle Write Size /
 RecordsShuffle Spill (Memory)Shuffle Spill (Disk)Errors  0 216 0 RUNNING
 PROCESS_LOCAL 181 / phxaishdc9dn0474.phx.ebay.com 2015/04/13 06:43:53 1.7
 h  13 min  3.0 GB / 56964921  0.0 B / 0  21.2 GB 1902.6 MB   2 218 0
 SUCCESS PROCESS_LOCAL 582 / phxaishdc9dn0235.phx.ebay.com 2015/04/13
 06:43:53 15 min  31 s  2.2 GB / 1666851  0.1 s 3.0 MB / 2062  54.8 GB 1924.5
 MB   1 217 0 SUCCESS PROCESS_LOCAL 202 /
 phxdpehdc9dn2683.stratus.phx.ebay.com 2015/04/13 06:43:53 19 min  1.3
 min  2.2 GB / 1687086  75 ms 3.9 MB / 2692  33.7 GB 1960.4 MB   4 220 0
 SUCCESS PROCESS_LOCAL 218 / phxaishdc9dn0855.phx.ebay.com 2015/04/13
 06:43:53 15 min  56 s  2.2 GB / 1675654  40 ms 3.3 MB / 2260  26.2 GB 1928.4
 MB



 Command:
 ./bin/spark-submit -v --master yarn-cluster --driver-class-path
 /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
 --jars
 /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
  --num-executors 3000 --driver-memory 12g --driver-java-options
 -XX:MaxPermSize=6G --executor-memory 12g --executor-cores 1 --queue
 hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
 /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
 startDate=2015-04-6 endDate=2015-04-7
 input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
 output=/user/dvasthimal/epdatasets/viewItem buffersize=128
 maxbuffersize=1068 maxResultSize=2G


 What do i do ? I killed the job twice and its stuck again. Where is it
 stuck ?

 --
 Deepak





-- 
Deepak


Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread Jonathan Coveney
I can promise you that this is also a problem in the pig world :) not sure
why it's not a problem for this data set, though... are you sure that the
two are doing the exact same code?

you should inspect your source data. Make a histogram for each and see what
the data distribution looks like. If there is a value or bucket with a
disproportionate set of values you know you have an issue
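Something along these lines would do it (a sketch, using the lstgItem and viEvents
pair RDDs from the quoted code below; the top-20 cut-off is arbitrary):

// Rough per-key histograms for both sides of the join; the top entries show
// whether one itemId (0, a null stand-in, etc.) dominates either data set.
val lstgTop = lstgItem.map { case (itemId, _) => (itemId, 1L) }
  .reduceByKey(_ + _)
  .map(_.swap)   // (count, itemId)
  .top(20)       // largest counts first
val viTop = viEvents.map { case (itemId, _) => (itemId, 1L) }
  .reduceByKey(_ + _)
  .map(_.swap)
  .top(20)

lstgTop.foreach(println)
viTop.foreach(println)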

2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 You mean there is a tuple in either RDD, that has itemID = 0 or null ?
 And what is catch all ?

 That implies is it a good idea to run a filter on each RDD first ? We do
 not do this using Pig on M/R. Is it required in Spark world ?

 On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney jcove...@gmail.com
 wrote:

 My guess would be data skew. Do you know if there is some item id that is
 a catch all? can it be null? item id 0? lots of data sets have this sort of
 value and it always kills joins

 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 Code:

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.join(viEvents).map {
    case (itemId, (listing, viDetail)) =>
 val viSummary = new VISummary
 viSummary.leafCategoryId = listing.getLeafCategId().toInt
 viSummary.itemSiteId = listing.getItemSiteId().toInt
 viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
 viSummary.sellerCountryId = listing.getSlrCntryId().toInt
 viSummary.buyerSegment = 0
 viSummary.isBin = (if
  (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
 val sellerId = listing.getSlrId.toLong
 (sellerId, (viDetail, viSummary, itemId))
 }

 Running Tasks:
 Tasks IndexIDAttemptStatus ▾Locality LevelExecutor ID / HostLaunch Time
 DurationGC TimeShuffle Read Size / RecordsWrite TimeShuffle Write Size
 / RecordsShuffle Spill (Memory)Shuffle Spill (Disk)Errors  0 216 0
 RUNNING PROCESS_LOCAL 181 / phxaishdc9dn0474.phx.ebay.com 2015/04/13
 06:43:53 1.7 h  13 min  3.0 GB / 56964921  0.0 B / 0  21.2 GB 1902.6 MB
 2 218 0 SUCCESS PROCESS_LOCAL 582 / phxaishdc9dn0235.phx.ebay.com 2015/04/13
 06:43:53 15 min  31 s  2.2 GB / 1666851  0.1 s 3.0 MB / 2062  54.8 GB 1924.5
 MB   1 217 0 SUCCESS PROCESS_LOCAL 202 /
 phxdpehdc9dn2683.stratus.phx.ebay.com 2015/04/13 06:43:53 19 min  1.3
 min  2.2 GB / 1687086  75 ms 3.9 MB / 2692  33.7 GB 1960.4 MB   4 220 0
 SUCCESS PROCESS_LOCAL 218 / phxaishdc9dn0855.phx.ebay.com 2015/04/13
 06:43:53 15 min  56 s  2.2 GB / 1675654  40 ms 3.3 MB / 2260  26.2 GB 1928.4
 MB



 Command:
 ./bin/spark-submit -v --master yarn-cluster --driver-class-path
 /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
 --jars
 /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
  --num-executors 3000 --driver-memory 12g --driver-java-options
 -XX:MaxPermSize=6G --executor-memory 12g --executor-cores 1 --queue
 hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
 /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
 startDate=2015-04-6 endDate=2015-04-7
 input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
 output=/user/dvasthimal/epdatasets/viewItem buffersize=128
 maxbuffersize=1068 maxResultSize=2G


 What do i do ? I killed the job twice and its stuck again. Where is it
 stuck ?

 --
 Deepak





 --
 Deepak




feature scaling in GeneralizedLinearAlgorithm.scala

2015-04-13 Thread Jianguo Li
Hi,

In GeneralizedLinearAlgorithm, which Logistic Regression relies on, it
says "if useFeatureScaling is enabled, we will standardize the training
features, and train the model in the scaled space. Then we transform
the coefficients from the scaled space to the original space."

My understanding then is we do not need to scale the test data since the
coefficients are already in the original space, is this correct?
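(Concretely, the usage being asked about looks something like this minimal sketch,
assuming trainingData and testData are RDD[LabeledPoint] in the original, unscaled
feature space:)

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

// With feature scaling enabled as described above, training standardizes the
// features internally and returns coefficients in the original space.
val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainingData)

// So the test features are passed in as-is, with no extra scaling step.
val predictions = testData.map(p => (model.predict(p.features), p.label))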

Thanks

Jianguo


Re: Spark Cluster: RECEIVED SIGNAL 15: SIGTERM

2015-04-13 Thread Tim Chen
Linux OOM throws SIGTERM, but if I remember correctly JVM handles heap
memory limits differently and throws OutOfMemoryError and eventually sends
SIGINT.

Not sure what happened but the worker simply received a SIGTERM signal, so
perhaps the daemon was terminated by someone or a parent process. Just my
guess.

Tim

On Mon, Apr 13, 2015 at 2:28 AM, Guillaume Pitel guillaume.pi...@exensa.com
 wrote:

  Very likely to be this :

 http://www.linuxdevcenter.com/pub/a/linux/2006/11/30/linux-out-of-memory.html?page=2

 Your worker ran out of memory => maybe you're asking for too much memory
 for the JVM, or something else is running on the worker

 Guillaume

  Any idea what this means, many thanks

  ==
 logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1
 ==
 15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910 with 4
 cores, 6.6 GB RAM
 15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
 15/04/13 07:07:22 INFO Worker: Spark home:
 /remote/users//work/tools/spark-1.3.0-bin-hadoop2.4
 15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
 15/04/13 07:07:22 INFO AbstractConnector: Started
 SelectChannelConnector@0.0.0.0:8081
 15/04/13 07:07:22 INFO Utils: Successfully started service 'WorkerUI' on
 port 8081.
 15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at
 http://09:8081
 15/04/13 07:07:22 INFO Worker: Connecting to master
 akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
 15/04/13 07:07:22 INFO Worker: Successfully registered with master
 spark://08:7077
 *15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM*



 --
  *Guillaume PITEL, Président*
 +33(0)626 222 431

 eXenSa S.A.S. http://www.exensa.com/
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Spark Streaming Kafka Consumer, Confluent Platform, Avro StorageLevel

2015-04-13 Thread Nicolas Phung
Hello,

I'm trying to use a Spark Streaming (1.2.0-cdh5.3.2) consumer
(via spark-streaming-kafka lib of the same version) with Kafka's Confluent
Platform 1.0.

I manage to make a Producer that produce my data and can check it via the
avro-console-consumer :

./bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181
--from-beginning

which displays :

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/usr/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
{group:,domainType:job,action:apply_freely,entity:14564564132,user:{string:user},session:{string:session},date:20150326T154052.000+0100,ip:192.168.0.1,userAgent:Mozilla/5.0
(X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko)
Chrome/41.0.2272.101 Safari/537.36,referer:
http://www.kb.com/offre/controleur-de-gestion-h-f-9775029
,os:Linux,deviceType:Computer,browser:Chrome,browserVersion:41.0.2272.101,browserRenderer:WEBKIT,physicalServerOrigin:01.front.local}
{group:,domainType:job,action:apply_freely,entity:14564564132,user:{string:user},session:{string:session},date:20150326T154052.000+0100,ip:192.168.0.1,userAgent:Mozilla/5.0
(X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko)
Chrome/41.0.2272.101 Safari/537.36,referer:
http://www.kb.com/offre/controleur-de-gestion-h-f-9775029
,os:Linux,deviceType:Computer,browser:Chrome,browserVersion:41.0.2272.101,browserRenderer:WEBKIT,physicalServerOrigin:01.front.local}

So far, so good ! Now here's the weird thing, When I'm using the following
the  spark-streaming-kafka:

val kafkaParams = Map[String, String]("metadata.broker.list" ->
brokers, "group.id" -> "consumer", "zookeeper.connect" -> zookeeper,
"auto.offset.reset" -> "smallest", "schema.registry.url" ->
schemaRegistryUrl)
val topicMap = topics.split(",").map((_, 2)).toMap
val messages = KafkaUtils.createStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicMap,
StorageLevel.MEMORY_AND_DISK_SER)

val events = messages.map(_._2)
val dStream: DStream[Event] = events.map(
  avroMessage => {
    println("Avro content: " + avroMessage)
  }
)
dStream.print()


The Spark Streaming micro batch shows me this:


15/04/10 16:11:40 INFO storage.BlockManager: Found block
input-0-1428675099000 locally
Avro content: {group: , domainType: , action:
, entity: , user: , session: , date:
, ip: , userAgent: , referer: , os:
, deviceType: , browser: , browserVersion:
, browserRenderer: , physicalServerOrigin: }
Avro content: {group: , domainType: , action:
, entity: , user: , session: , date:
, ip: , userAgent: , referer: , os:
, deviceType: , browser: , browserVersion:
, browserRenderer: , physicalServerOrigin: }


When I change the snippet to use a non *_SER* StorageLevel, like this:

val messages = KafkaUtils.createStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicMap,
*StorageLevel.MEMORY_AND_DISK*)


And it works as expected:


15/04/10 16:29:02 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 2)
15/04/10 16:29:02 INFO storage.BlockManager: Found block
input-0-1428676140400 locally
Avro content: {group: , domainType: job, action:
apply_freely, entity: 14564564132, user: user, session:
session, date: 20150326T154052.000+0100, ip: 192.168.0.1,
userAgent: Mozilla\/5.0 (X11; Linux x86_64) AppleWebKit\/537.36
(KHTML, like Gecko) Chrome\/41.0.2272.101 Safari\/537.36, referer:
http:\/\/www.kb.com\/offre\/controleur-de-gestion-h-f-9775029, os:
Linux, deviceType: Computer, browser: Chrome,
browserVersion: 41.0.2272.101, browserRenderer: WEBKIT,
physicalServerOrigin: 01.front.local}
Avro content: {group: , domainType: job, action:
apply_freely, entity: 14564564132, user: user, session:
session, date: 20150326T154052.000+0100, ip: 192.168.0.1,
userAgent: Mozilla\/5.0 (X11; Linux x86_64) AppleWebKit\/537.36
(KHTML, like Gecko) Chrome\/41.0.2272.101 Safari\/537.36, referer:
http:\/\/www.kb.com\/offre\/controleur-de-gestion-h-f-9775029, os:
Linux, deviceType: Computer, browser: Chrome,
browserVersion: 41.0.2272.101, browserRenderer: WEBKIT,
physicalServerOrigin: 01.front.local}


If Kryo is disabled, Java serialization seems to be able to read the
record as expected with both _SER and non-_SER StorageLevels. I've tried several
things with Kryo serialization with no good results (even just activating
Kryo without registering any class manually). I don't know, maybe I'm doing
something wrong with the StorageLevel in Spark?

People in Confluent Platform mailing list

Help in transforming the RDD

2015-04-13 Thread Jeetendra Gangele
Hi all, I have a JavaPairRDD<Long, String> where each long key has 4
string values associated with it. I want to fire an HBase query to look
up each String part of the RDD.
This look-up will give a result of around 7K integers, so for each key I will
have ~7K values. My input RDD is already more than a GB, and after
getting these results it will become around 50 GB, which I want to avoid.

My problem:
1, Test1
1, test2
1, test3
1, test4
...
Now I will query HBase for Test1, test2, test3, test4 in parallel; each query
will give a result of around 2K, so 8K integers in total.

Now for each record I will have 1*8000 entries in my RDD, and supposing I have
1 million records it becomes 1 million*8000, which is huge to process even
using groupBy.


Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread Jonathan Coveney
My guess would be data skew. Do you know if there is some item id that is a
catch all? can it be null? item id 0? lots of data sets have this sort of
value and it always kills joins

2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 Code:

 val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
 Long))] = lstgItem.join(viEvents).map {
   case (itemId, (listing, viDetail)) =>
 val viSummary = new VISummary
 viSummary.leafCategoryId = listing.getLeafCategId().toInt
 viSummary.itemSiteId = listing.getItemSiteId().toInt
 viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
 viSummary.sellerCountryId = listing.getSlrCntryId().toInt
 viSummary.buyerSegment = 0
 viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue()
   > 0) 1 else 0)
 val sellerId = listing.getSlrId.toLong
 (sellerId, (viDetail, viSummary, itemId))
 }

 Running Tasks:
 Tasks IndexIDAttemptStatus ▾Locality LevelExecutor ID / HostLaunch Time
 DurationGC TimeShuffle Read Size / RecordsWrite TimeShuffle Write Size /
 RecordsShuffle Spill (Memory)Shuffle Spill (Disk)Errors  0 216 0 RUNNING
 PROCESS_LOCAL 181 / phxaishdc9dn0474.phx.ebay.com 2015/04/13 06:43:53 1.7
 h  13 min  3.0 GB / 56964921  0.0 B / 0  21.2 GB 1902.6 MB   2 218 0
 SUCCESS PROCESS_LOCAL 582 / phxaishdc9dn0235.phx.ebay.com 2015/04/13
 06:43:53 15 min  31 s  2.2 GB / 1666851  0.1 s 3.0 MB / 2062  54.8 GB 1924.5
 MB   1 217 0 SUCCESS PROCESS_LOCAL 202 /
 phxdpehdc9dn2683.stratus.phx.ebay.com 2015/04/13 06:43:53 19 min  1.3 min
 2.2 GB / 1687086  75 ms 3.9 MB / 2692  33.7 GB 1960.4 MB   4 220 0 SUCCESS
 PROCESS_LOCAL 218 / phxaishdc9dn0855.phx.ebay.com 2015/04/13 06:43:53 15
 min  56 s  2.2 GB / 1675654  40 ms 3.3 MB / 2260  26.2 GB 1928.4 MB



 Command:
 ./bin/spark-submit -v --master yarn-cluster --driver-class-path
 /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
 --jars
 /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
  --num-executors 3000 --driver-memory 12g --driver-java-options
 -XX:MaxPermSize=6G --executor-memory 12g --executor-cores 1 --queue
 hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
 /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
 startDate=2015-04-6 endDate=2015-04-7
 input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
 output=/user/dvasthimal/epdatasets/viewItem buffersize=128
 maxbuffersize=1068 maxResultSize=2G


 What do i do ? I killed the job twice and its stuck again. Where is it
 stuck ?

 --
 Deepak




Re: Could not compute split, block not found in Spark Streaming Simple Application

2015-04-13 Thread Saiph Kappa
Whether I use 1 or 2 machines, the results are the same... Here follows the
results I got using 1 and 2 receivers with 2 machines:

2 machines, 1 receiver:

sbt/sbt "run-main Benchmark 1 machine1  1000" 2>&1 | grep -i "Total delay\|record"

15/04/13 16:41:34 INFO JobScheduler: Total delay: 0.156 s for time
1428939694000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:41:35 INFO JobScheduler: Total delay: 0.155 s for time
1428939695000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:41:36 INFO JobScheduler: Total delay: 0.132 s for time
1428939696000 ms (execution: 0.119 s)
Received 92910 records
15/04/13 16:41:37 INFO JobScheduler: Total delay: 0.172 s for time
1428939697000 ms (execution: 0.161 s)
Received 92910 records
15/04/13 16:41:38 INFO JobScheduler: Total delay: 0.152 s for time
1428939698000 ms (execution: 0.140 s)
Received 92910 records
15/04/13 16:41:39 INFO JobScheduler: Total delay: 0.162 s for time
1428939699000 ms (execution: 0.149 s)
Received 92910 records
15/04/13 16:41:40 INFO JobScheduler: Total delay: 0.156 s for time
142893970 ms (execution: 0.143 s)
Received 92910 records
15/04/13 16:41:41 INFO JobScheduler: Total delay: 0.148 s for time
1428939701000 ms (execution: 0.135 s)
Received 92910 records
15/04/13 16:41:42 INFO JobScheduler: Total delay: 0.149 s for time
1428939702000 ms (execution: 0.135 s)
Received 92910 records
15/04/13 16:41:43 INFO JobScheduler: Total delay: 0.153 s for time
1428939703000 ms (execution: 0.136 s)
Received 92910 records
15/04/13 16:41:44 INFO JobScheduler: Total delay: 0.118 s for time
1428939704000 ms (execution: 0.111 s)
Received 92910 records
15/04/13 16:41:45 INFO JobScheduler: Total delay: 0.155 s for time
1428939705000 ms (execution: 0.143 s)
Received 92910 records
15/04/13 16:41:46 INFO JobScheduler: Total delay: 0.138 s for time
1428939706000 ms (execution: 0.126 s)
Received 92910 records
15/04/13 16:41:47 INFO JobScheduler: Total delay: 0.154 s for time
1428939707000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:41:48 INFO JobScheduler: Total delay: 0.172 s for time
1428939708000 ms (execution: 0.160 s)
Received 92910 records
15/04/13 16:41:49 INFO JobScheduler: Total delay: 0.144 s for time
1428939709000 ms (execution: 0.133 s)


Receiver Statistics (Streaming UI, as of 2015/04/13 16:53:54): columns Receiver,
Status, Location, Records in last batch, Minimum / Median / Maximum rate
[records/sec], Last Error — one receiver listed, Receiver-0.
2 machines, 2 receivers:

sbt/sbt "run-main Benchmark 2 machine1  1000" 2>&1 | grep -i "Total delay\|record"

Received 92910 records
15/04/13 16:43:13 INFO JobScheduler: Total delay: 0.153 s for time
1428939793000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:43:14 INFO JobScheduler: Total delay: 0.144 s for time
1428939794000 ms (execution: 0.136 s)
Received 92910 records
15/04/13 16:43:15 INFO JobScheduler: Total delay: 0.145 s for time
1428939795000 ms (execution: 0.132 s)
Received 92910 records
15/04/13 16:43:16 INFO JobScheduler: Total delay: 0.144 s for time
1428939796000 ms (execution: 0.134 s)
Received 92910 records
15/04/13 16:43:17 INFO JobScheduler: Total delay: 0.148 s for time
1428939797000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:43:18 INFO JobScheduler: Total delay: 0.136 s for time
1428939798000 ms (execution: 0.123 s)
Received 92910 records
15/04/13 16:43:19 INFO JobScheduler: Total delay: 0.155 s for time
1428939799000 ms (execution: 0.145 s)
Received 92910 records
15/04/13 16:43:20 INFO JobScheduler: Total delay: 0.160 s for time
142893980 ms (execution: 0.152 s)
Received 83619 records
15/04/13 16:43:21 INFO JobScheduler: Total delay: 0.141 s for time
1428939801000 ms (execution: 0.131 s)
Received 102201 records
15/04/13 16:43:22 INFO JobScheduler: Total delay: 0.208 s for time
1428939802000 ms (execution: 0.197 s)
Received 83619 records
15/04/13 16:43:23 INFO JobScheduler: Total delay: 0.160 s for time
1428939803000 ms (execution: 0.147 s)
Received 92910 records
15/04/13 16:43:24 INFO JobScheduler: Total delay: 0.197 s for time
1428939804000 ms (execution: 0.185 s)
Received 92910 records
15/04/13 16:43:25 INFO JobScheduler: Total delay: 0.200 s for time
1428939805000 ms (execution: 0.189 s)
Received 92910 records
15/04/13 16:43:26 INFO JobScheduler: Total delay: 0.181 s for time
1428939806000 ms (execution: 0.173 s)
Received 92910 records
15/04/13 16:43:27 INFO JobScheduler: Total delay: 0.189 s for time
1428939807000 ms (execution: 0.178 s)

Receiver Statistics (Streaming UI, as of 2015/04/13 16:49:36): columns Receiver,
Status, Location, Records in last batch, Minimum / Median / Maximum rate
[records/sec], Last Error — two receivers listed, Receiver-0 and Receiver-1.

On Thu, Apr 9, 2015 at 7:55 PM, Tathagata Das t...@databricks.com wrote:

 Are you running # of receivers = # machines?

 

Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread ๏̯͡๏
Code:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))]
= lstgItem.join(viEvents).map {
  case (itemId, (listing, viDetail)) =>
val viSummary = new VISummary
viSummary.leafCategoryId = listing.getLeafCategId().toInt
viSummary.itemSiteId = listing.getItemSiteId().toInt
viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
viSummary.sellerCountryId = listing.getSlrCntryId().toInt
viSummary.buyerSegment = 0
viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue()
 > 0) 1 else 0)
val sellerId = listing.getSlrId.toLong
(sellerId, (viDetail, viSummary, itemId))
}

Running Tasks:
(Spark UI task table; Attempt is 0 and Errors empty for all rows.)

Index 0, ID 216, RUNNING, PROCESS_LOCAL, executor 181 / phxaishdc9dn0474.phx.ebay.com,
  launched 2015/04/13 06:43:53, duration 1.7 h, GC 13 min,
  shuffle read 3.0 GB / 56964921 records, shuffle write 0.0 B / 0,
  shuffle spill 21.2 GB (memory) / 1902.6 MB (disk)
Index 2, ID 218, SUCCESS, PROCESS_LOCAL, executor 582 / phxaishdc9dn0235.phx.ebay.com,
  launched 2015/04/13 06:43:53, duration 15 min, GC 31 s,
  shuffle read 2.2 GB / 1666851 records, write time 0.1 s, shuffle write 3.0 MB / 2062,
  shuffle spill 54.8 GB (memory) / 1924.5 MB (disk)
Index 1, ID 217, SUCCESS, PROCESS_LOCAL, executor 202 / phxdpehdc9dn2683.stratus.phx.ebay.com,
  launched 2015/04/13 06:43:53, duration 19 min, GC 1.3 min,
  shuffle read 2.2 GB / 1687086 records, write time 75 ms, shuffle write 3.9 MB / 2692,
  shuffle spill 33.7 GB (memory) / 1960.4 MB (disk)
Index 4, ID 220, SUCCESS, PROCESS_LOCAL, executor 218 / phxaishdc9dn0855.phx.ebay.com,
  launched 2015/04/13 06:43:53, duration 15 min, GC 56 s,
  shuffle read 2.2 GB / 1675654 records, write time 40 ms, shuffle write 3.3 MB / 2260,
  shuffle spill 26.2 GB (memory) / 1928.4 MB (disk)



Command:
./bin/spark-submit -v --master yarn-cluster --driver-class-path
/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
--jars
/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
 --num-executors 3000 --driver-memory 12g --driver-java-options
-XX:MaxPermSize=6G --executor-memory 12g --executor-cores 1 --queue
hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
/home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
startDate=2015-04-6 endDate=2015-04-7
input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
output=/user/dvasthimal/epdatasets/viewItem buffersize=128
maxbuffersize=1068 maxResultSize=2G


What do I do? I killed the job twice and it's stuck again. Where is it
stuck?

-- 
Deepak


Configuring amount of disk space available to spark executors in mesos?

2015-04-13 Thread Jonathan Coveney
I'm surprised that I haven't been able to find this via google, but I
haven't...

What is the setting that requests some amount of disk space for the
executors? Maybe I'm misunderstanding how this is configured...

Thanks for any help!


What's the cleanest way to make spark aware of my custom scheduler?

2015-04-13 Thread Jonathan Coveney
I need to have my own scheduler to point to a proprietary remote execution
framework.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L2152

I'm looking at where it decides on the backend and it doesn't look like
there is a hook. Of course I can extend sparkContext and add my own, but
that seems sort of lame. Wondering if people know of a better way (or maybe
I'm just missing something obvious)