RE: HBase HTable constructor hangs

2015-04-29 Thread Tridib Samanta
I turned on TRACE logging and I see a lot of the following exception:
 
java.lang.IllegalAccessError: com/google/protobuf/ZeroCopyLiteralByteString
 at 
org.apache.hadoop.hbase.protobuf.RequestConverter.buildRegionSpecifier(RequestConverter.java:897)
 at 
org.apache.hadoop.hbase.protobuf.RequestConverter.buildGetRowOrBeforeRequest(RequestConverter.java:131)
 at 
org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1402)
 at org.apache.hadoop.hbase.client.HTable$2.call(HTable.java:701)
 at org.apache.hadoop.hbase.client.HTable$2.call(HTable.java:699)
 at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:120)
 at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:705)
 at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1102)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1162)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011)
 at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326)
 at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192)
 
Thanks
Tridib
 
From: d...@ocirs.com
Date: Tue, 28 Apr 2015 22:24:39 -0700
Subject: Re: HBase HTable constructor hangs
To: tridib.sama...@live.com

In that case, something else is failing, and the reason HBase looks like it 
hangs is that the HBase timeout or retry count is too high.
Try setting the following conf and HBase will only hang for a few minutes at most and 
return a helpful error message.
hbaseConf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2")
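
(For reference, a slightly fuller sketch that also caps the RPC timeout; the values are
illustrative, and HBASE_RPC_TIMEOUT_KEY is HBase's constant for hbase.rpc.timeout:)

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2") // give up after 2 retries
hbaseConf.set(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000")   // 10 seconds per RPC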



--
Dean Chen


On Tue, Apr 28, 2015 at 10:18 PM, Tridib Samanta tridib.sama...@live.com 
wrote:



Nope, my hbase is unsecured.
 
From: d...@ocirs.com
Date: Tue, 28 Apr 2015 22:09:51 -0700
Subject: Re: HBase HTable constructor hangs
To: tridib.sama...@live.com

Hi Tridib,
Are you running this on a secure Hadoop/HBase cluster? I ran into a similar 
issue where the HBase client can successfully connect in local mode and in the 
yarn-client driver but not on remote executors. The problem is that Spark 
doesn't distribute the HBase auth key; see the following JIRA ticket and PR.
https://issues.apache.org/jira/browse/SPARK-6918

--
Dean Chen


On Tue, Apr 28, 2015 at 9:34 PM, Tridib Samanta tridib.sama...@live.com wrote:



I am not 100% sure how it's picking up the configuration. I copied the 
hbase-site.xml to the HDFS/Spark cluster (single machine). I also included 
hbase-site.xml in the spark-job jar file; the spark-job jar file also has yarn-site.xml, 
mapred-site.xml and core-site.xml in it.
 
One interesting thing is, when I run the spark-job jar standalone and 
execute the HBase client from a main method, it works fine. The same client is 
unable to connect/hangs when the jar is distributed in Spark.
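
(For reference, one way to rule out config-pickup issues on the executors is to load
hbase-site.xml explicitly when building the HBase configuration; a minimal sketch, with a
hypothetical path that must exist on every node:)

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration

val hbaseConf = HBaseConfiguration.create()
hbaseConf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))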
 
Thanks
Tridib
 
Date: Tue, 28 Apr 2015 21:25:41 -0700
Subject: Re: HBase HTable constructor hangs
From: yuzhih...@gmail.com
To: tridib.sama...@live.com
CC: user@spark.apache.org

How did you distribute hbase-site.xml to the nodes ?
Looks like HConnectionManager couldn't find the hbase:meta server.
Cheers
On Tue, Apr 28, 2015 at 9:19 PM, Tridib Samanta tridib.sama...@live.com wrote:



I am using Spark 1.2.0 and HBase 0.98.1-cdh5.1.0.
 
Here is the jstack trace. Complete stack trace attached.
 
Executor task launch worker-1 #58 daemon prio=5 os_prio=0 
tid=0x7fd3d0445000 nid=0x488 waiting on condition [0x7fd4507d9000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
 at java.lang.Thread.sleep(Native Method)
 at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:152)
 - locked 0xf8cb7258 (a 
org.apache.hadoop.hbase.client.RpcRetryingCaller)
 at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:705)
 at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1102)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1162)
 - locked 0xf84ac0b0 (a java.lang.Object)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054)
 at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011)
 at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326)
 at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192)
 at org.apache.hadoop.hbase.client.HTable.init(HTable.java:150)
 at 

spark with standalone HBase

2015-04-29 Thread Saurabh Gupta
Hi,

I am working with standalone HBase and I want to execute HBaseTest.scala
(in the Scala examples).

I have created a test table with three rows, and I just want to get the
count using HBaseTest.scala.

I am getting this issue:

15/04/29 11:17:10 INFO BlockManagerMaster: Registered BlockManager
15/04/29 11:17:11 INFO ZooKeeper: Client
environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
15/04/29 11:17:11 INFO ZooKeeper: Client environment:host.name
=ip-10-144-185-113
15/04/29 11:17:11 INFO ZooKeeper: Client environment:java.version=1.7.0_79
15/04/29 11:17:11 INFO ZooKeeper: Client environment:java.vendor=Oracle
Corporation
15/04/29 11:17:11 INFO ZooKeeper: Client
environment:java.home=/usr/lib/jvm/java-7-openjdk-amd64/jre
15/04/29 11:17:11 INFO ZooKeeper: Client
environment:java.class.path=/home/ubuntu/sparkfolder/conf/:/home/ubuntu/sparkfolder/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.2.0.jar:/home/ubuntu/sparkfolder/lib_managed/jars/datanucleus-core-3.2.10.jar:/home/ubuntu/sparkfolder/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar:/home/ubuntu/sparkfolder/lib_managed/jars/datanucleus-rdbms-3.2.9.jar
15/04/29 11:17:11 INFO ZooKeeper: Client
environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
15/04/29 11:17:11 INFO ZooKeeper: Client environment:java.io.tmpdir=/tmp
15/04/29 11:17:11 INFO ZooKeeper: Client environment:java.compiler=NA
15/04/29 11:17:11 INFO ZooKeeper: Client environment:os.name=Linux
15/04/29 11:17:11 INFO ZooKeeper: Client environment:os.arch=amd64
15/04/29 11:17:11 INFO ZooKeeper: Client
environment:os.version=3.13.0-49-generic
15/04/29 11:17:11 INFO ZooKeeper: Client environment:user.name=root
15/04/29 11:17:11 INFO ZooKeeper: Client environment:user.home=/root
15/04/29 11:17:11 INFO ZooKeeper: Client
environment:user.dir=/home/ubuntu/sparkfolder
15/04/29 11:17:11 INFO ZooKeeper: Initiating client connection,
connectString=localhost:2181 sessionTimeout=9
watcher=hconnection-0x2711025f, quorum=localhost:2181, baseZNode=/hbase
15/04/29 11:17:11 INFO RecoverableZooKeeper: Process
identifier=hconnection-0x2711025f connecting to ZooKeeper
ensemble=localhost:2181
15/04/29 11:17:11 INFO ClientCnxn: Opening socket connection to server
ip-10-144-185-113/10.144.185.113:2181. Will not attempt to authenticate
using SASL (unknown error)
15/04/29 11:17:11 INFO ClientCnxn: Socket connection established to
ip-10-144-185-113/10.144.185.113:2181, initiating session
15/04/29 11:17:11 INFO ClientCnxn: Session establishment complete on server
ip-10-144-185-113/10.144.185.113:2181, sessionid = 0x14d04d506da0005,
negotiated timeout = 4
15/04/29 11:17:11 INFO ZooKeeperRegistry: ClusterId read in ZooKeeper is
null

It's just stuck, not showing any error. There is no Hadoop on my machine.
What could be the issue?

Here is hbase-site.xml:

<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>localhost</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
  <property>
    <name>zookeeper.znode.parent</name>
    <value>/hbase</value>
  </property>
</configuration>


Re: java.io.IOException: No space left on device

2015-04-29 Thread Dean Wampler
Or multiple volumes. The LOCAL_DIRS (YARN) and SPARK_LOCAL_DIRS (Mesos,
Standalone) environment variables and the spark.local.dir property control
where temporary data is written. The default is /tmp.

See
http://spark.apache.org/docs/latest/configuration.html#runtime-environment
for more details.
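
(As a rough sketch of pointing spark.local.dir at larger volumes; the paths are hypothetical,
must exist on every node, and several directories can be listed comma-separated:)

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("ml_pipeline")
  .set("spark.local.dir", "/data1/spark-tmp,/data2/spark-tmp") // instead of the default /tmp
val sc = new SparkContext(conf)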

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Apr 29, 2015 at 6:19 AM, Anshul Singhle ans...@betaglide.com
wrote:

 Do you have multiple disks? Maybe your work directory is not in the right
 disk?

 On Wed, Apr 29, 2015 at 4:43 PM, Selim Namsi selim.na...@gmail.com
 wrote:

 Hi,

 I'm using Spark (1.3.1) MLlib to run the random forest algorithm on tf-idf
 output; the training data is a file containing 156060 (size 8.1M).

 The problem is that when trying to persist a partition into memory and
 there is not enough memory, the partition is persisted on disk, and despite
 having 229G of free disk space, I got "No space left on device".

 This is how I'm running the program :

 ./spark-submit --class com.custom.sentimentAnalysis.MainPipeline --master
 local[2] --driver-memory 5g ml_pipeline.jar labeledTrainData.tsv
 testData.tsv

 And this is a part of the log:



 If you need more informations, please let me know.
 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-No-space-left-on-device-tp22702.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: java.io.IOException: No space left on device

2015-04-29 Thread Dean Wampler
Makes sense. / is where /tmp would be. However, 230G should be plenty of
space. If you have INFO logging turned on (set in
$SPARK_HOME/conf/log4j.properties), you'll see messages about saving data
to disk that will list sizes. The web console also has some summary
information about this.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Apr 29, 2015 at 6:25 AM, selim namsi selim.na...@gmail.com wrote:

 This is the output of df -h so as you can see I'm using only one disk
 mounted on /

 df -h
 Filesystem  Size  Used Avail Use% Mounted on
 /dev/sda8   276G   34G  229G  13% /
 none        4.0K     0  4.0K   0% /sys/fs/cgroup
 udev        7.8G  4.0K  7.8G   1% /dev
 tmpfs       1.6G  1.4M  1.6G   1% /run
 none        5.0M     0  5.0M   0% /run/lock
 none        7.8G   37M  7.8G   1% /run/shm
 none        100M   40K  100M   1% /run/user
 /dev/sda1   496M   55M  442M  11% /boot/efi

 Also when running the program, I noticed that the Used% disk space related
 to the partition mounted on / was growing very fast

 On Wed, Apr 29, 2015 at 12:19 PM Anshul Singhle ans...@betaglide.com
 wrote:

 Do you have multiple disks? Maybe your work directory is not in the right
 disk?

 On Wed, Apr 29, 2015 at 4:43 PM, Selim Namsi selim.na...@gmail.com
 wrote:

 Hi,

 I'm using Spark (1.3.1) MLlib to run the random forest algorithm on tf-idf
 output; the training data is a file containing 156060 (size 8.1M).

 The problem is that when trying to persist a partition into memory and
 there is not enough memory, the partition is persisted on disk, and despite
 having 229G of free disk space, I got "No space left on device".

 This is how I'm running the program :

 ./spark-submit --class com.custom.sentimentAnalysis.MainPipeline --master
 local[2] --driver-memory 5g ml_pipeline.jar labeledTrainData.tsv
 testData.tsv

 And this is a part of the log:



 If you need more informations, please let me know.
 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-No-space-left-on-device-tp22702.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




How Spark SQL supports primary and secondary indexes

2015-04-29 Thread Nikolay Tikhonov
Hi all,

I execute a simple SQL query and get unacceptable performance.

I do the following steps:

1. Apply a schema to an RDD and register a table.


2. Run a SQL query which returns several entries:


The running time for this query is 0.2s (the table contains 10 entries). I think
that Spark SQL does a full in-memory scan, and an index might improve performance.
How can I add indexes?
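
(The snippets for steps 1 and 2 did not come through in the archive; a minimal sketch of what
they typically look like with the Spark 1.3 API, using a made-up case class and table name:)

import org.apache.spark.sql.SQLContext

case class Record(id: Long, name: String)   // hypothetical schema

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// 1. Apply a schema to an RDD and register it as a table
val df = sc.textFile("records.txt")
  .map(_.split(","))
  .map(p => Record(p(0).toLong, p(1)))
  .toDF()
df.registerTempTable("records")

// 2. Run a SQL query which returns several entries
val result = sqlContext.sql("SELECT * FROM records WHERE id = 42").collect()

As far as I know, Spark SQL 1.3 does not expose user-defined indexes; caching the table
(sqlContext.cacheTable("records")) is the main in-memory lever.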



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-Spark-SQL-supports-primary-and-secondary-indexes-tp22700.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



java.io.IOException: No space left on device

2015-04-29 Thread Selim Namsi
Hi,

I'm using Spark (1.3.1) MLlib to run the random forest algorithm on tf-idf
output; the training data is a file containing 156060 (size 8.1M).

The problem is that when trying to persist a partition into memory and there
is not enough memory, the partition is persisted on disk, and despite having
229G of free disk space, I got "No space left on device".

This is how I'm running the program : 

./spark-submit --class com.custom.sentimentAnalysis.MainPipeline --master
local[2] --driver-memory 5g ml_pipeline.jar labeledTrainData.tsv
testData.tsv

And this is a part of the log:



If you need more informations, please let me know.
Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-No-space-left-on-device-tp22702.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to group multiple row data ?

2015-04-29 Thread bipin
Hi, I have a ddf with schema (CustomerID, SupplierID, ProductID, Event,
CreatedOn); the first 3 are Long ints, Event can only be 1, 2 or 3, and
CreatedOn is a timestamp. How can I make a group triplet/doublet/singlet out
of them such that I can infer that a Customer registered the event from 1 to 2 and,
if present, to 3 timewise, while preserving the number of entries. For e.g.

Before processing:
10001, 132, 2002, 1, 2012-11-23
10001, 132, 2002, 1, 2012-11-24
10031, 102, 223, 2, 2012-11-24
10001, 132, 2002, 2, 2012-11-25
10001, 132, 2002, 3, 2012-11-26
(total 5 rows)

After processing:
10001, 132, 2002, 2012-11-23, 1 
10031, 102, 223, 2012-11-24, 2
10001, 132, 2002, 2012-11-24, 1,2,3
(total 5 in last field - comma separated!)

The group must only take the closest previous trigger; the first one hence
shows alone. Can this be done using Spark SQL? If it needs to be processed
functionally in Scala, how do I do this? I can't wrap my head around it. Can
anyone help?
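
One possible functional sketch in Scala, assuming the rows are already parsed into
a case class, timestamps are ISO date strings, and "closest previous trigger" means
that every event 1 starts a new group and later 2/3 events attach to the most recent
group for the same key (an illustration, not a tested solution):

import org.apache.spark.rdd.RDD

case class EventRow(customerId: Long, supplierId: Long, productId: Long,
                    event: Int, createdOn: String)

def groupTriggers(rows: RDD[EventRow]) =
  rows.groupBy(r => (r.customerId, r.supplierId, r.productId))
    .flatMap { case (key, rs) =>
      rs.toSeq.sortBy(_.createdOn)                    // ISO dates sort lexicographically
        .foldLeft(List.empty[(String, List[Int])]) {
          case (acc, r) if r.event == 1 || acc.isEmpty =>
            (r.createdOn, List(r.event)) :: acc       // event 1 (or no open group): start a new group
          case ((d, evs) :: tail, r) =>
            (d, evs :+ r.event) :: tail               // event 2/3: attach to the latest group
        }
        .reverse
        .map { case (d, evs) => (key, d, evs.mkString(",")) }
    }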



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-group-multiple-row-data-tp22701.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Dataframe filter based on another Dataframe

2015-04-29 Thread Olivier Girardot
Hi everyone,
what is the most efficient way to filter a DataFrame on a column from
another DataFrame's column? The best idea I had was to join the two
dataframes:

 val df1: DataFrame
 val df2: DataFrame
 df1.join(df2, df1("id") === df2("id"), "inner")

But I end up (obviously) with the id column twice.
Another approach would be to filter df1, but I can't seem to get this to
work using df2's column as a base.

Any idea ?

Regards,

Olivier.


Re: Driver memory leak?

2015-04-29 Thread Sean Owen
Please use user@, not dev@

This message does not appear to be from your driver. It also doesn't say
you ran out of memory. It says you didn't tell YARN to let it use the
memory you want. Look at the memory overhead param and please search first
for related discussions.
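
(For reference, the overhead settings Sean mentions are spark.yarn.driver.memoryOverhead
and spark.yarn.executor.memoryOverhead; a sketch with illustrative values passed as extra
spark-submit arguments:)

  --conf spark.yarn.driver.memoryOverhead=512 \
  --conf spark.yarn.executor.memoryOverhead=1024
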
On Apr 29, 2015 11:43 AM, wyphao.2007 wyphao.2...@163.com wrote:

 Hi, dear developers, I am using Spark Streaming to read data from Kafka.
 The program has already run for about 120 hours, but today it failed
 because of the driver's OOM, as follows:

 Container [pid=49133,containerID=container_1429773909253_0050_02_01]
 is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB
 physical memory used; 3.2 GB of 50 GB virtual memory used. Killing
 container.

 I set --driver-memory to 2g. In my mind, the driver is responsible for job
 scheduling and job monitoring (please correct me if I'm wrong), so why is it
 using so much memory?

 So I used jmap to monitor another program (already running for about 48 hours):
 sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256, with the result as
 follows:
 the java.util.HashMap$Entry and java.lang.Long objects use about 600MB of
 memory!

 I also used jmap to monitor another program (running for about 1 hour),
 with the result as follows:
 the java.util.HashMap$Entry and java.lang.Long objects don't use so
 much memory. But I found that, as time goes by, the java.util.HashMap$Entry
 and java.lang.Long objects occupy more and more memory.
 Is it a memory leak in the driver, or is there some other reason?

 Thanks
 Best Regards












Re: A problem of using spark streaming to capture network packets

2015-04-29 Thread Dean Wampler
I would use the ps command on each machine while the job is running to
confirm that every process involved is running as root.
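
A quick diagnostic sketch along those lines, assuming the pcap4j jars are on the
executor classpath: run the device lookup inside a task so it executes on the
executors rather than the driver, and report which user it runs as.

import org.pcap4j.core.Pcaps
import scala.collection.JavaConverters._

val report = sc.parallelize(1 to 4, 4).map { _ =>
  val user = System.getProperty("user.name")                          // who the task runs as
  val nifs = Pcaps.findAllDevs().asScala.map(_.getName).mkString(",") // NICs visible to the task
  s"user=$user nifs=[$nifs]"
}.collect()
report.foreach(println)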

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Apr 28, 2015 at 8:58 PM, Lin Hao Xu xulin...@cn.ibm.com wrote:

 btw, from spark web ui, the acl is marked with *root*

 Best regards,

 Lin Hao XU
 IBM Research China
 Email: xulin...@cn.ibm.com
 My Flickr: http://www.flickr.com/photos/xulinhao/sets


 From: Dean Wampler deanwamp...@gmail.com
 To: Lin Hao Xu/China/IBM@IBMCN
 Cc: Hai Shan Wu/China/IBM@IBMCN, user user@spark.apache.org
 Date: 2015/04/29 09:40
 Subject: Re: A problem of using spark streaming to capture network packets
 --



 Are the tasks on the slaves also running as root? If not, that might
 explain the problem.

 dean

 Dean Wampler, Ph.D.
 Author: *Programming Scala, 2nd Edition*
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 *Typesafe* http://typesafe.com/
 *@deanwampler* http://twitter.com/deanwampler
 *http://polyglotprogramming.com* http://polyglotprogramming.com/

 On Tue, Apr 28, 2015 at 8:30 PM, Lin Hao Xu *xulin...@cn.ibm.com*
 xulin...@cn.ibm.com wrote:

1. The full command line is written in a shell script:

LIB=/home/spark/.m2/repository

/opt/spark/bin/spark-submit \
--class spark.pcap.run.TestPcapSpark \
--jars

 $LIB/org/pcap4j/pcap4j-core/1.4.0/pcap4j-core-1.4.0.jar,$LIB/org/pcap4j/pcap4j-packetfactory-static/1.4.0/pcap4j-packetfactory-static-1.4.0.jar,$LIB/

 org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar,$LIB/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar,$LIB/net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar
\
/home/spark/napa/napa.jar

2. And we run this script with sudo; if you do not use sudo, then
you cannot access the network interface.

3. We also tested List<PcapNetworkInterface> nifs = Pcaps.findAllDevs() in
a standard Java program, and it really worked like a champion.

Best regards,

Lin Hao XU
IBM Research China
Email: *xulin...@cn.ibm.com* xulin...@cn.ibm.com
My Flickr: *http://www.flickr.com/photos/xulinhao/sets*
http://www.flickr.com/photos/xulinhao/sets


From: Dean Wampler *deanwamp...@gmail.com* deanwamp...@gmail.com
To: Hai Shan Wu/China/IBM@IBMCN
Cc: user *user@spark.apache.org* user@spark.apache.org, Lin Hao
Xu/China/IBM@IBMCN
Date: 2015/04/28 20:07
Subject: Re: A problem of using spark streaming to capture network
packets
--




It's probably not your code.

What's the full command line you use to submit the job?

Are you sure the job on the cluster has access to the network
interface? Can you test the receiver by itself without Spark? For example,
does this line work as expected:

List<PcapNetworkInterface> nifs = Pcaps.findAllDevs();

dean

Dean Wampler, Ph.D.
Author: *Programming Scala, 2nd Edition*
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 *Typesafe* http://typesafe.com/
 *@deanwampler* http://twitter.com/deanwampler
 *http://polyglotprogramming.com* http://polyglotprogramming.com/

On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu *wuh...@cn.ibm.com*
wuh...@cn.ibm.com wrote:
   Hi Everyone

   We use pcap4j to capture network packets and then use spark
   streaming to analyze captured packets. However, we met a strange 
 problem.

   If we run our application on spark locally (for example,
   spark-submit --master local[2]), then the program runs successfully.

   If we run our application on spark standalone cluster, then the
   program will tell us that NO NIFs found.

   I also attach two test files for clarification.

   So anyone can help on this? Thanks in advance!


 * (See attached file: PcapReceiver.java)(See attached file:
   TestPcapSpark.java)*

   Best regards,

   - Haishan

   Haishan Wu (吴海珊)

   IBM Research - China
   Tel: 86-10-58748508
   Fax: 86-10-58748330
   Email: *wuh...@cn.ibm.com* wuh...@cn.ibm.com
   Lotus Notes: Hai Shan Wu/China/IBM



   -
   To unsubscribe, e-mail: *user-unsubscr...@spark.apache.org*
  

Re: java.io.IOException: No space left on device

2015-04-29 Thread Anshul Singhle
Do you have multiple disks? Maybe your work directory is not in the right
disk?

On Wed, Apr 29, 2015 at 4:43 PM, Selim Namsi selim.na...@gmail.com wrote:

 Hi,

 I'm using Spark (1.3.1) MLlib to run the random forest algorithm on tf-idf
 output; the training data is a file containing 156060 (size 8.1M).

 The problem is that when trying to persist a partition into memory and
 there is not enough memory, the partition is persisted on disk, and despite
 having 229G of free disk space, I got "No space left on device".

 This is how I'm running the program :

 ./spark-submit --class com.custom.sentimentAnalysis.MainPipeline --master
 local[2] --driver-memory 5g ml_pipeline.jar labeledTrainData.tsv
 testData.tsv

 And this is a part of the log:



 If you need more informations, please let me know.
 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-No-space-left-on-device-tp22702.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: java.io.IOException: No space left on device

2015-04-29 Thread selim namsi
This is the output of df -h so as you can see I'm using only one disk
mounted on /

df -h
Filesystem  Size  Used Avail Use% Mounted on
/dev/sda8   276G   34G  229G  13% /
none        4.0K     0  4.0K   0% /sys/fs/cgroup
udev        7.8G  4.0K  7.8G   1% /dev
tmpfs       1.6G  1.4M  1.6G   1% /run
none        5.0M     0  5.0M   0% /run/lock
none        7.8G   37M  7.8G   1% /run/shm
none        100M   40K  100M   1% /run/user
/dev/sda1   496M   55M  442M  11% /boot/efi

Also when running the program, I noticed that the Used% disk space related
to the partition mounted on / was growing very fast

On Wed, Apr 29, 2015 at 12:19 PM Anshul Singhle ans...@betaglide.com
wrote:

 Do you have multiple disks? Maybe your work directory is not in the right
 disk?

 On Wed, Apr 29, 2015 at 4:43 PM, Selim Namsi selim.na...@gmail.com
 wrote:

 Hi,

 I'm using Spark (1.3.1) MLlib to run the random forest algorithm on tf-idf
 output; the training data is a file containing 156060 (size 8.1M).

 The problem is that when trying to persist a partition into memory and
 there is not enough memory, the partition is persisted on disk, and despite
 having 229G of free disk space, I got "No space left on device".

 This is how I'm running the program :

 ./spark-submit --class com.custom.sentimentAnalysis.MainPipeline --master
 local[2] --driver-memory 5g ml_pipeline.jar labeledTrainData.tsv
 testData.tsv

 And this is a part of the log:



 If you need more informations, please let me know.
 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-No-space-left-on-device-tp22702.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




MLib KMeans on large dataset issues

2015-04-29 Thread Sam Stoelinga
Hi Sparkers,

I am trying to run MLlib KMeans on a large dataset (50+ GB of data) and a
large K, but I've encountered the following issues:


   - The Spark driver gets out of memory and dies because collect gets called
   as part of KMeans, which loads all data back to the driver's memory.
   - At the end there is a LocalKMeans class which runs KMeansPlusPlus on
   the Spark driver. Why isn't this distributed? It's spending a long time
   here, and this has the same problem as point 1: it requires loading the data to
   the driver.
   Also, when LocalKMeans is running on the driver I'm seeing lots of:
   15/04/29 08:42:25 WARN clustering.LocalKMeans: kMeansPlusPlus
   initialization ran out of distinct points for centers. Using duplicate
   point for center k = 222
   - Has the above behaviour been like this in previous releases? I
   remember running KMeans before without too many problems.

Looking forward to hear you point out my stupidity or provide work-arounds
that could make Spark KMeans work well on large datasets.

Regards,
Sam Stoelinga


Re: MLib KMeans on large dataset issues

2015-04-29 Thread Jeetendra Gangele
How are you passing the feature vectors to K-means?
Is it a 2-D space or a 1-D array?

Did you try using Streaming KMeans?

Will you be able to paste the code here?

On 29 April 2015 at 17:23, Sam Stoelinga sammiest...@gmail.com wrote:

 Hi Sparkers,

 I am trying to run MLib kmeans on a large dataset(50+Gb of data) and a
 large K but I've encountered the following issues:


- Spark driver gets out of memory and dies because collect gets called
as part of KMeans, which loads all data back to the driver's memory.
- At the end there is a LocalKMeans class which runs KMeansPlusPlus on
the Spark driver. Why isn't this distributed? It's spending a long time on
here and this has the same problem as point 1 requires loading the data to
the driver.
Also when LocakKMeans is running on driver also seeing lots of :
15/04/29 08:42:25 WARN clustering.LocalKMeans: kMeansPlusPlus
initialization ran out of distinct points for centers. Using duplicate
point for center k = 222
- Has the above behaviour been like this in previous releases? I
remember running KMeans before without too much problems.

 Looking forward to hear you point out my stupidity or provide work-arounds
 that could make Spark KMeans work well on large datasets.

 Regards,
 Sam Stoelinga



Re: MLib KMeans on large dataset issues

2015-04-29 Thread Sam Stoelinga
I'm mostly using example code, see here:
http://paste.openstack.org/show/211966/
The data has 799305 dimensions and is separated by spaces.

Please note that the issues I'm seeing are, imo, in the Scala implementation,
as they also happen when using the Python wrappers.
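
(For context, since the pasted example isn't reproduced here, a minimal sketch of the
standard MLlib KMeans flow on space-separated text; the path, k and iteration count are
illustrative:)

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("hdfs:///features.txt")
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  .cache()

val model = KMeans.train(data, 1000, 20) // k, maxIterations

Note that the cluster centers alone occupy k * numFeatures doubles on the driver, so a
large k combined with ~800k dense dimensions is already several GB of driver memory.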



On Wed, Apr 29, 2015 at 8:00 PM, Jeetendra Gangele gangele...@gmail.com
wrote:

 How you are passing feature vector to K means?
 its in 2-D space of 1-D array?

 Did you try using Streaming Kmeans?

 will you be able to paste code here?

 On 29 April 2015 at 17:23, Sam Stoelinga sammiest...@gmail.com wrote:

 Hi Sparkers,

 I am trying to run MLib kmeans on a large dataset(50+Gb of data) and a
 large K but I've encountered the following issues:


- Spark driver gets out of memory and dies because collect gets
called as part of KMeans, which loads all data back to the driver's 
 memory.
- At the end there is a LocalKMeans class which runs KMeansPlusPlus
on the Spark driver. Why isn't this distributed? It's spending a long time
on here and this has the same problem as point 1 requires loading the data
to the driver.
Also when LocakKMeans is running on driver also seeing lots of :
15/04/29 08:42:25 WARN clustering.LocalKMeans: kMeansPlusPlus
initialization ran out of distinct points for centers. Using duplicate
point for center k = 222
- Has the above behaviour been like this in previous releases? I
remember running KMeans before without too much problems.

 Looking forward to hear you point out my stupidity or provide
 work-arounds that could make Spark KMeans work well on large datasets.

 Regards,
 Sam Stoelinga







Re: Driver memory leak?

2015-04-29 Thread Serega Sheypak
The memory leak could be related to this
https://issues.apache.org/jira/browse/SPARK-5967 defect that was resolved
in Spark 1.2.2 and 1.3.0.
@Sean
Will it be backported to CDH? I didn't find that bug in the CDH 5.4 release
notes.

2015-04-29 14:51 GMT+02:00 Conor Fennell conor.fenn...@altocloud.com:

 The memory leak could be related to this
 https://issues.apache.org/jira/browse/SPARK-5967 defect that was
 resolved in Spark 1.2.2 and 1.3.0.

 It also was a HashMap causing the issue.

 -Conor



 On Wed, Apr 29, 2015 at 12:01 PM, Sean Owen so...@cloudera.com wrote:

 Please use user@, not dev@

 This message does not appear to be from your driver. It also doesn't say
 you ran out of memory. It says you didn't tell YARN to let it use the
 memory you want. Look at the memory overhead param and please search first
 for related discussions.
 On Apr 29, 2015 11:43 AM, wyphao.2007 wyphao.2...@163.com wrote:

 Hi, Dear developer, I am using Spark Streaming to read data from kafka,
 the program already run about 120 hours, but today the program failed
 because of driver's OOM as follow:

 Container [pid=49133,containerID=container_1429773909253_0050_02_01]
 is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB
 physical memory used; 3.2 GB of 50 GB virtual memory used. Killing
 container.

 I set --driver-memory to 2g, In my mind, driver is responsibility for
 job scheduler and job monitor(Please correct me If I'm wrong), Why it using
 so much memory?

 So I using jmap to monitor other program(already run about 48 hours):
 sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256, the result as
 follow:
 the java.util.HashMap$Entry and java.lang.Long  object using about 600Mb
 memory!

 and I also using jmap to monitor other program(already run about 1 hours
 ),  the result as follow:
 the java.util.HashMap$Entry and java.lang.Long object doesn't using so
 many memory, But I found, as time goes by, the java.util.HashMap$Entry
 and java.lang.Long object will occupied more and more memory,
 It is driver's memory leak question? or other reason?

 Thanks
 Best Regards













Re: How to group multiple row data ?

2015-04-29 Thread Manoj Awasthi
Sorry but I didn't fully understand the grouping. This line:

 The group must only take the closest previous trigger. The first one
hence shows alone.

Can you please explain further?


On Wed, Apr 29, 2015 at 4:42 PM, bipin bipin@gmail.com wrote:

 Hi, I have a ddf with schema (CustomerID, SupplierID, ProductID, Event,
 CreatedOn), the first 3 are Long ints and event can only be 1,2,3 and
 CreatedOn is a timestamp. How can I make a group triplet/doublet/singlet
 out
 of them such that I can infer that Customer registered event from 1to 2 and
 if present to 3 timewise and preserving the number of entries. For e.g.

 Before processing:
 10001, 132, 2002, 1, 2012-11-23
 10001, 132, 2002, 1, 2012-11-24
 10031, 102, 223, 2, 2012-11-24
 10001, 132, 2002, 2, 2012-11-25
 10001, 132, 2002, 3, 2012-11-26
 (total 5 rows)

 After processing:
 10001, 132, 2002, 2012-11-23, 1
 10031, 102, 223, 2012-11-24, 2
 10001, 132, 2002, 2012-11-24, 1,2,3
 (total 5 in last field - comma separated!)

 The group must only take the closest previous trigger. The first one hence
 shows alone. Can this be done using spark sql ? If it needs to processed in
 functionally in scala, how to do this. I can't wrap my head around this.
 Can
 anyone help.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-group-multiple-row-data-tp22701.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: java.io.IOException: No space left on device

2015-04-29 Thread selim namsi
Sorry I put the log messages when creating the thread in
http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-No-space-left-on-device-td22702.html
but I forgot that raw messages will not be sent in emails.

So this is the log related to the error :

15/04/29 02:48:50 INFO CacheManager: Partition rdd_19_0 not found, computing it
15/04/29 02:48:50 INFO BlockManager: Found block rdd_15_0 locally
15/04/29 02:48:50 INFO CacheManager: Partition rdd_19_1 not found, computing it
15/04/29 02:48:50 INFO BlockManager: Found block rdd_15_1 locally
15/04/29 02:49:13 WARN MemoryStore: Not enough space to cache rdd_19_1
in memory! (computed 1106.0 MB so far)
15/04/29 02:49:13 INFO MemoryStore: Memory use = 234.0 MB (blocks) +
2.6 GB (scratch space shared across 2 thread(s)) = 2.9 GB. Storage
limit = 3.1 GB.
15/04/29 02:49:13 WARN CacheManager: Persisting partition rdd_19_1 to
disk instead.
15/04/29 02:49:28 WARN MemoryStore: Not enough space to cache rdd_19_0
in memory! (computed 1745.7 MB so far)
15/04/29 02:49:28 INFO MemoryStore: Memory use = 234.0 MB (blocks) +
2.6 GB (scratch space shared across 2 thread(s)) = 2.9 GB. Storage
limit = 3.1 GB.
15/04/29 02:49:28 WARN CacheManager: Persisting partition rdd_19_0 to
disk instead.
15/04/29 03:56:12 WARN BlockManager: Putting block rdd_19_0 failed
15/04/29 03:56:12 WARN BlockManager: Putting block rdd_19_1 failed
15/04/29 03:56:12 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 7)
java.io.IOException: No space left on *device

*It seems that the partitions rdd_19_0 and rdd_9=19_1 needs both of
them  2.9 GB.

Thanks


On Wed, Apr 29, 2015 at 12:34 PM Dean Wampler deanwamp...@gmail.com wrote:

 Makes sense. / is where /tmp would be. However, 230G should be plenty of
 space. If you have INFO logging turned on (set in
 $SPARK_HOME/conf/log4j.properties), you'll see messages about saving data
 to disk that will list sizes. The web console also has some summary
 information about this.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Wed, Apr 29, 2015 at 6:25 AM, selim namsi selim.na...@gmail.com
 wrote:

 This is the output of df -h so as you can see I'm using only one disk
 mounted on /

 df -h
 Filesystem  Size  Used Avail Use% Mounted on
 /dev/sda8   276G   34G  229G  13% /
 none        4.0K     0  4.0K   0% /sys/fs/cgroup
 udev        7.8G  4.0K  7.8G   1% /dev
 tmpfs       1.6G  1.4M  1.6G   1% /run
 none        5.0M     0  5.0M   0% /run/lock
 none        7.8G   37M  7.8G   1% /run/shm
 none        100M   40K  100M   1% /run/user
 /dev/sda1   496M   55M  442M  11% /boot/efi

 Also when running the program, I noticed that the Used% disk space
 related to the partition mounted on / was growing very fast

 On Wed, Apr 29, 2015 at 12:19 PM Anshul Singhle ans...@betaglide.com
 wrote:

 Do you have multiple disks? Maybe your work directory is not in the
 right disk?

 On Wed, Apr 29, 2015 at 4:43 PM, Selim Namsi selim.na...@gmail.com
 wrote:

 Hi,

 I'm using Spark (1.3.1) MLlib to run the random forest algorithm on tf-idf
 output; the training data is a file containing 156060 (size 8.1M).

 The problem is that when trying to persist a partition into memory and
 there is not enough memory, the partition is persisted on disk, and despite
 having 229G of free disk space, I got "No space left on device".

 This is how I'm running the program :

 ./spark-submit --class com.custom.sentimentAnalysis.MainPipeline
 --master
 local[2] --driver-memory 5g ml_pipeline.jar labeledTrainData.tsv
 testData.tsv

 And this is a part of the log:



 If you need more informations, please let me know.
 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-No-space-left-on-device-tp22702.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Dataframe filter based on another Dataframe

2015-04-29 Thread ayan guha
You can use .select to project only columns you need
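
For example, a sketch of that idea (column names other than id are hypothetical):
join on id and then select only df1's columns, so df2's duplicate id is dropped.

val filtered = df1
  .join(df2, df1("id") === df2("id"), "inner")
  .select(df1("id"), df1("name"), df1("value"))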

On Wed, Apr 29, 2015 at 9:23 PM, Olivier Girardot ssab...@gmail.com wrote:

 Hi everyone,
 what is the most efficient way to filter a DataFrame on a column from
 another Dataframe's column. The best idea I had, was to join the two
 dataframes :

  val df1: DataFrame
  val df2: DataFrame
  df1.join(df2, df1("id") === df2("id"), "inner")

 But I end up (obviously) with the id column twice.
 Another approach would be to filter df1 but I can't seem to get this to
 work using df2's column as a base

 Any idea ?

 Regards,

 Olivier.




-- 
Best Regards,
Ayan Guha


Re: MLib KMeans on large dataset issues

2015-04-29 Thread Sam Stoelinga
Guys, great feedback, thanks for pointing out my stupidity :D

Rows and columns got intermixed, hence the weird results I was seeing.
Ignore my previous issues; I will reformat my data first.

On Wed, Apr 29, 2015 at 8:47 PM, Sam Stoelinga sammiest...@gmail.com
wrote:

 I'm mostly using example code, see here:
 http://paste.openstack.org/show/211966/
 The data has 799305 dimensions and is separated by space

 Please note the issues I'm seeing is because of the scala implementation
 imo as it happens also when using the Python wrappers.



 On Wed, Apr 29, 2015 at 8:00 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 How you are passing feature vector to K means?
 its in 2-D space of 1-D array?

 Did you try using Streaming Kmeans?

 will you be able to paste code here?

 On 29 April 2015 at 17:23, Sam Stoelinga sammiest...@gmail.com wrote:

 Hi Sparkers,

 I am trying to run MLib kmeans on a large dataset(50+Gb of data) and a
 large K but I've encountered the following issues:


- Spark driver gets out of memory and dies because collect gets
called as part of KMeans, which loads all data back to the driver's 
 memory.
- At the end there is a LocalKMeans class which runs KMeansPlusPlus
on the Spark driver. Why isn't this distributed? It's spending a long 
 time
on here and this has the same problem as point 1 requires loading the 
 data
to the driver.
Also when LocakKMeans is running on driver also seeing lots of :
15/04/29 08:42:25 WARN clustering.LocalKMeans: kMeansPlusPlus
initialization ran out of distinct points for centers. Using duplicate
point for center k = 222
- Has the above behaviour been like this in previous releases? I
remember running KMeans before without too much problems.

 Looking forward to hear you point out my stupidity or provide
 work-arounds that could make Spark KMeans work well on large datasets.

 Regards,
 Sam Stoelinga








DataFrame filter referencing error

2015-04-29 Thread Francesco Bigarella
Hi all,

I was testing the DataFrame filter functionality and I found what I think
is a strange behaviour.
My dataframe testDF, obtained by loading a MySQL table via JDBC, has the
following schema:
root
 | -- id: long (nullable = false)
 | -- title: string (nullable = true)
 | -- value: string (nullable = false)
 | -- status: string (nullable = false)

What I want to do is filter my dataset to obtain all rows that have
status = "new".

scala> testDF.filter(testDF("id") === 1234).first()
works fine (also with the integer value within double quotes), however if I
try to use the same statement to filter on the status column (also with
changes in the syntax - see below), suddenly the program breaks.

Any of the following
scala> testDF.filter(testDF("status") === "new")
scala> testDF.filter("status = 'new'")
scala> testDF.filter($"status" === "new")

generates the error:

INFO scheduler.DAGScheduler: Job 3 failed: runJob at SparkPlan.scala:121,
took 0.277907 s

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage
3.0 (TID 12, node name):
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column
'new' in 'where clause'

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
at com.mysql.jdbc.Util.getInstance(Util.java:386)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1052)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3597)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3529)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1990)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2151)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2625)
at
com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2119)
at
com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2283)
at org.apache.spark.sql.jdbc.JDBCRDD$anon$1.init(JDBCRDD.scala:328)
at org.apache.spark.sql.jdbc.JDBCRDD.compute(JDBCRDD.scala:309)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Does filter work only on columns of the integer type? What is the exact
behaviour of the filter function and what is the best way to handle the
query I am trying to execute?

Thank you,
Francesco


Re: Driver memory leak?

2015-04-29 Thread Conor Fennell
The memory leak could be related to this
https://issues.apache.org/jira/browse/SPARK-5967 defect that was resolved
in Spark 1.2.2 and 1.3.0.

It also was a HashMap causing the issue.

-Conor



On Wed, Apr 29, 2015 at 12:01 PM, Sean Owen so...@cloudera.com wrote:

 Please use user@, not dev@

 This message does not appear to be from your driver. It also doesn't say
 you ran out of memory. It says you didn't tell YARN to let it use the
 memory you want. Look at the memory overhead param and please search first
 for related discussions.
 On Apr 29, 2015 11:43 AM, wyphao.2007 wyphao.2...@163.com wrote:

 Hi, Dear developer, I am using Spark Streaming to read data from kafka,
 the program already run about 120 hours, but today the program failed
 because of driver's OOM as follow:

 Container [pid=49133,containerID=container_1429773909253_0050_02_01]
 is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB
 physical memory used; 3.2 GB of 50 GB virtual memory used. Killing
 container.

 I set --driver-memory to 2g, In my mind, driver is responsibility for
 job scheduler and job monitor(Please correct me If I'm wrong), Why it using
 so much memory?

 So I using jmap to monitor other program(already run about 48 hours):
 sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256, the result as
 follow:
 the java.util.HashMap$Entry and java.lang.Long  object using about 600Mb
 memory!

 and I also using jmap to monitor other program(already run about 1 hours
 ),  the result as follow:
 the java.util.HashMap$Entry and java.lang.Long object doesn't using so
 many memory, But I found, as time goes by, the java.util.HashMap$Entry
 and java.lang.Long object will occupied more and more memory,
 It is driver's memory leak question? or other reason?

 Thanks
 Best Regards












Re: DataFrame filter referencing error

2015-04-29 Thread ayan guha
Looks like your DF is based on a MySQL DB using JDBC, and the error is thrown
from MySQL. Can you see what SQL is finally getting fired in MySQL? Spark
is pushing the predicate down to MySQL, so it's not a Spark problem per se.

On Wed, Apr 29, 2015 at 9:56 PM, Francesco Bigarella 
francesco.bigare...@gmail.com wrote:

 Hi all,

 I was testing the DataFrame filter functionality and I found what I think
 is a strange behaviour.
 My dataframe testDF, obtained loading aMySQL table via jdbc, has the
 following schema:
 root
  | -- id: long (nullable = false)
  | -- title: string (nullable = true)
  | -- value: string (nullable = false)
  | -- status: string (nullable = false)

 What I want to do is filter my dataset to obtain all rows that have
 status = "new".

 scala> testDF.filter(testDF("id") === 1234).first()
 works fine (also with the integer value within double quotes), however if
 I try to use the same statement to filter on the status column (also with
 changes in the syntax - see below), suddenly the program breaks.

 Any of the following
 scala> testDF.filter(testDF("status") === "new")
 scala> testDF.filter("status = 'new'")
 scala> testDF.filter($"status" === "new")

 generates the error:

 INFO scheduler.DAGScheduler: Job 3 failed: runJob at SparkPlan.scala:121,
 took 0.277907 s

 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage
 3.0 (TID 12, node name):
 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column
 'new' in 'where clause'

 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
 at com.mysql.jdbc.Util.getInstance(Util.java:386)
 at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1052)
 at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3597)
 at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3529)
 at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1990)
 at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2151)
 at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2625)
 at
 com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2119)
 at
 com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2283)
 at org.apache.spark.sql.jdbc.JDBCRDD$anon$1.init(JDBCRDD.scala:328)
 at org.apache.spark.sql.jdbc.JDBCRDD.compute(JDBCRDD.scala:309)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 Does filter work only on columns of the integer type? What is the exact
 behaviour of the filter function and what is the best way to handle the
 query I am trying to execute?

 Thank you,
 Francesco




-- 
Best Regards,
Ayan Guha


Re: How to stream all data out of a Kafka topic once, then terminate job?

2015-04-29 Thread Dmitry Goldenberg
Yes, and Kafka topics are basically queues. So perhaps what's needed is just 
KafkaRDD with starting offset being 0 and finish offset being a very large 
number...
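
A rough sketch of that idea with the Spark 1.3 KafkaUtils.createRDD API (topic, partition and 
broker names are hypothetical, and untilOffset would normally come from the latest offsets 
reported by Kafka rather than an arbitrary large number):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val untilOffset = 1000000L // placeholder; fetch the real latest offset from Kafka
val ranges = Array(OffsetRange.create("my-topic", 0, 0L, untilOffset))

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, Map("metadata.broker.list" -> "broker1:9092"), ranges)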

Sent from my iPhone

 On Apr 29, 2015, at 1:52 AM, ayan guha guha.a...@gmail.com wrote:
 
 I guess what you mean is not streaming.  If you create a stream context at 
 time t, you will receive data coming through starting time t++, not before 
 time t.
 
 Looks like you want a queue. Let Kafka write to a queue, consume msgs from 
 the queue and stop when queue is empty.
 
 On 29 Apr 2015 14:35, dgoldenberg dgoldenberg...@gmail.com wrote:
 Hi,
 
 I'm wondering about the use-case where you're not doing continuous,
 incremental streaming of data out of Kafka but rather want to publish data
 once with your Producer(s) and consume it once, in your Consumer, then
 terminate the consumer Spark job.
 
 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
 Durations.milliseconds(...));
 
 The batchDuration parameter is "the time interval at which streaming data
 will be divided into batches". Can this be worked somehow to cause Spark
 Streaming to just get all the available data, then let all the RDD's within
 the Kafka discretized stream get processed, and then just be done and
 terminate, rather than wait another period and try and process any more data
 from Kafka?
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-stream-all-data-out-of-a-Kafka-topic-once-then-terminate-job-tp22698.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


Re: Re: Spark streaming - textFileStream/fileStream - Get file name

2015-04-29 Thread bit1...@163.com
Correct myself:
For the SparkContext#wholeTextFile, the RDD's elements are kv pairs, the key is 
the file path, and the value is the file content
So,for the SparkContext#wholeTextFile, the RDD has already carried the file 
information.



bit1...@163.com
 
From: Saisai Shao
Date: 2015-04-29 15:50
To: Akhil Das
CC: bit1...@163.com; Vadim Bichutskiy; lokeshkumar; user
Subject: Re: Re: Spark streaming - textFileStream/fileStream - Get file name
Yes, looks like a solution but quite tricky. You have to parse the debug string 
to get the file name, also relies on HadoopRDD to get the file name :)

2015-04-29 14:52 GMT+08:00 Akhil Das ak...@sigmoidanalytics.com:
It is possible to access the filename; it's a bit tricky though.

 val fstream = ssc.fileStream[LongWritable, IntWritable,
   SequenceFileInputFormat[LongWritable, IntWritable]]("/home/akhld/input/")

fstream.foreach(x => {
  // You can get it with this object.
  println(x.values.toDebugString)
})



Thanks
Best Regards

On Wed, Apr 29, 2015 at 8:33 AM, bit1...@163.com bit1...@163.com wrote:
For SparkContext#textFile, if a directory is given as the path parameter, 
then it will pick up the files in the directory, so the same thing will occur.



bit1...@163.com
 
From: Saisai Shao
Date: 2015-04-29 10:54
To: Vadim Bichutskiy
CC: bit1...@163.com; lokeshkumar; user
Subject: Re: Re: Spark streaming - textFileStream/fileStream - Get file name
I think it might be useful for Spark Streaming's file input stream, but I am not sure 
it is useful for SparkContext#textFile, since we specify the file ourselves, so 
why would we still need to know the file name?

I will open up a JIRA to mention about this feature.

Thanks
Jerry


2015-04-29 10:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com:
I was wondering about the same thing.

Vadim

On Tue, Apr 28, 2015 at 10:19 PM, bit1...@163.com bit1...@163.com wrote:
Looks to me like the same thing also applies to SparkContext.textFile or 
SparkContext.wholeTextFiles: there is no way in the RDD to figure out the file 
information, i.e. where the data in the RDD is from.



bit1...@163.com
 
From: Saisai Shao
Date: 2015-04-29 10:10
To: lokeshkumar
CC: spark users
Subject: Re: Spark streaming - textFileStream/fileStream - Get file name
I think currently there's no API in Spark Streaming you can use to get the file 
names for file input streams. Actually it is not trivial to support this; maybe 
you could file a JIRA with the feature you would like the community to support, so 
anyone who is interested can take a crack at it.

Thanks
Jerry


2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net:
Hi Forum,

Using spark streaming and listening to the files in HDFS using
textFileStream/fileStream methods, how do we get the fileNames which are
read by these methods?

I used textFileStream, which has the file contents in a JavaDStream, and I had no
success with fileStream, as it is throwing me a compilation error with Spark
version 1.3.1.

Can someone please tell me if we have an API function or any other way to
get the file names that these streaming methods read?

Thanks
Lokesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org








Re: ReduceByKey and sorting within partitions

2015-04-29 Thread Marco


On 04/27/2015 06:00 PM, Ganelin, Ilya wrote:
 Marco - why do you want data sorted both within and across partitions? If you 
 need to take an ordered sequence across all your data you need to either 
 aggregate your RDD on the driver and sort it, or use zipWithIndex to apply an 
 ordered index to your data that matches the order it was stored on HDFS. You 
 can then get the data in order by filtering based on that index. Let me know 
 if that's not what you need - thanks!
 

Basically, after a mapping d -> (k, v), I have to aggregate my data grouped
by key and I also want the output of this aggregation to be sorted. A
way to do that can be something like
flatMapToPair(myMapFunc).reduceByKey(RangePartitioner, myReduceFunc).mapPartitions(i
-> sort(i)).

But I was thinking that the sorting phase can be pushed down to the
shuffle phase, as the same thing is done in sortByKey and
repartitionAndSortWithinPartition, calling setKeyOrdering on the
shuffleRDD returned by reduceByKey (or combineByKey).


Am I wrong?

I'm not a Scala programmer, is there an easy way to do that with actual
java apis? If not, what is the quickest way to do that in Scala?

Also a more trivial question: I can't find how to use RangePartitioner
from Java because I can't understand what to provide for the Ordering and
ClassTag constructor parameters from Java. Where can I find some
reference/examples?
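
For what it's worth, a small sketch in Scala, where the Ordering and ClassTag parameters are
supplied implicitly (which is why the Java constructor looks so awkward); the data is made up:

import org.apache.spark.RangePartitioner

val pairs = sc.parallelize(Seq((3, 1), (1, 2), (2, 3), (1, 4)))
val reduced = pairs.reduceByKey(_ + _)
val partitioner = new RangePartitioner(4, reduced)   // implicits supply Ordering[Int]/ClassTag[Int]
val sorted = reduced.repartitionAndSortWithinPartitions(partitioner)

From Java, JavaPairRDD also exposes repartitionAndSortWithinPartitions(Partitioner) since 1.2;
one workaround for the constructor issue is to build the RangePartitioner in a small Scala helper.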

Thank you all,
Marco

 
  
 Sent with Good (www.good.com)
 
 
 -Original Message-
 From: Marco [marcope...@gmail.commailto:marcope...@gmail.com]
 Sent: Monday, April 27, 2015 07:01 AM Eastern Standard Time
 To: user@spark.apache.org
 Subject: ReduceByKey and sorting within partitions
 
 
 Hi,
 
 I'm trying, after reducing by key, to get data ordered among partitions
 (like RangePartitioner) and within partitions (like sortByKey or
 repartitionAndSortWithinPartitions), pushing the sorting down to the
 shuffle machinery of the reducing phase.
 
 I think, but maybe I'm wrong, that the correct way to do that is to have
 combineByKey call setKeyOrdering on the ShuffledRDD that it returns.
 
 Am I wrong? Can it be done by a combination of other transformations with
 the same efficiency?
 
 Thanks,
 Marco
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 The information contained in this e-mail is confidential and/or proprietary 
 to Capital One and/or its affiliates. The information transmitted herewith is 
 intended only for use by the individual or entity to which it is addressed.  
 If the reader of this message is not the intended recipient, you are hereby 
 notified that any review, retransmission, dissemination, distribution, 
 copying or other use of, or taking of any action in reliance upon this 
 information is strictly prohibited. If you have received this communication 
 in error, please contact the sender and delete the material from your 
 computer.
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to set DEBUG level log of spark executor on Standalone deploy mode

2015-04-29 Thread eric wong
Hi,
I want to check the DEBUG logs of the Spark executors in Standalone deploy mode.
So far, neither of the following works:
1. Setting log4j.properties in the spark/conf folder on the master node and
restarting the cluster.
2. Using spark-submit --properties-file log4j.properties. This just prints
debug logs to the screen, while the executor logs still seem to be at INFO
level.

So how can I set the log level of the Spark executors on Standalone to DEBUG?

Env Info---
spark 1.1.0

Standalone deploy mode.

Submit shell:
 bin/spark-submit --class org.apache.spark.examples.mllib.JavaKMeans
--master spark://master:7077 --executor-memory 600m --properties-file
log4j.properties lib/spark-examples-1.1.0-hadoop2.3.0.jar
hdfs://master:8000/kmeans/data-Kmeans-5.3g 8 1
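
For reference, the approach I plan to try next (a rough sketch, not yet
verified on 1.1.0) is to ship a custom log4j.properties to each executor with
--files and point the executor JVMs at it via spark.executor.extraJavaOptions:

 # log4j.properties (based on Spark's conf/log4j.properties.template)
 log4j.rootCategory=DEBUG, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

 # submit command (paths illustrative):
 # bin/spark-submit --master spark://master:7077 \
 #   --files log4j.properties \
 #   --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
 #   ...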


Thanks!
Wang Haihua


Equal Height and Depth Binning in Spark

2015-04-29 Thread kundan kumar
Hi,

I am trying to implement equal-depth and equal-height binning methods in
Spark.

Any insights, existing code for this would be really helpful.
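
To make the question concrete, the rough equal-frequency (equal-depth) sketch
I have so far with the plain RDD API looks like this (untested; values is an
RDD[Double] and numBins a small Int):

  val n = values.count()
  // rank every value, then map rank -> bin id so each bin holds ~n/numBins values
  val equalDepth = values.sortBy(identity).zipWithIndex().map {
    case (v, rank) => (v, (rank * numBins / n).toInt)
  }

  // width-based binning for comparison
  val (lo, hi) = (values.min(), values.max())
  val width = (hi - lo) / numBins
  val equalWidth = values.map(v => (v, math.min(numBins - 1, ((v - lo) / width).toInt)))

Is there a better or already existing way to do this at scale?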

Thanks,
Kundan


Re: Re: Spark streaming - textFileStream/fileStream - Get file name

2015-04-29 Thread Saisai Shao
Yes, that looks like a solution, but it's quite tricky. You have to parse the
debug string to get the file name, and it also relies on HadoopRDD to expose
the file name :)

2015-04-29 14:52 GMT+08:00 Akhil Das ak...@sigmoidanalytics.com:

 It is possible to access the filename, its a bit tricky though.

  val fstream = ssc.fileStream[LongWritable, IntWritable,
   SequenceFileInputFormat[LongWritable,
 IntWritable]]("/home/akhld/input/")

 fstream.foreach(x => {
   //You can get it with this object.
   println(x.values.toDebugString)

 } )

 [image: Inline image 1]

 Thanks
 Best Regards

 On Wed, Apr 29, 2015 at 8:33 AM, bit1...@163.com bit1...@163.com wrote:

 For SparkContext#textFile, if a directory is given as the path
 parameter, then it will pick up the files in the directory, so the same
 thing will occur.

 --
 bit1...@163.com


 *From:* Saisai Shao sai.sai.s...@gmail.com
 *Date:* 2015-04-29 10:54
 *To:* Vadim Bichutskiy vadim.bichuts...@gmail.com
 *CC:* bit1...@163.com; lokeshkumar lok...@dataken.net; user
 user@spark.apache.org
 *Subject:* Re: Re: Spark streaming - textFileStream/fileStream - Get
 file name
 I think it might be useful in Spark Streaming's file input stream, but
 not sure is it useful in SparkContext#textFile, since we specify the file
 by our own, so why we still need to know the file name.

 I will open up a JIRA to mention about this feature.

 Thanks
 Jerry


 2015-04-29 10:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com:

 I was wondering about the same thing.

 Vadim

 On Tue, Apr 28, 2015 at 10:19 PM, bit1...@163.com bit1...@163.com
 wrote:

 Looks to me  that the same thing also applies to the
 SparkContext.textFile or SparkContext.wholeTextFile, there is no way in RDD
 to figure out the file information where the data in RDD is from

 --
 bit1...@163.com


 *From:* Saisai Shao sai.sai.s...@gmail.com
 *Date:* 2015-04-29 10:10
 *To:* lokeshkumar lok...@dataken.net
 *CC:* spark users user@spark.apache.org
 *Subject:* Re: Spark streaming - textFileStream/fileStream - Get file
 name
 I think currently there's no API in Spark Streaming you can use to get
 the file names for file input streams. Actually it is not trivial to
 support this, may be you could file a JIRA with wishes you want the
 community to support, so anyone who is interested can take a crack on this.

 Thanks
 Jerry


 2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net:

 Hi Forum,

 Using spark streaming and listening to the files in HDFS using
 textFileStream/fileStream methods, how do we get the fileNames which
 are
 read by these methods?

 I used textFileStream which has file contents in JavaDStream and I got
 no
 success with fileStream as it is throwing me a compilation error with
 spark
 version 1.3.1.

 Can someone please tell me if we have an API function or any other way
 to
 get the file names that these streaming methods read?

 Thanks
 Lokesh



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: Re: Spark streaming - textFileStream/fileStream - Get file name

2015-04-29 Thread Akhil Das
It is possible to access the filename, its a bit tricky though.

 val fstream = ssc.fileStream[LongWritable, IntWritable,
  SequenceFileInputFormat[LongWritable,
IntWritable]]("/home/akhld/input/")

fstream.foreach(x => {
  //You can get it with this object.
  println(x.values.toDebugString)

} )

[image: Inline image 1]

Thanks
Best Regards

On Wed, Apr 29, 2015 at 8:33 AM, bit1...@163.com bit1...@163.com wrote:

 For SparkContext#textFile, if a directory is given as the path
 parameter, then it will pick up the files in the directory, so the same
 thing will occur.

 --
 bit1...@163.com


 *From:* Saisai Shao sai.sai.s...@gmail.com
 *Date:* 2015-04-29 10:54
 *To:* Vadim Bichutskiy vadim.bichuts...@gmail.com
 *CC:* bit1...@163.com; lokeshkumar lok...@dataken.net; user
 user@spark.apache.org
 *Subject:* Re: Re: Spark streaming - textFileStream/fileStream - Get file
 name
 I think it might be useful in Spark Streaming's file input stream, but not
 sure is it useful in SparkContext#textFile, since we specify the file by
 our own, so why we still need to know the file name.

 I will open up a JIRA to mention about this feature.

 Thanks
 Jerry


 2015-04-29 10:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com:

 I was wondering about the same thing.

 Vadim

 On Tue, Apr 28, 2015 at 10:19 PM, bit1...@163.com bit1...@163.com
 wrote:

 Looks to me  that the same thing also applies to the
 SparkContext.textFile or SparkContext.wholeTextFile, there is no way in RDD
 to figure out the file information where the data in RDD is from

 --
 bit1...@163.com


 *From:* Saisai Shao sai.sai.s...@gmail.com
 *Date:* 2015-04-29 10:10
 *To:* lokeshkumar lok...@dataken.net
 *CC:* spark users user@spark.apache.org
 *Subject:* Re: Spark streaming - textFileStream/fileStream - Get file
 name
 I think currently there's no API in Spark Streaming you can use to get
 the file names for file input streams. Actually it is not trivial to
 support this, may be you could file a JIRA with wishes you want the
 community to support, so anyone who is interested can take a crack on this.

 Thanks
 Jerry


 2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net:

 Hi Forum,

 Using spark streaming and listening to the files in HDFS using
 textFileStream/fileStream methods, how do we get the fileNames which are
 read by these methods?

 I used textFileStream which has file contents in JavaDStream and I got
 no
 success with fileStream as it is throwing me a compilation error with
 spark
 version 1.3.1.

 Can someone please tell me if we have an API function or any other way
 to
 get the file names that these streaming methods read?

 Thanks
 Lokesh



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-textFileStream-fileStream-Get-file-name-tp22692.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Question about Memory Used and VCores Used

2015-04-29 Thread Sandy Ryza
Hi,

Good question.  The extra memory comes from
spark.yarn.executor.memoryOverhead, the space used for the application
master, and the way the YARN rounds requests up.  This explains it in a
little more detail:
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

-Sandy

On Tue, Apr 28, 2015 at 7:12 PM, bit1...@163.com bit1...@163.com wrote:

 Hi,guys,
 I have the following computation with 3 workers:
 spark-sql --master yarn --executor-memory 3g --executor-cores 2
 --driver-memory 1g -e 'select count(*) from table'

 The resources used are shown as below on the UI:
 I don't understand why the memory used is 15GB and vcores used is 5. I
 think the memory used should be executor-memory*numOfWorkers=3G*3=9G, and
 the vcores used should be executor-cores*numOfWorkers=6

 Can you please explain the result?Thanks.



 --
 bit1...@163.com



Re: How to stream all data out of a Kafka topic once, then terminate job?

2015-04-29 Thread ayan guha
I guess what you want is not really streaming. If you create a streaming
context at time t, you will receive data arriving after time t, not before
time t.

It sounds like you want a queue. Let Kafka write to a queue, consume messages
from the queue, and stop when the queue is empty.
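
Alternatively, if you can track the offsets you want yourself, you can skip
streaming entirely and read a fixed offset range as a batch RDD. A rough
sketch (assuming the spark-streaming-kafka 1.3 artifact; broker and offsets
are placeholders):

 import kafka.serializer.StringDecoder
 import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

 val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
 // from/until offsets come from your own bookkeeping; the values below are made up
 val ranges = Array(OffsetRange("mytopic", 0, 0L, 100000L))
 val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
   sc, kafkaParams, ranges)
 rdd.map(_._2).saveAsTextFile("...")   // process once, then the job simply ends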
On 29 Apr 2015 14:35, dgoldenberg dgoldenberg...@gmail.com wrote:

 Hi,

 I'm wondering about the use-case where you're not doing continuous,
 incremental streaming of data out of Kafka but rather want to publish data
 once with your Producer(s) and consume it once, in your Consumer, then
 terminate the consumer Spark job.

 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
 Durations.milliseconds(...));

 The batchDuration parameter is "the time interval at which streaming data
 will be divided into batches". Can this be worked somehow to cause Spark
 Streaming to just get all the available data, then let all the RDD's within
 the Kafka discretized stream get processed, and then just be done and
 terminate, rather than wait another period and try and process any more
 data
 from Kafka?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-stream-all-data-out-of-a-Kafka-topic-once-then-terminate-job-tp22698.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Multiclass classification using Ml logisticRegression

2015-04-29 Thread selim namsi
Thank you for your Answer!
Yes I would like to work on it.

Selim

On Mon, Apr 27, 2015 at 5:23 AM Joseph Bradley jos...@databricks.com
wrote:

 Unfortunately, the Pipelines API doesn't have multiclass logistic
 regression yet, only binary.  It's really a matter of modifying the current
 implementation; I just added a JIRA for it:
 https://issues.apache.org/jira/browse/SPARK-7159

 You'll need to use the old LogisticRegression API to do multiclass for
 now, until that JIRA gets completed.  (If you're interested in doing it,
 let me know via the JIRA!)

 Joseph

 On Fri, Apr 24, 2015 at 3:26 AM, Selim Namsi selim.na...@gmail.com
 wrote:

 Hi,

 I just started using spark ML pipeline to implement a multiclass
 classifier
 using LogisticRegressionWithLBFGS (which accepts as a parameters number of
 classes), I followed the Pipeline example in ML- guide and I used
 LogisticRegression class which calls LogisticRegressionWithLBFGS class :

 val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)

 the problem is that LogisticRegression doesn't take numClasses as
 parameters

 Any idea how to solve this problem?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Multiclass-classification-using-Ml-logisticRegression-tp22644.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to setup this false streaming problem

2015-04-29 Thread Ignacio Blasco
Hi Toni.
Given that there is more than one measure per (user, hour), what is the measure
you want to keep? The sum? The mean? The most recent measure? For the
sum or the mean you don't need to care about the timing. And if you want
to keep the most recent, then you can include the timestamp in the reduction
function and use it to decide which measure you keep.

On the other hand, given the same userId, do you expect that the measures
will arrive reasonably ordered? If that's the case, you may reduce using
only userId as the key and use your reduction function to decide when to
drop the last hour and publish the next one, maybe using a Map[hour,
(timestamp, measure)] as the value you reduce over. The expected result of
that reduction would be a Map with the significant measures per hour of that
DStream.
For example:
Given a DStream with
{ UserId = 1, Ts = 8:30, measure = X },
{ UserId = 1, Ts = 8:45, measure = Y },
{ UserId = 1, Ts = 9:10, measure = Z }

Before the reduction:

(1, Map(8:00 -> (8:30, X))),
(1, Map(8:00 -> (8:45, Y))),
(1, Map(9:00 -> (9:10, Z)))

After the reduction you can produce (assuming you want to keep the most
recent):

(1, Map(8:00 -> (8:45, Y), 9:00 -> (9:10, Z)))

In a further step you can decide to dump the 8:00 measure to the database,
because you have in that DStream a measure happening at 9:00.
This way you can keep a structure that uses almost constant memory
per userId, because in the worst case you will have 2 hours in the map.
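
A rough sketch of that reduction in code (names follow your Measurement case
class; assuming measurements is the DStream[Measurement] after unmarshalling;
not tested):

 // DStream[(Long, Map[Long, (Long, Double)])]: deviceId -> (hourBucket -> (ts, measure))
 def hourOf(ts: Long): Long = ts - (ts % (60 * 60 * 1000L))

 val keyed = measurements.map { m =>
   val ts = m.timestamp.getTime
   (m.deviceId, Map(hourOf(ts) -> (ts, m.measurement)))
 }
 val latestPerHour = keyed.reduceByKey { (a, b) =>
   (a.keySet ++ b.keySet).map { h =>
     h -> (a.get(h).toSeq ++ b.get(h).toSeq).maxBy(_._1)   // keep the most recent per hour
   }.toMap
 }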

Regards
Nacho

2015-04-28 19:38 GMT+02:00 Toni Cebrián tcebr...@enerbyte.com:

  Hi,

   Just new to Spark and in need of some help framing the problem I
 have. A problem well stated is half solved, as the saying goes :)

   Let's say that I have a DStream[String] basically containing Json of
 some measurements from IoT devices. In order to keep it simple say that
 after unmarshalling I have data like:

case class Measurement(val deviceId:Long, val timestamp:Date, val
 measurement:Double)

 I need to use DStreams because there is some interest in monitoring
 real-time the measurements of devices. So let's say that I have a dashboard
 with hourly granularity, past hours are consolidated but current hour must
 be kept updated on every input.

 The problem is that the Time that matters is the timestamp in the Json not
 the receiving timestamp by Spark so I think that I have to keep a stateful
 DStream like the one described
 http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
 . I have two questions here:

1. Once a given hour is gone, I'd like to flush the consolidated
stream into a DB. I think the strategy is to have a Stream with key-values
where the key is (userID, truncateByHour(timestamp)) and reducing over the
values. But it seems to me that with this approach Spark has lost any sense
of time, how would you flush all the RDDs with timestamps between 00:00:00
and 00:59:59 for instance? Maybe I'm missing some function in the API
2. How do you deal with events that come with timestamps in the past?
Is it a matter of ignoring them, finding a trade-off between memory and how
long the stateful DStream is? But then, who is the one popping the mature
time slices from the stateful stream?

 For me Spark Streaming would be the most natural way to face this problem,
 but maybe a simple Spark processing run every minute could keep easily the
 sorting by time of external events.

 I'd like to hear your thoughts.
 Toni



Re: Re: Question about Memory Used and VCores Used

2015-04-29 Thread bit1...@163.com
Thanks Sandy, it is very useful!



bit1...@163.com
 
From: Sandy Ryza
Date: 2015-04-29 15:24
To: bit1...@163.com
CC: user
Subject: Re: Question about Memory Used and VCores Used
Hi,

Good question.  The extra memory comes from spark.yarn.executor.memoryOverhead, 
the space used for the application master, and the way the YARN rounds requests 
up.  This explains it in a little more detail:
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

-Sandy

On Tue, Apr 28, 2015 at 7:12 PM, bit1...@163.com bit1...@163.com wrote:
Hi,guys,
I have the following computation with 3 workers:
spark-sql --master yarn --executor-memory 3g --executor-cores 2 --driver-memory 
1g -e 'select count(*) from table'

The resources used are shown as below on the UI:
I don't understand why the memory used is 15GB and vcores used is 5. I think 
the memory used should be executor-memory*numOfWorkers=3G*3=9G, and the vcores 
used should be executor-cores*numOfWorkers=6

Can you please explain the result?Thanks.





bit1...@163.com



Re: sparksql - HiveConf not found during task deserialization

2015-04-29 Thread Manku Timma
The issue is solved. There was a problem in my hive codebase. Once that was
fixed, -Phive-provided spark is working fine against my hive jars.

On 27 April 2015 at 08:00, Manku Timma manku.tim...@gmail.com wrote:

 Made some progress on this. Adding hive jars to the system classpath is
 needed. But looks like it needs to be towards the end of the system
 classes. Manually adding the hive classpath into
 Client.populateHadoopClasspath solved the issue. But a new issue has come
 up. It looks like some hive initialization needs to happen on the executors
 but is getting missed out.

 15/04/25 07:40:25 ERROR executor.Executor: Exception in task 0.1 in stage
 1.0 (TID 23)
 java.lang.RuntimeException:
 org.apache.hadoop.hive.ql.metadata.HiveException: Hive.get() called without
 a hive db setup
   at
 org.apache.hadoop.hive.ql.plan.PlanUtils.configureJobPropertiesForStorageHandler(PlanUtils.java:841)
   at
 org.apache.hadoop.hive.ql.plan.PlanUtils.configureInputJobPropertiesForStorageHandler(PlanUtils.java:776)
   at
 org.apache.spark.sql.hive.HadoopTableReader$.initializeLocalJobConfFunc(TableReader.scala:253)
   at
 org.apache.spark.sql.hive.HadoopTableReader$$anonfun$11.apply(TableReader.scala:229)
   at
 org.apache.spark.sql.hive.HadoopTableReader$$anonfun$11.apply(TableReader.scala:229)
   at
 org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:172)
   at
 org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:172)
   at scala.Option.map(Option.scala:145)
   at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:172)
   at org.apache.spark.rdd.HadoopRDD$$anon$1.init(HadoopRDD.scala:216)
   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:212)
   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:64)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
   at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
 Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive.get()
 called without a hive db setup
   at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:211)
   at
 org.apache.hadoop.hive.ql.plan.PlanUtils.configureJobPropertiesForStorageHandler(PlanUtils.java:797)
   ... 37 more


 On 25 April 2015 at 09:31, Manku Timma manku.tim...@gmail.com wrote:

 Setting SPARK_CLASSPATH is triggering other errors. Not working.


 On 25 April 2015 at 09:16, Manku Timma manku.tim...@gmail.com wrote:

 Actually found the culprit. The JavaSerializerInstance.deserialize is
 called with a classloader (of type MutableURLClassLoader) which has access
 to all the hive classes. But internally it triggers a call to loadClass but
 with the default classloader. Below is the stacktrace (line numbers in the
 JavaSerialization.scala will be a bit off due to my debugging statements).

 I will try out the SPARK_CLASSPATH setting. But I was wondering if this
 has something to do with the way spark-project.hive jars are created v/s
 the way open source apache-hive jars are created. Is this documented
 somewhere? The only info I see is Patrick Wendell's comment in
 https://github.com/apache/spark/pull/2241 (grep for published a
 modified version).

 15/04/25 01:41:04 ERROR util.SparkUncaughtExceptionHandler: Uncaught
 exception in thread Thread[Executor task launch worker-3,5,main]
 java.lang.NoClassDefFoundError: 

Re: Parquet error reading data that contains array of structs

2015-04-29 Thread Cheng Lian

Thanks for the detailed information!

Now I can confirm that this is a backwards-compatibility issue. The data 
written by parquet 1.6rc7 follows the standard LIST structure. However, 
Spark SQL still uses old parquet-avro style two-level structures, which 
causes the problem.


Cheng

On 4/27/15 7:07 PM, Jianshi Huang wrote:

FYI,

Parquet schema output:

message pig_schema {
  optional binary cust_id (UTF8);
  optional int32 part_num;
  optional group ip_list (LIST) {
repeated group ip_t {
  optional binary ip (UTF8);
}
  }
  optional group vid_list (LIST) {
repeated group vid_t {
  optional binary vid (UTF8);
}
  }
  optional group fso_list (LIST) {
repeated group fso_t {
  optional binary fso (UTF8);
}
  }
}


And Parquet meta output:

creator: [parquet-mr (build 
ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7), parquet-mr version 1.6.0rc7 
(build ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7), parquet-mr]
extra:   pig.schema = cust_id: chararray,part_num: int,ip_list: 
{ip_t: (ip: chararray)},vid_list: {vid_t: (vid: chararray)},fso_list: 
{fso_t: (fso: chararray)}


file schema: pig_schema

cust_id: OPTIONAL BINARY O:UTF8 R:0 D:1
part_num:OPTIONAL INT32 R:0 D:1
ip_list: OPTIONAL F:1
.ip_t:   REPEATED F:1
..ip:OPTIONAL BINARY O:UTF8 R:1 D:3
vid_list:OPTIONAL F:1
.vid_t:  REPEATED F:1
..vid:   OPTIONAL BINARY O:UTF8 R:1 D:3
fso_list:OPTIONAL F:1
.fso_t:  REPEATED F:1
..fso:   OPTIONAL BINARY O:UTF8 R:1 D:3

row group 1: RC:1201092 TS:537930256 OFFSET:4

cust_id:  BINARY GZIP DO:0 FPO:4 SZ:10629422/27627221/2.60 
VC:1201092 ENC:PLAIN,RLE,BIT_PACKED
part_num: INT32 GZIP DO:0 FPO:10629426 SZ:358/252/0.70 VC:1201092 
ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED

ip_list:
.ip_t:
..ip: BINARY GZIP DO:0 FPO:10629784 SZ:41331065/180501686/4.37 
VC:10540378 ENC:PLAIN,RLE

vid_list:
.vid_t:
..vid:BINARY GZIP DO:0 FPO:51960849 SZ:58820404/254819721/4.33 
VC:11011894 ENC:PLAIN,RLE

fso_list:
.fso_t:
..fso:BINARY GZIP DO:0 FPO:110781253 SZ:21363255/74981376/3.51 
VC:5612655 ENC:PLAIN,RLE


row group 2: RC:1830769 TS:1045506907 OFFSET:132144508

cust_id:  BINARY GZIP DO:0 FPO:132144508 SZ:17720131/42110882/2.38 
VC:1830769 ENC:PLAIN,RLE,BIT_PACKED
part_num: INT32 GZIP DO:0 FPO:149864639 SZ:486/346/0.71 VC:1830769 
ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED

ip_list:
.ip_t:
..ip: BINARY GZIP DO:0 FPO:149865125 
SZ:37687630/342050955/9.08 VC:20061916 ENC:PLAIN,PLAIN_DICTIONARY,RLE

vid_list:
.vid_t:
..vid:BINARY GZIP DO:0 FPO:187552755 
SZ:56498124/516700215/9.15 VC:22410351 ENC:PLAIN,PLAIN_DICTIONARY,RLE

fso_list:
.fso_t:
..fso:BINARY GZIP DO:0 FPO:244050879 
SZ:20110276/144644509/7.19 VC:10739272 ENC:PLAIN,PLAIN_DICTIONARY,RLE


row group 3: RC:22445 TS:4304290 OFFSET:264161155

cust_id:  BINARY GZIP DO:0 FPO:264161155 SZ:221527/516312/2.33 
VC:22445 ENC:PLAIN,RLE,BIT_PACKED
part_num: INT32 GZIP DO:0 FPO:264382682 SZ:102/64/0.63 VC:22445 
ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED

ip_list:
.ip_t:
..ip: BINARY GZIP DO:0 FPO:264382784 SZ:483962/1204312/2.49 
VC:123097 ENC:PLAIN_DICTIONARY,RLE

vid_list:
.vid_t:
..vid:BINARY GZIP DO:0 FPO:264866746 SZ:622977/2122080/3.41 
VC:133136 ENC:PLAIN,PLAIN_DICTIONARY,RLE

fso_list:
.fso_t:
..fso:BINARY GZIP DO:0 FPO:265489723 SZ:240588/461522/1.92 
VC:62173 ENC:PLAIN_DICTIONARY,RLE


Jianshi


On Mon, Apr 27, 2015 at 12:40 PM, Cheng Lian lian.cs@gmail.com 
mailto:lian.cs@gmail.com wrote:


Had an offline discussion with Jianshi, the dataset was generated
by Pig.

Jianshi - Could you please attach the output of parquet-schema
path-to-parquet-file? I guess this is a Parquet format
backwards-compatibility issue. Parquet hadn't standardized
representation of LIST and MAP until recently, thus many systems
made their own choice and are not easily inter-operatable. In
earlier days, Spark SQL used LIST and MAP formats similar to Avro,
which was unfortunately not chosen as the current standard format.
Details can be found here:
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
This document also defines backwards-compatibility rules to handle
legacy Parquet data written by old Parquet implementations in
various systems.

So ideally, now Spark SQL should always write data following the
standard, and implement all backwards-compatibility rules to read
legacy data. JIRA issue for this is
https://issues.apache.org/jira/browse/SPARK-6774

I'm working on a PR https://github.com/apache/spark/pull/5422 for
this. To fix 

Re: Multiclass classification using Ml logisticRegression

2015-04-29 Thread DB Tsai
Wrapping the old LogisticRegressionWithLBFGS could be a quick solution
for 1.4, and it's not too hard so it's potentially to get into 1.4. In
the long term, I will like to implement a new version like
https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef
which handles the scaling and intercepts implicitly in objective
function so no overhead of creating new transformed dataset.
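
(For anyone who needs multiclass before that lands, a minimal sketch of the
existing RDD-based path, assuming MLlib 1.3-style APIs and labels 0.0 to k-1:)

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

def trainMulticlass(training: RDD[LabeledPoint], numClasses: Int) =
  new LogisticRegressionWithLBFGS()
    .setNumClasses(numClasses)
    .run(training)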

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com


On Wed, Apr 29, 2015 at 1:21 AM, selim namsi selim.na...@gmail.com wrote:
 Thank you for your Answer!
 Yes I would like to work on it.

 Selim

 On Mon, Apr 27, 2015 at 5:23 AM Joseph Bradley jos...@databricks.com
 wrote:

 Unfortunately, the Pipelines API doesn't have multiclass logistic
 regression yet, only binary.  It's really a matter of modifying the current
 implementation; I just added a JIRA for it:
 https://issues.apache.org/jira/browse/SPARK-7159

 You'll need to use the old LogisticRegression API to do multiclass for
 now, until that JIRA gets completed.  (If you're interested in doing it, let
 me know via the JIRA!)

 Joseph

 On Fri, Apr 24, 2015 at 3:26 AM, Selim Namsi selim.na...@gmail.com
 wrote:

 Hi,

 I just started using spark ML pipeline to implement a multiclass
 classifier
 using LogisticRegressionWithLBFGS (which accepts as a parameters number
 of
 classes), I followed the Pipeline example in ML- guide and I used
 LogisticRegression class which calls LogisticRegressionWithLBFGS class :

 val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)

 the problem is that LogisticRegression doesn't take numClasses as
 parameters

 Any idea how to solve this problem?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Multiclass-classification-using-Ml-logisticRegression-tp22644.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How can I merge multiple DataFrames and remove duplicated keys

2015-04-29 Thread Wang, Ningjun (LNG-NPV)
I have multiple DataFrame objects, each stored in a Parquet file. Each DataFrame 
just contains 3 columns (id, value, timeStamp). I need to union all the 
DataFrame objects together, but for duplicated ids only keep the record with the 
latest timestamp. How can I do that?

I can do this for RDDs by sc.union() to union all the RDDs and then do a 
reduceByKey() to remove duplicated id by keeping only the one with latest 
timeStamp field. But how do I do it for DataFrame?
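
The closest I have come up with so far is to drop back to the RDD API after
unionAll - a sketch (the column types below are my assumption):

import org.apache.spark.sql.Row

val all = df1.unionAll(df2).unionAll(df3)   // same 3-column schema
val latest = all.rdd
  .map { case Row(id: String, value: String, ts: Long) => (id, (value, ts)) }
  .reduceByKey((a, b) => if (a._2 >= b._2) a else b)   // keep the latest timeStamp per id
  .map { case (id, (value, ts)) => (id, value, ts) }

import sqlContext.implicits._
val result = latest.toDF("id", "value", "timeStamp")

Is there a way to express the same thing directly with the DataFrame API?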


Ningjun



Re: Spark on Cassandra

2015-04-29 Thread Cody Koeninger
Hadoop version doesn't matter if you're just using cassandra.

On Wed, Apr 29, 2015 at 12:08 PM, Matthew Johnson matt.john...@algomi.com
wrote:

 Hi all,



 I am new to Spark, but excited to use it with our Cassandra cluster. I
 have read in a few places that Spark can interact directly with Cassandra
 now, so I decided to download it and have a play – I am happy to run it in
 standalone cluster mode initially. When I go to download it (
 http://spark.apache.org/downloads.html) I see a bunch of pre-built
 versions for Hadoop and MapR, but no mention of Cassandra – if I am running
 it in standalone cluster mode, does it matter which pre-built package I
 download? Would all of them work? Or do I have to build it myself from
 source with some special config for Cassandra?



 Thanks!

 Matt



Re: Driver memory leak?

2015-04-29 Thread Sean Owen
Not sure what you mean. It's already in CDH since 5.4 = 1.3.0
(This isn't the place to ask about CDH)
I also don't think that's the problem. The process did not run out of
memory.

On Wed, Apr 29, 2015 at 2:08 PM, Serega Sheypak serega.shey...@gmail.com
wrote:

 The memory leak could be related to this
 https://issues.apache.org/jira/browse/SPARK-5967 defect that was
 resolved in Spark 1.2.2 and 1.3.0.
 @Sean
 Will it be backported to CDH? I did't find that bug in CDH 5.4 release
 notes.

 2015-04-29 14:51 GMT+02:00 Conor Fennell conor.fenn...@altocloud.com:

 The memory leak could be related to this
 https://issues.apache.org/jira/browse/SPARK-5967 defect that was
 resolved in Spark 1.2.2 and 1.3.0.

 It also was a HashMap causing the issue.

 -Conor



 On Wed, Apr 29, 2015 at 12:01 PM, Sean Owen so...@cloudera.com wrote:

 Please use user@, not dev@

 This message does not appear to be from your driver. It also doesn't say
 you ran out of memory. It says you didn't tell YARN to let it use the
 memory you want. Look at the memory overhead param and please search first
 for related discussions.
 On Apr 29, 2015 11:43 AM, wyphao.2007 wyphao.2...@163.com wrote:

 Hi, dear developers, I am using Spark Streaming to read data from
 Kafka. The program has already run for about 120 hours, but today it
 failed because of the driver's OOM, as follows:

 Container
 [pid=49133,containerID=container_1429773909253_0050_02_01] is running
 beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB physical
 memory used; 3.2 GB of 50 GB virtual memory used. Killing container.

 I set --driver-memory to 2g. In my mind, the driver is responsible for
 job scheduling and monitoring (please correct me if I'm wrong), so why is it
 using so much memory?

 So I used jmap to monitor another program (which had already run for about 48 hours):
 sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256, with the result
 as follows:
 the java.util.HashMap$Entry and java.lang.Long objects use about
 600MB of memory!

 I also used jmap to monitor another program (which had run for about 1
 hour), with the result as follows:
 the java.util.HashMap$Entry and java.lang.Long objects don't use that
 much memory, but I found that, as time goes by, the java.util.HashMap$Entry
 and java.lang.Long objects occupy more and more memory.
 Is it a driver memory leak, or is there another reason?

 Thanks
 Best Regards














Spark on Cassandra

2015-04-29 Thread Matthew Johnson
Hi all,



I am new to Spark, but excited to use it with our Cassandra cluster. I have
read in a few places that Spark can interact directly with Cassandra now,
so I decided to download it and have a play – I am happy to run it in
standalone cluster mode initially. When I go to download it (
http://spark.apache.org/downloads.html) I see a bunch of pre-built versions
for Hadoop and MapR, but no mention of Cassandra – if I am running it in
standalone cluster mode, does it matter which pre-built package I download?
Would all of them work? Or do I have to build it myself from source with
some special config for Cassandra?



Thanks!

Matt


Re: How Spark SQL supports primary and secondary indexes

2015-04-29 Thread Nikolay Tikhonov
I'm running this query with different parameters on the same RDD and got 0.2s
for each query. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-Spark-SQL-supports-primary-and-secondary-indexes-tp22700p22706.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: indexing an RDD [Python]

2015-04-29 Thread Sven Krasser
Hey Roberto,

You will likely want to use a cogroup() then, but it all hinges on how your
data looks, i.e. whether you have the index in the key. Here's an example:
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#cogroup
.

Clone: RDDs are immutable, so if you need to make changes to one, those will
result in a new RDD.

Best,
-Sven


On Fri, Apr 24, 2015 at 4:49 PM, Pagliari, Roberto rpagli...@appcomsci.com
wrote:

 Hi,

 I may need to read many values. The list [0,4,5,6,8] gives the locations of
 the rows I'd like to extract from the RDD (of LabeledPoints). Could you
 possibly provide a quick example?



 Also, I’m not quite sure how this work, but the resulting RDD should be a
 clone, as I may need to modify the values and preserve the original ones.



 Thank you,





 *From:* Sven Krasser [mailto:kras...@gmail.com]
 *Sent:* Friday, April 24, 2015 5:56 PM
 *To:* Pagliari, Roberto
 *Cc:* user@spark.apache.org
 *Subject:* Re: indexing an RDD [Python]



 The solution depends largely on your use case. I assume the index is in
 the key. In that case, you can make a second RDD out of the list of indices
 and then use cogroup() on both.

 If the list of indices is small, just using filter() will work well.

 If you need to read back a few select values to the driver, take a look at
 lookup().



 On Fri, Apr 24, 2015 at 1:51 PM, Pagliari, Roberto 
 rpagli...@appcomsci.com wrote:

 I have an RDD of LabeledPoints.
 Is it possible to select a subset of it based on a list of indices?
 For example, with idx=[0,4,5,6,8], I'd like to be able to create a new RDD
 with elements 0, 4, 5, 6 and 8.


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --

 www.skrasser.com http://www.skrasser.com/?utm_source=sig




-- 
www.skrasser.com http://www.skrasser.com/?utm_source=sig


Compute pairwise distance

2015-04-29 Thread Driesprong, Fokko
Dear Sparkers,

I am working on an algorithm which requires the pairwise distance between all
points (e.g. DBSCAN, LOF, etc.). Computing this for *n* points will
produce an n^2 matrix. If the distance measure is symmetric, this can be
reduced to (n^2)/2. What would be the optimal way of computing this?

The paper *Pairwise Element Computation with MapReduce
https://www.cs.ucsb.edu/~ravenben/classes/290F/papers/kvl10.pdf*
describes different approaches to optimizing this process within a map-reduce
model, although I don't believe these are directly applicable to Spark. How
would you guys approach this?

I first thought about broadcasting the original points to all the workers,
and then computing the distances across the different workers, although this
requires all the points to be distributed across all the machines. This
feels rather brute-force; what do you guys think?

I don't expect full solutions, but some pointers would be great. I think a
good solution can be re-used for many algorithms.
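
For concreteness, the brute-force version I have in mind looks roughly like
this (untested; dist stands for whatever symmetric measure is used, and
points is an RDD of feature vectors):

val indexed = points.zipWithIndex().map(_.swap).cache()    // (index, point)
val upperTriangle = indexed.cartesian(indexed)
  .filter { case ((i, _), (j, _)) => i < j }               // keep each pair only once
val distances = upperTriangle.map { case ((i, a), (j, b)) => ((i, j), dist(a, b)) }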

Kind regards,
Fokko Driesprong


Re: Slower performance when bigger memory?

2015-04-29 Thread Sven Krasser
On Mon, Apr 27, 2015 at 7:36 AM, Shuai Zheng szheng.c...@gmail.com wrote:

 Thanks. So may I know what is your configuration for more/smaller
 executors on r3.8xlarge, how big of the memory that you eventually decide
 to give one executor without impact performance (for example: 64g? ).


We're currently using 16 executors with 2 cores each per instance. This is
mainly due to the S3 throughput observations I mentioned (all based on
earlier versions, so I would not be surprised if larger executors will work
equally well now). Among these, we chop the available memory up equally,
but we still leave a lot of extra room for Python workers (as a result of
SPARK-5395; since that's fixed, we have more room to grow the JVM heap now).

Best,
-Sven

-- 
www.skrasser.com http://www.skrasser.com/?utm_source=sig


Sort (order by) of the big dataset

2015-04-29 Thread Ulanov, Alexander
Hi,

I have a 2 billion record dataset with schema eventId: String, time: Double, 
value: Double. It is stored in Parquet format in HDFS, size 23GB. Specs: Spark 
1.3, Hadoop 1.2.1, 8 Xeon nodes with 16GB RAM and 1TB disk space each; each node 
has 3 workers with 3GB memory.

I keep failing to sort the mentioned dataset in Spark. I do the following:
val pf = sqlContext.parquetFile(hdfs://my.net/data.parquet)
pf.registerTempTable(data)
val sorted = sqlContext.sql(select * from data order by time)
sorted.saveAsParquetFile(hdfs://my.net/data-sorted.parquet)

Spark starts to execute tasks and then errors like Executor Lost pop up in the 
web UI (tasks mapPartitions at Exchange.scala and runJob at newParquet.scala), 
giving different types of exceptions as the explanation. My thinking is that the 
main problem is the GC overhead limit exception, however I also observe exceptions 
related to connection timeouts and shuffle writes 
(java.lang.IllegalStateException: Shutdown in progress; 
org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException).

What I tried:
1) Tried to order by eventId, and by both columns (order by eventId, time), with 
the same result.
2) Looked at the shuffle parameters, but the defaults do make sense.
3) Tried to repartition the data I am loading from Parquet: val pf3000 = 
pf.repartition(3000), in order to get smaller chunks of data passed to executors 
(originally there are 300 partitions). It did not help either. Surprisingly, 
this repartitioned dataset takes 50GB on HDFS versus the 23GB of the original.
4) Tried to have 8 workers instead of 24 and gave them 10G of memory. It did not 
help.

Could you suggest what might be the problem and what is the workaround? Just in 
case, I cannot have more RAM or more machines :)

Best  regards, Alexander


Performance advantage by loading data from local node over S3.

2015-04-29 Thread Nisrina Luthfiyati
Hi all,
I'm new to Spark so I'm sorry if the question is too vague. I'm currently
trying to deploy a Spark cluster using YARN on an Amazon EMR cluster. For
the data storage I'm currently using S3, but would loading the data from HDFS
on the local nodes give a considerable performance advantage over loading from
S3?
Would the reduced latency in data loading affect the runtime much,
considering most of the computation is done in memory?

Thank you,
Nisrina.


RE: Spark on Cassandra

2015-04-29 Thread Huang, Roger
http://planetcassandra.org/getting-started-with-apache-spark-and-cassandra/
http://planetcassandra.org/blog/holy-momentum-batman-spark-and-cassandra-circa-2015-w-datastax-connector-and-java/
https://github.com/datastax/spark-cassandra-connector
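
For a quick start, a minimal sketch with the DataStax connector looks roughly
like this (the version below is a guess - check the connector's compatibility
table for your Spark version; the keyspace/table names are made up):

// build.sbt
// libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.3.0-M1"

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cassandra-test")
  .set("spark.cassandra.connection.host", "127.0.0.1")   // a Cassandra contact point
val sc = new SparkContext(conf)

val rdd = sc.cassandraTable("my_keyspace", "my_table")
println(rdd.count())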



From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: Wednesday, April 29, 2015 12:15 PM
To: Matthew Johnson
Cc: user@spark.apache.org
Subject: Re: Spark on Cassandra

Hadoop version doesn't matter if you're just using cassandra.

On Wed, Apr 29, 2015 at 12:08 PM, Matthew Johnson 
matt.john...@algomi.commailto:matt.john...@algomi.com wrote:
Hi all,

I am new to Spark, but excited to use it with our Cassandra cluster. I have 
read in a few places that Spark can interact directly with Cassandra now, so I 
decided to download it and have a play – I am happy to run it in standalone 
cluster mode initially. When I go to download it 
(http://spark.apache.org/downloads.html) I see a bunch of pre-built versions 
for Hadoop and MapR, but no mention of Cassandra – if I am running it in 
standalone cluster mode, does it matter which pre-built package I download? 
Would all of them work? Or do I have to build it myself from source with some 
special config for Cassandra?

Thanks!
Matt



Re: Extra stage that executes before triggering computation with an action

2015-04-29 Thread Tom Hubregtsen
Thanks for the responses.

Try removing toDebugString and see what happens. 

The toDebugString is performed after [d] (the action), as [e]. By then all
stages are already executed.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extra-stage-that-executes-before-triggering-computation-with-an-action-tp22707p22712.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Extra stage that executes before triggering computation with an action

2015-04-29 Thread Richard Marscher
I'm not sure, but I wonder whether, because you are using the Spark REPL, it
may not be representing what a normal runtime execution would look like, and
is possibly eagerly running a partial DAG once you define an operation that
would cause a shuffle.

What happens if you setup your same set of commands [a-e] in a file and use
the Spark REPL's `load` or `paste` command to load them all at once?

On Wed, Apr 29, 2015 at 2:55 PM, Tom Hubregtsen thubregt...@gmail.com
wrote:

 Thanks for the responses.

 Try removing toDebugString and see what happens. 

 The toDebugString is performed after [d] (the action), as [e]. By then all
 stages are already executed.





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Extra-stage-that-executes-before-triggering-computation-with-an-action-tp22707p22712.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Extra stage that executes before triggering computation with an action

2015-04-29 Thread Tom Hubregtsen
Hi, I am trying to see exactly what happens under the hood of Spark when
performing a simple sortByKey. So far I've already discovered the
fetch files and both the temp-shuffle and shuffle files being written to
disk, but there is still an extra stage that keeps puzzling me.

This is the code that I execute 1 by 1 in the spark-shell (I will refer to
them as [letter])
val input = sc.textFile(path, 2) [a]
val inputRDD = input.map(some lamdba to create key_value) [b]
val result = inputRDD.sortByKey(true, 2) [c]
result.saveAsTextFile [d]

As there is only one shuffle, I expect to see two stages. This is confirmed
by result.toDebugString:
(2) ShuffledRDD[5] at sortByKey at console:25 [] - [c]
 +-(2) MapPartitionsRDD[2] at map at console:23 [] - [b]
|  ./Sort/input-10-records-2-parts/ MapPartitionsRDD[1] at textFile at
console:21 [] - [a] 
|  ./Sort/input-10-records-2-parts/ HadoopRDD[0] at textFile at
console:21 [] - [a]
As there is one indentation, there should be 2 stages. There is an extra RDD
(MapPartitionsRDD[6]) that is created by [d], but is not a parent of my
result RDD, so not listed in this trace.

Now when I run these commands 1 by 1 in the spark-shell, I see the following
execution:
[a]
[b]
[c] (//no action performed yet)
INFO SparkContext: Starting job: sortByKey at console:25
INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[4] at
sortByKey at console:25), which has no missing parents
INFO DAGScheduler: ResultStage 0 (sortByKey at console:25) finished in
0.109 s
INFO DAGScheduler: Job 0 finished: sortByKey at console:25,
[d] (// Here I trigger the computation with an actual action)
INFO SparkContext: Starting job: saveAsTextFile at console:28
INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[2] at map
at console:23), which has no missing parents
INFO DAGScheduler: ShuffleMapStage 1 (map at console:23) finished
INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[6] at
saveAsTextFile at console:28)
INFO DAGScheduler: ResultStage 2 (saveAsTextFile at console:28) finished
INFO DAGScheduler: Job 1 finished: saveAsTextFile at console:28

Job 1 with stages 1 and 2 seems logical to me: it is computing everything
before and after the shuffle (wide dependency), respectively. Now what I find
interesting and puzzling is Job 0 with stage 0. It executes and finishes
before I perform an action (in [d]), and with a larger input set it can also
take a noticeable amount of time. Does anybody have any idea what is running
in this Job/stage 0?

Thanks,

Tom Hubregtsen




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extra-stage-that-executes-before-triggering-computation-with-an-action-tp22707.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Hardware provisioning for Spark SQl

2015-04-29 Thread Pietro Gentile
Hi all,

I have to estimate resource requirements for my Hadoop/Spark cluster. In
particular, I have to query about 100 TB of an HBase table to do aggregation
with Spark SQL.

What is, approximately, the most suitable cluster configuration for my use
case, in order to query the data in a fast way? In the end I have to develop
an online analytical application on this data.

I would like to know what kind of nodes I have to configure to achieve this
goal. How much RAM, and how many cores and disks, should these nodes have?

Thanks in advance,
Best regards,

Pietro


Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Bill Jay
Thanks for the suggestion. I ran the command and the limit is 1024.

Based on my understanding, the connector to Kafka should not open so many
files. Do you think there could be a socket leak? BTW, in every batch,
which is 5 seconds, I output some results to MySQL:

  def ingestToMysql(data: Array[String]) {
    val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
    var sql = "insert into loggingserver1 values "
    data.foreach(line => sql += line)
    sql = sql.dropRight(1)
    sql += ";"
    logger.info(sql)
    var conn: java.sql.Connection = null
    try {
      conn = DriverManager.getConnection(url)
      val statement = conn.createStatement()
      statement.executeUpdate(sql)
    } catch {
      case e: Exception => logger.error(e.getMessage())
    } finally {
      if (conn != null) {
        conn.close
      }
    }
  }

I am not sure whether the leak originates from the Kafka connector or the
SQL connections.

Bill

On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka in
 the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 

multiple programs compilation by sbt.

2015-04-29 Thread Dan Dong
Hi,
  Following the Quick Start guide:
https://spark.apache.org/docs/latest/quick-start.html

I could compile and run a Spark program successfully; now my question is
how to
compile multiple programs with sbt in one go. E.g., two programs such as:


./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp_A.scala
./src/main/scala/SimpleApp_B.scala

Hopefully with sbt package I will get two .jar files, one for each
source program, so I can run them separately in Spark. I tried to create
a separate .sbt file for each program, but found that only one .jar file is
created.

./simpleA.sbt
name := "Simple Project A"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"

./simpleB.sbt
name := "Simple Project B"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"

  Does anybody know how to do it?
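
One direction I am considering is a single multi-project build.sbt instead of
two .sbt files - a rough, untested sketch (it assumes the sources are moved
under simpleA/src/main/scala and simpleB/src/main/scala):

lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.10.4",
  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"
)

lazy val simpleA = (project in file("simpleA"))
  .settings(commonSettings: _*)
  .settings(name := "Simple Project A")

lazy val simpleB = (project in file("simpleB"))
  .settings(commonSettings: _*)
  .settings(name := "Simple Project B")

Then sbt package should produce one jar per sub-project. Is that the idiomatic
way, or is there something simpler?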

Cheers,
Dan


Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Ted Yu
Can you run the command 'ulimit -n' to see the current limit ?

To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
Cheers

On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka in the
 following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many
 open files, java.io.IOException: Too many open files, java.io.IOException:
 Too many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Ted Yu
Maybe add statement.close() in finally block ?

Streaming / Kafka experts may have better insight.
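
For illustration, a rough sketch of that change (reusing the names from the ingestToMysql snippet quoted below; the MySQL JDBC driver on the classpath is assumed). Closing the Statement as well as the Connection in the finally block means a failed executeUpdate cannot leave either handle open:

import java.sql.{Connection, DriverManager, Statement}

def ingestToMysql(data: Array[String]): Unit = {
  val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
  // same concatenation as the original: append the pre-formatted rows,
  // drop the trailing comma, terminate the statement
  val sql = "insert into loggingserver1 values " + data.mkString.dropRight(1) + ";"
  var conn: Connection = null
  var statement: Statement = null
  try {
    conn = DriverManager.getConnection(url)
    statement = conn.createStatement()
    statement.executeUpdate(sql)
  } catch {
    case e: Exception => println(e.getMessage)   // stand-in for logger.error
  } finally {
    if (statement != null) statement.close()     // the suggested addition
    if (conn != null) conn.close()
  }
}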

On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

 Based on my understanding, the connector to Kafka should not open so many
 files. Do you think there is possible socket leakage? BTW, in every batch
 which is 5 seconds, I output some results to mysql:

   def ingestToMysql(data: Array[String]) {
 val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
 var sql = "insert into loggingserver1 values "
 data.foreach(line => sql += line)
 sql = sql.dropRight(1)
 sql += ";"
 logger.info(sql)
 var conn: java.sql.Connection = null
 try {
   conn = DriverManager.getConnection(url)
   val statement = conn.createStatement()
   statement.executeUpdate(sql)
 } catch {
   case e: Exception => logger.error(e.getMessage())
 } finally {
   if (conn != null) {
     conn.close
   }
 }
   }

 I am not sure whether the leakage originates from Kafka connector or the
 sql connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka in
 the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at 

Re: HOw can I merge multiple DataFrame and remove duplicated key

2015-04-29 Thread ayan guha
It's no different; you would use group by and an aggregate function to do so.
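
For example, a sketch only (column names are from the question; df1 and df2 stand for DataFrames already loaded from the parquet files): union everything, take the latest timeStamp per id, and join back to recover the value.

import org.apache.spark.sql.functions.max

val all = df1.unionAll(df2)                      // repeat unionAll for each DataFrame
val latest = all.groupBy("id").agg(max("timeStamp").as("maxTs"))
val deduped = all
  .join(latest, all("id") === latest("id") && all("timeStamp") === latest("maxTs"))
  .select(all("id"), all("value"), all("timeStamp"))
// note: if two records share both the id and the max timeStamp, an extra
// de-duplication pass is still needed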
On 30 Apr 2015 02:15, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com
wrote:

   I have multiple DataFrame objects, each stored in a parquet file. Each
  DataFrame just contains 3 columns (id, value, timeStamp). I need to union
  all the DataFrame objects together, but for duplicated ids keep only the
  record with the latest timeStamp. How can I do that?



 I can do this for RDDs by sc.union() to union all the RDDs and then do a
 reduceByKey() to remove duplicated id by keeping only the one with latest
 timeStamp field. But how do I do it for DataFrame?





 Ningjun





Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Bill Jay
Hi all,

I am using the direct approach to receive real-time data from Kafka in the
following link:

https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


My code follows the word count direct example:

https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



After around 12 hours, I got the following error messages in Spark log:

15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
143033869 ms
org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many
open files, java.io.IOException: Too many open files, java.io.IOException:
Too many open files, java.io.IOException: Too many open files,
java.io.IOException: Too many open files)
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at

Re: Extra stage that executes before triggering computation with an action

2015-04-29 Thread Tom Hubregtsen
I'm not sure, but I wonder if, because you are using the Spark REPL, it may
not be representing what a normal runtime execution would look like, and is
possibly eagerly running a partial DAG once you define an operation that
would cause a shuffle.

What happens if you set up your same set of commands [a-e] in a file and use
the Spark REPL's `load` or `paste` command to load them all at once? -- From
Richard

I have also packaged it in a jar file (without [e], the debug string), and
still see the extra stage before the other two that I would expect. Even
when I remove [d], the action, I still see stage 0 being executed (and do
not see stage 1 and 2). 

Again a shortened log of the Stage 0:
INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[4] at
sortByKey, which has no missing parents
INFO DAGScheduler: ResultStage 0 (sortByKey) finished in 0.192 s




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extra-stage-that-executes-before-triggering-computation-with-an-action-tp22707p22713.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Tathagata Das
Is the function ingestToMysql running on the driver or on the executors?
Accordingly you can try debugging while running in a distributed manner,
with and without calling the function.

If you don't get too many open files without calling ingestToMysql(), the
problem is likely to be in ingestToMysql().
If you get the problem even without calling ingestToMysql(), then the
problem may be in Kafka. If the problem is occurring in the driver, then it's
the DirectKafkaInputDStream code. If the problem is occurring in the
executor, then the problem is in KafkaRDD.

TD

On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

 Maybe add statement.close() in finally block ?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

 Based on my understanding, the connector to Kafka should not open so many
 files. Do you think there is possible socket leakage? BTW, in every batch
 which is 5 seconds, I output some results to mysql:

   def ingestToMysql(data: Array[String]) {
 val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
 var sql = "insert into loggingserver1 values "
 data.foreach(line => sql += line)
 sql = sql.dropRight(1)
 sql += ";"
 logger.info(sql)
 var conn: java.sql.Connection = null
 try {
   conn = DriverManager.getConnection(url)
   val statement = conn.createStatement()
   statement.executeUpdate(sql)
 } catch {
   case e: Exception => logger.error(e.getMessage())
 } finally {
   if (conn != null) {
     conn.close
   }
 }
   }

 I am not sure whether the leakage originates from Kafka connector or the
 sql connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka in
 the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 

Re: Join between Streaming data vs Historical Data in spark

2015-04-29 Thread Tathagata Das
Have you taken a look at the join section in the streaming programming
guide?

http://spark.apache.org/docs/latest/streaming-programming-guide.html#stream-dataset-joins
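
As a rough sketch of that pattern (types and names below are illustrative, not from your code): keep the historical visit data as a cached pair RDD keyed by userId and join each micro-batch of transactions against it.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

def attribute(transactions: DStream[(String, Double)],   // userId -> total price
              visits: RDD[(String, String)]              // userId -> visit source
             ): DStream[(String, (Double, String))] = {
  visits.cache()                                          // reused on every batch
  transactions.transform(rdd => rdd.join(visits))
}

Pre-partitioning and caching the visits RDD can help, since then only the small per-batch side needs to be shuffled for the join.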

On Wed, Apr 29, 2015 at 7:11 AM, Rendy Bambang Junior 
rendy.b.jun...@gmail.com wrote:

 Let say I have transaction data and visit data

 visit
 | userId | Visit source | Timestamp |
 | A  | google ads   | 1 |
 | A  | facebook ads | 2 |

 transaction
 | userId | total price | timestamp |
 | A  | 100 | 248384|
 | B  | 200 | 43298739  |

 I want to join transaction data and visit data to do sales attribution. I
 want to do it realtime whenever transaction occurs (streaming).

 Is it scalable to do join between one data and very big historical data
 using join function in spark? If it is not, then how it usually be done?

 Visit needs to be historical, since visit can be anytime before
 transaction (e.g. visit is one year before transaction occurs)

 Rendy



Kryo serialization of classes in additional jars

2015-04-29 Thread Akshat Aranya
Hi,

Is it possible to register kryo serialization for classes contained in jars
that are added with spark.jars?  In my experiment it doesn't seem to
work, likely because the class registration happens before the jar is
shipped to the executor and added to the classloader.  Here's the general
idea of what I want to do:

   val sparkConf = new SparkConf(true)
  .set("spark.jars", "foo.jar")
  .setAppName("foo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// register classes contained in foo.jar
sparkConf.registerKryoClasses(Array(
  classOf[com.foo.Foo],
  classOf[com.foo.Bar]))


Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Bill Jay
This function is called in foreachRDD. I think it should be running in the
executors. I added the statement.close() call in the code and it is running. I
will let you know if this fixes the issue.



On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com wrote:

 Is the function ingestToMysql running on the driver or on the executors?
 Accordingly you can try debugging while running in a distributed manner,
 with and without calling the function.

 If you don't get too many open files without calling ingestToMysql(), the
 problem is likely to be in ingestToMysql().
 If you get the problem even without calling ingestToMysql(), then the
 problem may be in Kafka. If the problem is occurring in the driver, then it's
 the DirectKafkaInputDStream code. If the problem is occurring in the
 executor, then the problem is in KafkaRDD.

 TD

 On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

 Maybe add statement.close() in finally block ?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

 Based on my understanding, the connector to Kafka should not open so
 many files. Do you think there is possible socket leakage? BTW, in every
 batch which is 5 seconds, I output some results to mysql:

   def ingestToMysql(data: Array[String]) {
 val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
 var sql = "insert into loggingserver1 values "
 data.foreach(line => sql += line)
 sql = sql.dropRight(1)
 sql += ";"
 logger.info(sql)
 var conn: java.sql.Connection = null
 try {
   conn = DriverManager.getConnection(url)
   val statement = conn.createStatement()
   statement.executeUpdate(sql)
 } catch {
   case e: Exception => logger.error(e.getMessage())
 } finally {
   if (conn != null) {
     conn.close
   }
 }
   }

 I am not sure whether the leakage originates from Kafka connector or the
 sql connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit
 */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka in
 the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 

Re: Compute pairwise distance

2015-04-29 Thread ayan guha
This is my first thought, please suggest any further improvement:
1. Create an RDD of your dataset
2. Do a cross join to generate the pairs
3. Compute the distance for each pair (e.g. in a map or reduceByKey). You will
get an RDD of key pairs and distances
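
A minimal sketch of that idea (assumptions: the points are kept as an (id, feature-array) pair RDD, Euclidean distance is used as a stand-in, and a plain map after the cartesian replaces the reduceByKey):

import org.apache.spark.rdd.RDD

def pairwiseDistances(points: RDD[(Long, Array[Double])]): RDD[((Long, Long), Double)] = {
  points.cartesian(points)
    .filter { case ((i, _), (j, _)) => i < j }   // keep each unordered pair once
    .map { case ((i, a), (j, b)) =>
      val d = math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)
      ((i, j), d)
    }
}

The i < j filter is where the (n^2)/2 saving for symmetric measures comes from.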

Best
Ayan
On 30 Apr 2015 06:11, Driesprong, Fokko fo...@driesprong.frl wrote:

 Dear Sparkers,

 I am working on an algorithm which requires the pair distance between all
 points (e.g. DBSCAN, LOF, etc.). Computing this for *n* points will
 produce an n^2 matrix. If the distance measure is symmetrical, this
 can be reduced to (n^2)/2. What would be the most optimal way of computing
 this?

 The paper *Pairwise Element Computation with MapReduce
 https://www.cs.ucsb.edu/~ravenben/classes/290F/papers/kvl10.pdf* paper
 describes different approaches to optimize this process within a map-reduce
 model. Although I don't believe this is applicable to Spark. How would you
 guys approach this?

  I first thought about broadcasting the original points to all the workers,
  and then computing the distances across the different workers, although this
  requires all the points to be distributed across all the machines. But this
  feels rather brute-force; what do you guys think?

 I don't expect full solutions, but some pointers would be great. I think a
 good solution can be re-used for many algorithms.

 Kind regards,
 Fokko Driesprong



Re: multiple programs compilation by sbt.

2015-04-29 Thread Dan Dong
HI, Ted,
  I will have a look at it , thanks a lot.

  Cheers,
  Dan
 On Apr 29, 2015, at 5:00 PM, Ted Yu yuzhih...@gmail.com wrote:

 Have you looked at
 http://www.scala-sbt.org/0.13/tutorial/Multi-Project.html ?

 Cheers

 On Wed, Apr 29, 2015 at 2:45 PM, Dan Dong dongda...@gmail.com wrote:

 Hi,
   Following the Quick Start guide:
 https://spark.apache.org/docs/latest/quick-start.html

 I could compile and run a Spark program successfully, now my question is
 how to
 compile multiple programs with sbt in a bunch. E.g, two programs as:


 ./src
 ./src/main
 ./src/main/scala
 ./src/main/scala/SimpleApp_A.scala
 ./src/main/scala/SimpleApp_B.scala

 Hopefully with sbt package, I will get two .jar files for each of the
 source program, then I can run them separately in Spark. I tried to create
 two .sbt files for each program, but found only one .jar file is created.

 ./simpleA.sbt
 name := "Simple Project A"
 version := "1.0"
 scalaVersion := "2.10.4"
 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"

 ./simpleB.sbt
 name := "Simple Project B"
 version := "1.0"
 scalaVersion := "2.10.4"
 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"

   Does anybody know how to do it?

 Cheers,
 Dan





Re: implicit function in SparkStreaming

2015-04-29 Thread Tathagata Das
I believe that the implicit def is pulling in the enclosing class (in which
the def is defined) in the closure which is not serializable.


On Wed, Apr 29, 2015 at 4:20 AM, guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hk wrote:

 Hi guys,
  I'm puzzled why I can't use the implicit function in Spark Streaming; it
  causes "Task not serializable".

 code snippet:

  implicit final def str2KeyValue(s: String): (String, String) = {
    val message = s.split("\\|")
    if (message.length >= 2)
      (message(0), message(1))
    else if (message.length == 1) {
      (message(0), "")
    }
    else
      ("", "")
  }

  def filter(stream: DStream[String]): DStream[String] = {
    stream.filter(s => {
      (s._1 == "Action" && s._2 == "TRUE")
    })
  }


 Could you please give me some pointers ? Thank you .

 --
 guoqing0...@yahoo.com.hk



Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Tathagata Das
Also cc;ing Cody.

@Cody maybe there is a reason for doing connection pooling even if there is
no performance difference.

TD

On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com wrote:

 Is the function ingestToMysql running on the driver or on the executors?
 Accordingly you can try debugging while running in a distributed manner,
 with and without calling the function.

  If you don't get too many open files without calling ingestToMysql(), the
  problem is likely to be in ingestToMysql().
  If you get the problem even without calling ingestToMysql(), then the
  problem may be in Kafka. If the problem is occurring in the driver, then it's
  the DirectKafkaInputDStream code. If the problem is occurring in the
  executor, then the problem is in KafkaRDD.

 TD

 On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

 Maybe add statement.close() in finally block ?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

 Based on my understanding, the connector to Kafka should not open so
 many files. Do you think there is possible socket leakage? BTW, in every
 batch which is 5 seconds, I output some results to mysql:

    def ingestToMysql(data: Array[String]) {
  val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
  var sql = "insert into loggingserver1 values "
  data.foreach(line => sql += line)
  sql = sql.dropRight(1)
  sql += ";"
  logger.info(sql)
  var conn: java.sql.Connection = null
  try {
    conn = DriverManager.getConnection(url)
    val statement = conn.createStatement()
    statement.executeUpdate(sql)
  } catch {
    case e: Exception => logger.error(e.getMessage())
  } finally {
    if (conn != null) {
      conn.close
    }
  }
    }

 I am not sure whether the leakage originates from Kafka connector or the
 sql connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit
 */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka in
 the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 

Re: Driver memory leak?

2015-04-29 Thread Tathagata Das
It could be related to this.
https://issues.apache.org/jira/browse/SPARK-6737

This was fixed in Spark 1.3.1.



On Wed, Apr 29, 2015 at 8:38 AM, Sean Owen so...@cloudera.com wrote:

 Not sure what you mean. It's already in CDH since 5.4 = 1.3.0
 (This isn't the place to ask about CDH)
 I also don't think that's the problem. The process did not run out of
 memory.

 On Wed, Apr 29, 2015 at 2:08 PM, Serega Sheypak serega.shey...@gmail.com
 wrote:

 The memory leak could be related to this
 https://issues.apache.org/jira/browse/SPARK-5967 defect that was
 resolved in Spark 1.2.2 and 1.3.0.
 @Sean
 Will it be backported to CDH? I did't find that bug in CDH 5.4 release
 notes.

 2015-04-29 14:51 GMT+02:00 Conor Fennell conor.fenn...@altocloud.com:

 The memory leak could be related to this
 https://issues.apache.org/jira/browse/SPARK-5967 defect that was
 resolved in Spark 1.2.2 and 1.3.0.

 It also was a HashMap causing the issue.

 -Conor



 On Wed, Apr 29, 2015 at 12:01 PM, Sean Owen so...@cloudera.com wrote:

 Please use user@, not dev@

 This message does not appear to be from your driver. It also doesn't
 say you ran out of memory. It says you didn't tell YARN to let it use the
 memory you want. Look at the memory overhead param and please search first
 for related discussions.
 On Apr 29, 2015 11:43 AM, wyphao.2007 wyphao.2...@163.com wrote:

  Hi, dear developers, I am using Spark Streaming to read data from Kafka.
  The program had already run for about 120 hours, but today it failed
  because of the driver's OOM, as follows:

 Container
 [pid=49133,containerID=container_1429773909253_0050_02_01] is running
 beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB physical
 memory used; 3.2 GB of 50 GB virtual memory used. Killing container.

  I set --driver-memory to 2g. In my mind, the driver is responsible for job
  scheduling and job monitoring (please correct me if I'm wrong), so why is it
  using so much memory?

  So I used jmap to monitor another program (already running for about 48
  hours): sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256. The
  result is as follows: the java.util.HashMap$Entry and java.lang.Long
  objects use about 600MB of memory!

  I also used jmap to monitor another program (running for about 1 hour), and
  there the java.util.HashMap$Entry and java.lang.Long objects don't use so
  much memory. But I found that, as time goes by, the java.util.HashMap$Entry
  and java.lang.Long objects occupy more and more memory.
  Is this a driver memory leak, or is there another reason?

 Thanks
 Best Regards



Re: multiple programs compilation by sbt.

2015-04-29 Thread Ted Yu
Have you looked at http://www.scala-sbt.org/0.13/tutorial/Multi-Project.html
?
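
For example, a single multi-project build.sbt along those lines (a sketch; it assumes each program is moved into its own sub-directory):

lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.10.4",
  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1")

lazy val simpleA = (project in file("simpleA"))   // holds SimpleApp_A.scala
  .settings(commonSettings: _*)
  .settings(name := "Simple Project A")

lazy val simpleB = (project in file("simpleB"))   // holds SimpleApp_B.scala
  .settings(commonSettings: _*)
  .settings(name := "Simple Project B")

Running sbt package then produces one jar per sub-project.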

Cheers

On Wed, Apr 29, 2015 at 2:45 PM, Dan Dong dongda...@gmail.com wrote:

 Hi,
   Following the Quick Start guide:
 https://spark.apache.org/docs/latest/quick-start.html

 I could compile and run a Spark program successfully, now my question is
 how to
 compile multiple programs with sbt in a bunch. E.g, two programs as:


 ./src
 ./src/main
 ./src/main/scala
 ./src/main/scala/SimpleApp_A.scala
 ./src/main/scala/SimpleApp_B.scala

 Hopefully with sbt package, I will get two .jar files for each of the
 source program, then I can run them separately in Spark. I tried to create
 two .sbt files for each program, but found only one .jar file is created.

 ./simpleA.sbt
 name := "Simple Project A"
 version := "1.0"
 scalaVersion := "2.10.4"
 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"

 ./simpleB.sbt
 name := "Simple Project B"
 version := "1.0"
 scalaVersion := "2.10.4"
 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"

   Does anybody know how to do it?

 Cheers,
 Dan




Re: How to stream all data out of a Kafka topic once, then terminate job?

2015-04-29 Thread Dmitry Goldenberg
Part of the issue is that when you read messages in a topic, the messages are
peeked, not polled, so there will be no "the queue is empty" condition, as I
understand it.

So it would seem I'd want to do KafkaUtils.createRDD, which takes an array
of OffsetRange's. Each OffsetRange is characterized by topic, partition,
fromOffset, and untilOffset. In my case, I want to read all data, i.e. from
all partitions and I don't know how many partitions there may be, nor do I
know the 'untilOffset' values.

In essence, I just want something like createRDD(new OffsetRangeAllData());

In addition, I'd ideally want the option of not peeking but polling the
messages off the topics involved.  But I'm not sure whether Kafka API's
support it and then whether Spark does/will support that as well...



On Wed, Apr 29, 2015 at 1:52 AM, ayan guha guha.a...@gmail.com wrote:

 I guess what you mean is not streaming.  If you create a stream context at
 time t, you will receive data coming through starting time t++, not before
 time t.

 Looks like you want a queue. Let Kafka write to a queue, consume msgs from
 the queue and stop when queue is empty.
 On 29 Apr 2015 14:35, dgoldenberg dgoldenberg...@gmail.com wrote:

 Hi,

 I'm wondering about the use-case where you're not doing continuous,
 incremental streaming of data out of Kafka but rather want to publish data
 once with your Producer(s) and consume it once, in your Consumer, then
 terminate the consumer Spark job.

 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
 Durations.milliseconds(...));

 The batchDuration parameter is The time interval at which streaming data
 will be divided into batches. Can this be worked somehow to cause Spark
 Streaming to just get all the available data, then let all the RDD's
 within
 the Kafka discretized stream get processed, and then just be done and
 terminate, rather than wait another period and try and process any more
 data
 from Kafka?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-stream-all-data-out-of-a-Kafka-topic-once-then-terminate-job-tp22698.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: HBase HTable constructor hangs

2015-04-29 Thread Ted Yu
Can you verify whether the hbase release you're using has the following fix
?
HBASE-8 non environment variable solution for IllegalAccessError

Cheers

On Tue, Apr 28, 2015 at 10:47 PM, Tridib Samanta tridib.sama...@live.com
wrote:

 I turned on the TRACE and I see lot of following exception:

 java.lang.IllegalAccessError: com/google/protobuf/ZeroCopyLiteralByteString
  at
 org.apache.hadoop.hbase.protobuf.RequestConverter.buildRegionSpecifier(RequestConverter.java:897)
  at
 org.apache.hadoop.hbase.protobuf.RequestConverter.buildGetRowOrBeforeRequest(RequestConverter.java:131)
  at
 org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1402)
  at org.apache.hadoop.hbase.client.HTable$2.call(HTable.java:701)
  at org.apache.hadoop.hbase.client.HTable$2.call(HTable.java:699)
  at
 org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:120)
  at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:705)
  at
 org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144)
  at
 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1102)
  at
 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1162)
  at
 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054)
  at
 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011)
  at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326)
  at org.apache.hadoop.hbase.client.HTable.init(HTable.java:192)

 Thanks
 Tridib

 --
 From: d...@ocirs.com
 Date: Tue, 28 Apr 2015 22:24:39 -0700
 Subject: Re: HBase HTable constructor hangs
 To: tridib.sama...@live.com

 In that case, something else is failing and the reason HBase looks like it
 hangs is that the hbase timeout or retry count is too high.

 Try setting the following conf and hbase will only hang for a few mins max
 and return a helpful error message.

 hbaseConf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2)




 --
 Dean Chen

 On Tue, Apr 28, 2015 at 10:18 PM, Tridib Samanta tridib.sama...@live.com
 wrote:

 Nope, my hbase is unsecured.

 --
 From: d...@ocirs.com
 Date: Tue, 28 Apr 2015 22:09:51 -0700
 Subject: Re: HBase HTable constructor hangs
 To: tridib.sama...@live.com


 Hi Tridib,

 Are you running this on a secure Hadoop/HBase cluster? I ran in to a
 similar issue where the HBase client can successfully connect in local mode
 and in the yarn-client driver but not on remote executors. The problem is
 that Spark doesn't distribute the hbase auth key, see the following Jira
 ticket and PR.

 https://issues.apache.org/jira/browse/SPARK-6918


 --
 Dean Chen

 On Tue, Apr 28, 2015 at 9:34 PM, Tridib Samanta tridib.sama...@live.com
 wrote:

 I am 100% sure how it's picking up the configuration. I copied the
 hbase-site.xml in hdfs/spark cluster (single machine). I also included
 hbase-site.xml in spark-job jar files. spark-job jar file also have
 yarn-site and mapred-site and core-site.xml in it.

 One interesting thing is, when I run the spark-job jar as standalone and
 execute the HBase client from a main method, it works fine. Same client
 unable to connect/hangs when the jar is distributed in spark.

 Thanks
 Tridib

 --
 Date: Tue, 28 Apr 2015 21:25:41 -0700

 Subject: Re: HBase HTable constructor hangs
 From: yuzhih...@gmail.com
 To: tridib.sama...@live.com
 CC: user@spark.apache.org

 How did you distribute hbase-site.xml to the nodes ?

 Looks like HConnectionManager couldn't find the hbase:meta server.

 Cheers

 On Tue, Apr 28, 2015 at 9:19 PM, Tridib Samanta tridib.sama...@live.com
 wrote:

 I am using Spark 1.2.0 and HBase 0.98.1-cdh5.1.0.

 Here is the jstack trace. Complete stack trace attached.

 Executor task launch worker-1 #58 daemon prio=5 os_prio=0
 tid=0x7fd3d0445000 nid=0x488 waiting on condition [0x7fd4507d9000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
  at java.lang.Thread.sleep(Native Method)
  at
 org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:152)
  - locked 0xf8cb7258 (a
 org.apache.hadoop.hbase.client.RpcRetryingCaller)
  at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:705)
  at
 org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144)
  at
 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1102)
  at
 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1162)
  - locked 0xf84ac0b0 (a java.lang.Object)
  at
 

Join between Streaming data vs Historical Data in spark

2015-04-29 Thread Rendy Bambang Junior
Let say I have transaction data and visit data

visit
| userId | Visit source | Timestamp |
| A  | google ads   | 1 |
| A  | facebook ads | 2 |

transaction
| userId | total price | timestamp |
| A  | 100 | 248384|
| B  | 200 | 43298739  |

I want to join transaction data and visit data to do sales attribution. I
want to do it realtime whenever transaction occurs (streaming).

Is it scalable to do join between one data and very big historical data
using join function in spark? If it is not, then how it usually be done?

Visit needs to be historical, since visit can be anytime before transaction
(e.g. visit is one year before transaction occurs)

Rendy


Re: How to stream all data out of a Kafka topic once, then terminate job?

2015-04-29 Thread Cody Koeninger
The idea of peek vs poll doesn't apply to kafka, because kafka is not a
queue.

There are two ways of doing what you want, either using KafkaRDD or a
direct stream

The Kafka rdd approach would require you to find the beginning and ending
offsets for each partition.  For an example of this, see
getEarliestLeaderOffsets and getLatestLeaderOffsets in
https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala

For usage examples see the tests.  That code isn't public so you'd need to
either duplicate it, or build a version of spark with all of the
'private[blah]' restrictions removed.

The direct stream approach would require setting the kafka parameter
auto.offset.reset to smallest, in order to start at the beginning.  If you
haven't set any rate limiting parameters, then the first batch will contain
all the messages.  You can then kill the job after the first batch.  It's
possible you may be able to kill the job from a
StreamingListener.onBatchCompleted, but I've never tried and don't know
what the consequences may be.
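
To make the KafkaRDD route concrete, a sketch (the broker, topic, partitions and offsets are placeholders; in practice the offsets would come from the leader-offset lookups mentioned above, and sc is an existing SparkContext):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
// one OffsetRange per topic partition: (topic, partition, fromOffset, untilOffset)
val offsetRanges = Array(
  OffsetRange("my-topic", 0, 0L, 1000L),
  OffsetRange("my-topic", 1, 0L, 1000L))

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)
rdd.map(_._2).foreach(println)   // process each message once; the job then just ends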

On Wed, Apr 29, 2015 at 8:52 AM, Dmitry Goldenberg dgoldenberg...@gmail.com
 wrote:

  Part of the issue is that when you read messages in a topic, the messages are
  peeked, not polled, so there will be no "the queue is empty" condition, as I
  understand it.

 So it would seem I'd want to do KafkaUtils.createRDD, which takes an array
 of OffsetRange's. Each OffsetRange is characterized by topic, partition,
 fromOffset, and untilOffset. In my case, I want to read all data, i.e. from
 all partitions and I don't know how many partitions there may be, nor do I
 know the 'untilOffset' values.

 In essence, I just want something like createRDD(new OffsetRangeAllData());

 In addition, I'd ideally want the option of not peeking but polling the
 messages off the topics involved.  But I'm not sure whether Kafka API's
 support it and then whether Spark does/will support that as well...



 On Wed, Apr 29, 2015 at 1:52 AM, ayan guha guha.a...@gmail.com wrote:

 I guess what you mean is not streaming.  If you create a stream context
 at time t, you will receive data coming through starting time t++, not
 before time t.

 Looks like you want a queue. Let Kafka write to a queue, consume msgs
 from the queue and stop when queue is empty.
 On 29 Apr 2015 14:35, dgoldenberg dgoldenberg...@gmail.com wrote:

 Hi,

 I'm wondering about the use-case where you're not doing continuous,
 incremental streaming of data out of Kafka but rather want to publish
 data
 once with your Producer(s) and consume it once, in your Consumer, then
 terminate the consumer Spark job.

 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
 Durations.milliseconds(...));

 The batchDuration parameter is The time interval at which streaming data
 will be divided into batches. Can this be worked somehow to cause Spark
 Streaming to just get all the available data, then let all the RDD's
 within
 the Kafka discretized stream get processed, and then just be done and
 terminate, rather than wait another period and try and process any more
 data
 from Kafka?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-stream-all-data-out-of-a-Kafka-topic-once-then-terminate-job-tp22698.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: How to group multiple row data ?

2015-04-29 Thread ayan guha
looks like you need this:

from pyspark.sql import Row

lst = [[10001, 132, 2002, 1, "2012-11-23"],
       [10001, 132, 2002, 1, "2012-11-24"],
       [10031, 102, 223, 2, "2012-11-24"],
       [10001, 132, 2002, 2, "2012-11-25"],
       [10001, 132, 2002, 3, "2012-11-26"]]
base = sc.parallelize(lst, 1).map(lambda x:
    Row(idx=x[0], num=x[1], yr=x[2], ev=x[3], dt=int(x[4].replace("-", ""))))
baseDF = ssc.createDataFrame(base)
print baseDF.printSchema()

def rowtoarr(r):
    return (r.idx, r.num, r.yr, r.dt), r.ev

baseDF.registerTempTable("base")
trm = ssc.sql("select a.idx,a.num,a.yr,b.ev,a.dt from base a inner join "
              "base b on a.idx=b.idx and a.num=b.num and a.yr=b.yr "
              "where a.dt >= b.dt order by a.idx,a.num,a.yr,a.dt")
trmRDD = trm.map(rowtoarr).reduceByKey(lambda x, y: str(x) + "," + str(y))
for i in trmRDD.collect():
    print i

((10031, 102, 223, 20121124), 2)
((10001, 132, 2002, 20121123), 1)
((10001, 132, 2002, 20121125), '1,1,2')
((10001, 132, 2002, 20121124), '1,1')
((10001, 132, 2002, 20121126), '1,1,2,3')

On Wed, Apr 29, 2015 at 10:34 PM, Manoj Awasthi awasthi.ma...@gmail.com
wrote:

 Sorry but I didn't fully understand the grouping. This line:

  The group must only take the closest previous trigger. The first one
 hence shows alone.

 Can you please explain further?


 On Wed, Apr 29, 2015 at 4:42 PM, bipin bipin@gmail.com wrote:

  Hi, I have a ddf with schema (CustomerID, SupplierID, ProductID, Event,
  CreatedOn); the first 3 are Long ints, Event can only be 1, 2 or 3, and
  CreatedOn is a timestamp. How can I make a group triplet/doublet/singlet out
  of them such that I can infer that a customer registered events from 1 to 2
  and, if present, to 3 time-wise, while preserving the number of entries? For e.g.

 Before processing:
 10001, 132, 2002, 1, 2012-11-23
 10001, 132, 2002, 1, 2012-11-24
 10031, 102, 223, 2, 2012-11-24
 10001, 132, 2002, 2, 2012-11-25
 10001, 132, 2002, 3, 2012-11-26
 (total 5 rows)

 After processing:
 10001, 132, 2002, 2012-11-23, 1
 10031, 102, 223, 2012-11-24, 2
 10001, 132, 2002, 2012-11-24, 1,2,3
 (total 5 in last field - comma separated!)

  The group must only take the closest previous trigger; the first one hence
  shows alone. Can this be done using Spark SQL? If it needs to be processed
  functionally in Scala, how do I do that? I can't wrap my head around this.
  Can anyone help?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-group-multiple-row-data-tp22701.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
Best Regards,
Ayan Guha


RE: How to group multiple row data ?

2015-04-29 Thread Silvio Fiorito
I think you'd probably want to look at combineByKey. I'm on my phone so can't 
give you an example, but that's one solution i would try.

You would then take the resulting RDD and go back to a DF if needed.
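
A minimal sketch of that pattern, keying the example rows by (CustomerID, SupplierID, ProductID) and collecting the (CreatedOn, Event) pairs per key; the closest-previous-trigger grouping would then be applied inside each collected list:

val rows = sc.parallelize(Seq(
  (10001L, 132L, 2002L, 1, "2012-11-23"),
  (10001L, 132L, 2002L, 1, "2012-11-24"),
  (10031L, 102L, 223L, 2, "2012-11-24"),
  (10001L, 132L, 2002L, 2, "2012-11-25"),
  (10001L, 132L, 2002L, 3, "2012-11-26")))

val grouped = rows
  .map { case (c, s, p, ev, dt) => ((c, s, p), (dt, ev)) }
  .combineByKey(
    (v: (String, Int)) => List(v),                               // create combiner
    (acc: List[(String, Int)], v: (String, Int)) => v :: acc,    // merge a value
    (a: List[(String, Int)], b: List[(String, Int)]) => a ::: b) // merge combiners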

From: bipinmailto:bipin@gmail.com
Sent: ‎4/‎29/‎2015 4:13 AM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: How to group multiple row data ?

Hi, I have a ddf with schema (CustomerID, SupplierID, ProductID, Event,
CreatedOn); the first 3 are Long ints, Event can only be 1, 2 or 3, and
CreatedOn is a timestamp. How can I make a group triplet/doublet/singlet out
of them such that I can infer that a customer registered events from 1 to 2
and, if present, to 3 time-wise, while preserving the number of entries? For e.g.

Before processing:
10001, 132, 2002, 1, 2012-11-23
10001, 132, 2002, 1, 2012-11-24
10031, 102, 223, 2, 2012-11-24
10001, 132, 2002, 2, 2012-11-25
10001, 132, 2002, 3, 2012-11-26
(total 5 rows)

After processing:
10001, 132, 2002, 2012-11-23, 1
10031, 102, 223, 2012-11-24, 2
10001, 132, 2002, 2012-11-24, 1,2,3
(total 5 in last field - comma separated!)

The group must only take the closest previous trigger; the first one hence
shows alone. Can this be done using Spark SQL? If it needs to be processed
functionally in Scala, how do I do that? I can't wrap my head around this. Can
anyone help?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-group-multiple-row-data-tp22701.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Re: solr in spark

2015-04-29 Thread Costin Leau

# disclaimer I'm an employee of Elastic (the company behind Elasticsearch) and 
lead of Elasticsearch Hadoop integration

Some things to clarify on the Elasticsearch side:

1. Elasticsearch is a distributed, real-time search and analytics engine. Search is just one aspect of it and it can 
work with any type of data (whether it's text, image encoding, etc...): Github, Wikipedia, Stackoverflow are popular 
examples of known websites that are powered by Elasticsearch. In fact you can find plenty of use cases and information 
about this on the website [1].


2. Elasticsearch is stand-alone and can be run on the same or separate machines as other services. In fact, on the 
_same_ machine, one can run _multiple_ Elasticsearch nodes (and thus clusters). For best performance, having dedicated 
hardware (as Nick suggested) works best.


3. The Elasticsearch Spark integration has been available for over a year through Map/Reduce and the native (Scala and 
Java) API since q3 last year. There are plenty of features available which are fully documented here [2]. Better yet, 
there's a talk by yours truly from Spark Summit East [3] that is fully focused on exactly this topic.


4. elasticsearch-hadoop is certified by Databricks, Cloudera, Hortonworks and MapR and supports both Spark core and 
Spark SQL 1.0-1.3. There are binaries for Scala 2.10 and 2.11. And for what it's worth, it provided one of the first (if 
not the first) implementations of the DataSource API outside Databricks, which means not only using Elasticsearch in a 
declarative fashion but also having push-down support for operators.


Hopefully these materials will get you started with Spark and Elasticsearch and also clarify some of the misconceptions 
about Elasticsearch.


Cheers,

[1] https://www.elastic.co/products/elasticsearch
[2] http://www.elastic.co/guide/en/elasticsearch/hadoop/master/reference.html
[3] 
http://spark-summit.org/east/2015/talk/using-spark-and-elasticsearch-for-real-time-data-analysis


On 4/28/15 8:16 PM, Nick Pentreath wrote:

Depends on your use case and search volume. Typically you'd have a dedicated ES 
cluster if your app is doing a lot of
real time indexing and search.

If it's only for spark integration then you could colocate ES and spark

—
Sent from Mailbox https://www.dropbox.com/mailbox


On Tue, Apr 28, 2015 at 6:41 PM, Jeetendra Gangele gangele...@gmail.com 
mailto:gangele...@gmail.com wrote:

Thanks for reply.

    Will the Elasticsearch index be within my cluster, or do I need to host
    Elasticsearch separately?


On 28 April 2015 at 22:03, Nick Pentreath nick.pentre...@gmail.com 
mailto:nick.pentre...@gmail.com wrote:

I haven't used Solr for a long time, and haven't used Solr in Spark.

However, why do you say Elasticsearch is not a good option ...? ES 
absolutely supports full-text search and
        not just filtering and grouping (in fact its original purpose was and 
still is text search, though filtering,
grouping and aggregation are heavily used).

http://www.elastic.co/guide/en/elasticsearch/guide/master/full-text-search.html



On Tue, Apr 28, 2015 at 6:27 PM, Jeetendra Gangele gangele...@gmail.com 
mailto:gangele...@gmail.com wrote:

Does anyone tried using solr inside spark?
below is the project describing it.
https://github.com/LucidWorks/spark-solr.

            I have a requirement in which I want to index 20 million company
            names and then search as and when new data comes in. The output
            should be a list of companies matching the query.

            Spark has inbuilt Elasticsearch support, but for this purpose
            Elasticsearch is not a good option since this is a totally
            text-search problem?

            Elasticsearch is good for filtering and grouping.

Does any body used solr inside spark?

Regards
jeetendra






--
Costin


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Dataframe filter based on another Dataframe

2015-04-29 Thread Olivier Girardot
You mean after joining? Sure, my question was more whether there is any best
practice preferable to joining the other dataframe for filtering.

Regards,

Olivier.
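
For what it's worth, two ways to avoid the duplicated column (a sketch; "id" as the join key is taken from the example, and whether the "leftsemi" join type is accepted depends on the Spark version at hand):

// Option 1: keep only df1's columns after the inner join
val filtered = df1.join(df2, df1("id") === df2("id"), "inner")
  .select(df1("id"), df1("someOtherCol"))        // someOtherCol is a placeholder

// Option 2: a left-semi join filters df1 by ids present in df2 and
// returns only df1's columns (assumes the version supports "leftsemi")
val filtered2 = df1.join(df2, df1("id") === df2("id"), "leftsemi")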

On Wed, Apr 29, 2015 at 13:23, Olivier Girardot ssab...@gmail.com wrote:

 Hi everyone,
 what is the most efficient way to filter a DataFrame on a column from
 another DataFrame's column? The best idea I had was to join the two
 DataFrames:

  val df1: DataFrame
  val df2: DataFrame
  df1.join(df2, df1("id") === df2("id"), "inner")

 But I end up (obviously) with the id column twice.
 Another approach would be to filter df1, but I can't seem to get this to
 work using df2's column as a base.

 Any idea ?

 Regards,

 Olivier.



Re: Re: implicit function in SparkStreaming

2015-04-29 Thread Tathagata Das
Could you put the implicit def in an object? That should work, as objects
are never serialized.
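
As a minimal sketch of that suggestion (object and method names are illustrative, not from the original code):

import org.apache.spark.streaming.dstream.DStream

// Keeping the implicit conversion in a standalone object avoids pulling the
// enclosing (non-serializable) class into the task closure.
object Conversions {
  implicit def str2KeyValue(s: String): (String, String) = {
    val message = s.split("\\|")
    if (message.length >= 2) (message(0), message(1))
    else if (message.length == 1) (message(0), "")
    else ("", "")
  }
}

object StreamFilters {
  import Conversions._

  def filter(stream: DStream[String]): DStream[String] =
    stream.filter(s => s._1 == "Action" && s._2 == "TRUE")
}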


On Wed, Apr 29, 2015 at 6:28 PM, guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hk wrote:

 Thank you for your pointers, it's very helpful to me. In this scenario,
 how can I use the implicit def in the enclosing class?



 *From:* Tathagata Das t...@databricks.com
 *Date:* 2015-04-30 07:00
 *To:* guoqing0...@yahoo.com.hk
 *CC:* user user@spark.apache.org
 *Subject:* Re: implicit function in SparkStreaming
 I believe that the implicit def is pulling in the enclosing class (in
 which the def is defined) in the closure which is not serializable.


 On Wed, Apr 29, 2015 at 4:20 AM, guoqing0...@yahoo.com.hk 
 guoqing0...@yahoo.com.hk wrote:

 Hi guys,
 I'm puzzled why I can't use the implicit function in Spark Streaming; it
 causes "Task not serializable".

 code snippet:

 implicit final def str2KeyValue(s: String): (String, String) = {
   val message = s.split("\\|")
   if (message.length >= 2)
     (message(0), message(1))
   else if (message.length == 1)
     (message(0), "")
   else
     ("", "")
 }

 def filter(stream: DStream[String]): DStream[String] = {
   stream.filter(s => {
     s._1 == "Action" && s._2 == "TRUE"
   })
 }


 Could you please give me some pointers? Thank you.

 --
 guoqing0...@yahoo.com.hk





Re: [Spark SQL] Problems creating a table in specified schema/database

2015-04-29 Thread Michael Armbrust
No, sorry this is not supported.  Support for more than one database is
lacking in several areas (though mostly works for hive tables).  I'd like
to fix this in Spark 1.5.

On Tue, Apr 28, 2015 at 1:54 AM, James Aley james.a...@swiftkey.com wrote:

 Hey all,

 I'm trying to create tables from existing Parquet data in different
 schemata. The following isn't working for me:

 CREATE DATABASE foo;

 CREATE TABLE foo.bar
 USING com.databricks.spark.avro
 OPTIONS (path '...');

 -- Error: org.apache.spark.sql.AnalysisException: cannot recognize input
 near 'USING' 'com' '.' in table name; line 1 pos 13 (state=,code=0)

 I also tried

 USE foo;
 CREATE TABLE bar
 USING com.databricks.spark.avro
 OPTIONS (path '...');

 -- Creates the table successfully, but in the default.* schema.


 This is on Spark 1.3.1, running on YARN, Hive 0.13.1. Any suggestions?
 Should this work?


 James.



RE: Sort (order by) of the big dataset

2015-04-29 Thread Ulanov, Alexander
After a day of debugging (actually, more), I can answer my own question:
The problem is that the default value 200 of spark.sql.shuffle.partitions is 
too small for sorting 2B rows. It was hard to realize because Spark executors 
just crash with various exceptions one by one. The other takeaway is that 
DataFrame "order by" and RDD.sortBy are implemented in different ways. BTW, 
why?

Small synthetic test (copied from my blog):
Create 2B rows of MyRecord within 2000 partitions, so each partition will have 
1M rows.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class MyRecord(time: Double, id: String)
val rdd = sc.parallelize(1 to 2000, 2000).flatMap(x =>
  Seq.fill(1000000)(MyRecord(util.Random.nextDouble, "xxx")))

Let's sort this RDD by time:
val sorted = rdd.sortBy(x => x.time)
sorted.count

It finished in about 8 minutes on my cluster of 8 nodes. Everything's fine. You 
can also check tasks that were completed in Spark web UI. The number of 
reducers was equal to the number of partitions, i.e. 2000

Let's convert the original RDD to a DataFrame and sort again:
val df = sqlContext.createDataFrame(rdd)
df.registerTempTable("data")
val result = sqlContext.sql("select * from data order by time")
result.count

It will run for a while and then crash. If you check tasks in the Spark Web UI, 
you will see that some of them were cancelled due to lost executors 
(ExecutorLost) due to some strange Exceptions. It is really hard to trace back 
which executor was first to be lost. The others follow it like a house of cards. 
What's the problem? The number of reducers. For the first task it is equal to 
the number of partitions, i.e. 2000, but for the second it switched to 200.
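
For completeness, a minimal sketch of the fix described above (assuming Spark 1.3's SQLContext; 2000 is the
partition count that matched this dataset and is otherwise illustrative):

// Raise the number of reduce-side partitions Spark SQL uses for shuffles
// (the default is 200) before running the ORDER BY.
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
val result = sqlContext.sql("select * from data order by time")
result.count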

From: Ulanov, Alexander
Sent: Wednesday, April 29, 2015 1:08 PM
To: user@spark.apache.org
Subject: Sort (order by) of the big dataset

Hi,

I have a 2-billion-record dataset with schema eventId: String, time: Double, 
value: Double. It is stored in Parquet format in HDFS, size 23GB. Specs: Spark 
1.3, Hadoop 1.2.1, 8 nodes with Xeon 16GB RAM, 1TB disk space, each node has 3 
workers with 3GB memory.

I keep failing to sort the mentioned dataset in Spark. I do the following:
val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet")
pf.registerTempTable("data")
val sorted = sqlContext.sql("select * from data order by time")
sorted.saveAsParquetFile("hdfs://my.net/data-sorted.parquet")

Spark starts to execute tasks and then errors like "Executor Lost" pop up in the 
web UI (tasks "mapPartitions at Exchange.scala" and "runJob at newParquet.scala"), 
giving different types of exceptions in explanation. My thinking is that the 
main problem is the "GC overhead limit exceeded" exception; however, I observe exceptions 
related to connection time out and shuffling write 
(java.lang.IllegalStateException: Shutdown in progress; 
org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException ).

What I tried:
1) Tried to order by eventId and by both eventId, time, with the 
same result.
2) Looked at the shuffle parameters, but the defaults do make sense.
3) Tried to repartition the data I am loading from Parquet: val pf3000 = 
pf.repartition(3000), in order to get smaller chunks of data passed to executors 
(originally there are 300 partitions). It did not help either. Surprisingly, 
this dataset takes 50GB on HDFS versus 23GB for the original.
4) Tried to have 8 workers instead of 24 and gave them 10G of memory. It did not 
help.

Could you suggest what might be the problem and what is the workaround? Just in 
case, I cannot have more RAM or more machines :)

Best  regards, Alexander


Re: Compute pairwise distance

2015-04-29 Thread Debasish Das
Cross Join shuffle space might not be needed since most likely through
application specific logic (topK etc) you can cut the shuffle space...Also
most likely the brute-force approach will be a benchmark tool to see how much
better your clustering-based KNN solution is, since there are several ways
you can find approximate nearest neighbors for your application
(KMeans/KDTree/LSH etc)...

There is a variant that I will bring as a PR for this JIRA and we will of
course look into how to improve it further...the idea is to think about
distributed matrix multiply where both matrices A and B are distributed and
master coordinates pulling a partition of A and multiply it with B...

The idea suffices for kernel matrix generation as well if the number of
rows are modest (~10M or so)...

https://issues.apache.org/jira/browse/SPARK-4823
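
For reference, a minimal sketch of the brute-force benchmark mentioned above (toy data; the ids, dimensions and
Euclidean metric are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object PairwiseDistances {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("pairwise").setMaster("local[*]"))

    // Toy dataset of (id, coordinates); in practice this is the real point set.
    val points = sc.parallelize(Seq(
      (0L, Array(0.0, 0.0)),
      (1L, Array(3.0, 4.0)),
      (2L, Array(6.0, 8.0))))

    // Brute-force n^2 pairs, keeping i < j to exploit symmetry and halve the work.
    val distances = points.cartesian(points)
      .filter { case ((i, _), (j, _)) => i < j }
      .map { case ((i, a), (j, b)) =>
        val d = math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)
        ((i, j), d)
      }

    distances.collect().foreach(println)
    sc.stop()
  }
}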


On Wed, Apr 29, 2015 at 3:25 PM, ayan guha guha.a...@gmail.com wrote:

 This is my first thought, please suggest any further improvement:
 1. Create an RDD of your dataset
 2. Do a cross join to generate pairs
 3. Apply reduceByKey and compute the distance. You will get an RDD with
 key pairs and distances

 Best
 Ayan
 On 30 Apr 2015 06:11, Driesprong, Fokko fo...@driesprong.frl wrote:

 Dear Sparkers,

 I am working on an algorithm which requires the pairwise distance between all
 points (e.g. DBSCAN, LOF, etc.). Computing this for *n* points will
 produce an n^2 matrix. If the distance measure is symmetric, this
 can be reduced to (n^2)/2. What would be the most optimal way of computing
 this?

 The paper *Pairwise Element Computation with MapReduce
 https://www.cs.ucsb.edu/~ravenben/classes/290F/papers/kvl10.pdf* paper
 describes different approaches to optimize this process within a map-reduce
 model. Although I don't believe this is applicable to Spark. How would you
 guys approach this?

 I first thought about broadcasting the original points to all the
 workers, and then compute the distances across the different workers.
 Although this requires all the points to be distributed across all the
 machines. But this feels rather brute-force, what do you guys think.

 I don't expect full solutions, but some pointers would be great. I think
 a good solution can be re-used for many algorithms.

 Kind regards,
 Fokko Driesprong




Re: Re: implicit function in SparkStreaming

2015-04-29 Thread guoqing0...@yahoo.com.hk
Thank you for your pointers, it's very helpful to me. In this scenario, how 
can I use the implicit def in the enclosing class?


 
From: Tathagata Das
Date: 2015-04-30 07:00
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: implicit function in SparkStreaming
I believe that the implicit def is pulling in the enclosing class (in which the 
def is defined) in the closure which is not serializable. 


On Wed, Apr 29, 2015 at 4:20 AM, guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hk wrote:
Hi guys,
I'm puzzled why I can't use the implicit function in Spark Streaming; it causes 
"Task not serializable".

code snippet:
implicit final def str2KeyValue(s: String): (String, String) = {
  val message = s.split("\\|")
  if (message.length >= 2)
    (message(0), message(1))
  else if (message.length == 1)
    (message(0), "")
  else
    ("", "")
}

def filter(stream: DStream[String]): DStream[String] = {
  stream.filter(s => {
    s._1 == "Action" && s._2 == "TRUE"
  })
}
Could you please give me some pointers? Thank you.



guoqing0...@yahoo.com.hk



spark kryo serialization question

2015-04-29 Thread 邓刚 [技术中心]
Hi all
 We know that Spark supports Kryo serialization. Suppose there is a map 
function which maps C to (K, V) (here C, K, V are instances of classes C, K, V); when we 
register Kryo serialization, should I register all three of these classes?

Best Wishes

Triones Deng
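
A minimal sketch of registering all three (the case classes below are placeholders standing in for your own C, K
and V classes; the rest is standard SparkConf usage):

import org.apache.spark.SparkConf

case class C(raw: String)      // placeholder for the map input type
case class K(id: Long)         // placeholder for the key type
case class V(payload: String)  // placeholder for the value type

// Register every class whose instances are shuffled, cached or broadcast:
// the input type C as well as the key and value types K and V.
val conf = new SparkConf()
  .setAppName("kryo-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[C], classOf[K], classOf[V]))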




Re: How to install spark in spark on yarn mode

2015-04-29 Thread madhvi

Hi,
Follow the installation instructions at the following link:
http://mbonaci.github.io/mbo-spark/
You don't need to install Spark on every node. Just install it on one node, 
or you can also install it on a remote system and build a Spark cluster.

Thanks
Madhvi
On Thursday 30 April 2015 09:31 AM, xiaohe lan wrote:

Hi experts,

I see spark on yarn has yarn-client and yarn-cluster mode. I also have 
a 5 nodes hadoop cluster (hadoop 2.4). How to install spark if I want 
to try the spark on yarn mode.


Do I need to install spark on the each node of hadoop cluster ?

Thanks,
Xiaohe






Re: Re: implicit function in SparkStreaming

2015-04-29 Thread Lin Hao Xu

For your question, I think the discussion in this link can help.

http://apache-spark-user-list.1001560.n3.nabble.com/Error-related-to-serialisation-in-spark-streaming-td6801.html

Best regards,

Lin Hao XU
IBM Research China
Email: xulin...@cn.ibm.com
My Flickr: http://www.flickr.com/photos/xulinhao/sets



From:   guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk
To: Tathagata Das t...@databricks.com
Cc: user user@spark.apache.org
Date:   2015/04/30 11:17
Subject:Re: Re: implicit function in SparkStreaming



I appreciate your help, it works. I'm curious why the enclosing class
cannot be serialized; does it need to extend java.io.Serializable? If an object
is never serialized, how does it work in the task? Is there any association
with spark.closure.serializer?


 guoqing0...@yahoo.com.hk

 From: Tathagata Das
 Date: 2015-04-30 09:30
 To: guoqing0...@yahoo.com.hk
 CC: user
 Subject: Re: Re: implicit function in SparkStreaming
 Could you put the implicit def in an object? That should work, as objects
 are never serialized.


 On Wed, Apr 29, 2015 at 6:28 PM, guoqing0...@yahoo.com.hk 
 guoqing0...@yahoo.com.hk wrote:
   Thank you for your pointers, it's very helpful to me. In this scenario,
   how can I use the implicit def in the enclosing class?



From: Tathagata Das
Date: 2015-04-30 07:00
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: implicit function in SparkStreaming
I believe that the implicit def is pulling in the enclosing class (in
which the def is defined) in the closure which is not serializable.


On Wed, Apr 29, 2015 at 4:20 AM, guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hk wrote:
 Hi guys,
 I'm puzzled why I can't use the implicit function in Spark Streaming; it
 causes "Task not serializable".

 code snippet:
 implicit final def str2KeyValue(s: String): (String, String) = {
   val message = s.split("\\|")
   if (message.length >= 2)
     (message(0), message(1))
   else if (message.length == 1)
     (message(0), "")
   else
     ("", "")
 }

 def filter(stream: DStream[String]): DStream[String] = {
   stream.filter(s => {
     s._1 == "Action" && s._2 == "TRUE"
   })
 }

 Could you please give me some pointers? Thank you.


  guoqing0...@yahoo.com.hk



Event generator for SPARK-Streaming from csv

2015-04-29 Thread anshu shukla
I have the real DEBS-Taxi data in a CSV file. In order to operate over it,
how can I simulate a Spout-like event generator using the
timestamps in the CSV file?

-- 
SERC-IISC
Thanks  Regards,
Anshu Shukla


RE: HOw can I merge multiple DataFrame and remove duplicated key

2015-04-29 Thread Wang, Ningjun (LNG-NPV)
As I understand from SQL, group by allows you to do sum(), average(), max(), 
min(). But how do I select the entire row in the group with the maximum 
timeStamp column? For example

id1,  value1, 2015-01-01
id1, value2,  2015-01-02
id2,  value3, 2015-01-01
id2, value4,  2015-01-02

I want to return
id1, value2,  2015-01-02
id2, value4,  2015-01-02

I can use reduceByKey() on an RDD, but how do I do it using DataFrames? Can you give 
an example code snippet?

Thanks
Ningjun
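
A minimal sketch of one way to do this with the DataFrame API (assuming Spark 1.3; the column names follow the
example above, and the maxTs alias is illustrative):

import org.apache.spark.sql.functions.max

// Union the per-file DataFrames, compute the latest timeStamp per id,
// then join back to recover the full row carrying that timestamp.
val all = df1.unionAll(df2)
val latest = all.groupBy("id").agg(max("timeStamp").as("maxTs"))
val deduped = all
  .join(latest, all("id") === latest("id") && all("timeStamp") === latest("maxTs"))
  .select(all("id"), all("value"), all("timeStamp"))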

From: ayan guha [mailto:guha.a...@gmail.com]
Sent: Wednesday, April 29, 2015 5:54 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: HOw can I merge multiple DataFrame and remove duplicated key


It's no different; you would use group by and an aggregate function to do so.
On 30 Apr 2015 02:15, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.commailto:ningjun.w...@lexisnexis.com wrote:
I have multiple DataFrame objects each stored in a parquet file.  The DataFrame 
just contains 3 columns (id,  value,  timeStamp). I need to union all the 
DataFrame objects together but for duplicated id only keep the record with the 
latest timestamp. How can I  do that?

I can do this for RDDs by sc.union() to union all the RDDs and then do a 
reduceByKey() to remove duplicated id by keeping only the one with latest 
timeStamp field. But how do I do it for DataFrame?


Ningjun



Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Cody Koeninger
Use lsof to see what files are actually being held open.

That stacktrace looks to me like it's from the driver, not executors.
Where in foreach is it being called?  The outermost portion of foreachRDD
runs in the driver, the innermost portion runs in the executors.  From the
docs:

https://spark.apache.org/docs/latest/streaming-programming-guide.html

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }}
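
For comparison, the per-partition variant from the same programming guide, as a minimal sketch
(createNewConnection() and connection.send() are placeholders for your own connection logic):

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // executed at the worker: one connection per partition, closed when done
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}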


@td I've specifically looked at kafka socket connections for the standard
1.3 code vs my branch that has cached connections.  The standard
non-caching code has very short lived connections.  I've had jobs running
for a month at a time, including ones writing to mysql.  Not saying it's
impossible, but I'd think we need some evidence before speculating this has
anything to do with it.


On Wed, Apr 29, 2015 at 6:50 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 This function is called in foreachRDD. I think it should be running in the
 executors. I add the statement.close() in the code and it is running. I
 will let you know if this fixes the issue.



 On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the function ingestToMysql running on the driver or on the executors?
 Accordingly you can try debugging while running in a distributed manner,
 with and without calling the function.

 If you don't get "too many open files" without calling ingestToMysql(),
 the problem is likely to be in ingestToMysql().
 If you get the problem even without calling ingestToMysql(), then the
 problem may be in Kafka. If the problem is occurring in the driver, then it's
 the DirectKafkaInputDStream code. If the problem is occurring in the
 executor, then the problem is in KafkaRDD.

 TD

 On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

 Maybe add statement.close() in finally block ?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

 Based on my understanding, the connector to Kafka should not open so
 many files. Do you think there is possible socket leakage? BTW, in every
 batch which is 5 seconds, I output some results to mysql:

   def ingestToMysql(data: Array[String]) {
     val url =
       "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
     var sql = "insert into loggingserver1 values "
     data.foreach(line => sql += line)
     sql = sql.dropRight(1)
     sql += ";"
     logger.info(sql)
     var conn: java.sql.Connection = null
     try {
       conn = DriverManager.getConnection(url)
       val statement = conn.createStatement()
       statement.executeUpdate(sql)
     } catch {
       case e: Exception => logger.error(e.getMessage())
     } finally {
       if (conn != null) {
         conn.close
       }
     }
   }

 I am not sure whether the leakage originates from Kafka connector or
 the sql connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit
 */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka
 in the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark
 log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 

How to install spark in spark on yarn mode

2015-04-29 Thread xiaohe lan
Hi experts,

I see spark on yarn has yarn-client and yarn-cluster mode. I also have a 5
nodes hadoop cluster (hadoop 2.4). How to install spark if I want to try
the spark on yarn mode.

Do I need to install spark on the each node of hadoop cluster ?

Thanks,
Xiaohe


Re: Re: implicit function in SparkStreaming

2015-04-29 Thread guoqing0...@yahoo.com.hk
I appreciate your help, it works. I'm curious why the enclosing class 
cannot be serialized; does it need to extend java.io.Serializable? If an object 
is never serialized, how does it work in the task? Is there any association with 
spark.closure.serializer?



guoqing0...@yahoo.com.hk
 
From: Tathagata Das
Date: 2015-04-30 09:30
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: Re: implicit function in SparkStreaming
Could you put the implicit def in an object? That should work, as objects are 
never serialized.


On Wed, Apr 29, 2015 at 6:28 PM, guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hk wrote:
Thank you for your pointers, it's very helpful to me. In this scenario, how 
can I use the implicit def in the enclosing class?


 
From: Tathagata Das
Date: 2015-04-30 07:00
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: implicit function in SparkStreaming
I believe that the implicit def is pulling in the enclosing class (in which the 
def is defined) in the closure which is not serializable. 


On Wed, Apr 29, 2015 at 4:20 AM, guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hk wrote:
Hi guys,
I'm puzzled why I can't use the implicit function in Spark Streaming; it causes 
"Task not serializable".

code snippet:
implicit final def str2KeyValue(s: String): (String, String) = {
  val message = s.split("\\|")
  if (message.length >= 2)
    (message(0), message(1))
  else if (message.length == 1)
    (message(0), "")
  else
    ("", "")
}

def filter(stream: DStream[String]): DStream[String] = {
  stream.filter(s => {
    s._1 == "Action" && s._2 == "TRUE"
  })
}
Could you please give me some pointers? Thank you.



guoqing0...@yahoo.com.hk




Re: How to stream all data out of a Kafka topic once, then terminate job?

2015-04-29 Thread Dmitry Goldenberg
Thanks for the comments, Cody.

Granted, Kafka topics aren't queues.  I was merely wishing that Kafka's
topics had some queue behaviors supported because often that is exactly
what one wants. The ability to poll messages off a topic seems like what
lots of use-cases would want.

I'll explore both of these approaches you mentioned.

For now, I see that using the KafkaRDD approach means finding partitions
and offsets. My thinking was that it'd be nice if there was a convenience
in the API that would wrap this logic and expose it as a method.  For the
second approach, I'll need to see where the listener is grafted on and
whether it would have enough ability to kill the whole job.  There's the
stop method on the context so perhaps if the listener could grab hold of
the context it'd invoke stop() on it.
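
For the KafkaRDD route, a minimal sketch of the public createRDD entry point (the broker address, topic,
partition count and offsets are illustrative and still have to be discovered out of band, which is the
inconvenient part discussed above):

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val sc = new SparkContext(new SparkConf().setAppName("drain-topic-once"))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

// One OffsetRange per topic partition; fromOffset/untilOffset bound the read.
val offsetRanges = Array(
  OffsetRange("mytopic", 0, 0L, 1000L),
  OffsetRange("mytopic", 1, 0L, 1000L))

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)

println("pulled " + rdd.count() + " messages")
sc.stop()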


On Wed, Apr 29, 2015 at 10:26 AM, Cody Koeninger c...@koeninger.org wrote:

 The idea of peek vs poll doesn't apply to kafka, because kafka is not a
 queue.

 There are two ways of doing what you want, either using KafkaRDD or a
 direct stream

 The Kafka rdd approach would require you to find the beginning and ending
 offsets for each partition.  For an example of this, see
 getEarliestLeaderOffsets and getLatestLeaderOffsets in
 https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala

 For usage examples see the tests.  That code isn't public so you'd need to
 either duplicate it, or build a version of spark with all of the
 'private[blah]' restrictions removed.

 The direct stream approach would require setting the kafka parameter
 auto.offset.reset to smallest, in order to start at the beginning.  If you
 haven't set any rate limiting parameters, then the first batch will contain
 all the messages.  You can then kill the job after the first batch.  It's
 possible you may be able to kill the job from a
 StreamingListener.onBatchCompleted, but I've never tried and don't know
 what the consequences may be.

 On Wed, Apr 29, 2015 at 8:52 AM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Part of the issue is that when you read messages in a topic, the messages
 are peeked, not polled, so there'll be no "when the queue is empty", as I
 understand it.

 So it would seem I'd want to do KafkaUtils.createRDD, which takes an
 array of OffsetRange's. Each OffsetRange is characterized by topic,
 partition, fromOffset, and untilOffset. In my case, I want to read all
 data, i.e. from all partitions and I don't know how many partitions there
 may be, nor do I know the 'untilOffset' values.

 In essence, I just want something like createRDD(new
 OffsetRangeAllData());

 In addition, I'd ideally want the option of not peeking but polling the
 messages off the topics involved.  But I'm not sure whether Kafka API's
 support it and then whether Spark does/will support that as well...



 On Wed, Apr 29, 2015 at 1:52 AM, ayan guha guha.a...@gmail.com wrote:

 I guess what you mean is not streaming.  If you create a stream context
 at time t, you will receive data coming through starting time t++, not
 before time t.

 Looks like you want a queue. Let Kafka write to a queue, consume msgs
 from the queue and stop when queue is empty.
 On 29 Apr 2015 14:35, dgoldenberg dgoldenberg...@gmail.com wrote:

 Hi,

 I'm wondering about the use-case where you're not doing continuous,
 incremental streaming of data out of Kafka but rather want to publish
 data
 once with your Producer(s) and consume it once, in your Consumer, then
 terminate the consumer Spark job.

 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
 Durations.milliseconds(...));

 The batchDuration parameter is "the time interval at which streaming
 data
 will be divided into batches". Can this be worked somehow to cause Spark
 Streaming to just get all the available data, then let all the RDD's
 within
 the Kafka discretized stream get processed, and then just be done and
 terminate, rather than wait another period and try and process any more
 data
 from Kafka?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-stream-all-data-out-of-a-Kafka-topic-once-then-terminate-job-tp22698.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.







Re: solr in spark

2015-04-29 Thread Costin Leau

On 4/29/15 6:02 PM, Jeetendra Gangele wrote:

Thanks for the detailed explanation. My only worry is that searching all the combinations 
of company names through ES looks hard.



I'm not sure what makes you think ES looks hard. Have you tried browsing the Elasticsearch reference or the definitive 
guide?


[1] http://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
[2] http://www.elastic.co/guide/en/elasticsearch/guide/current/index.html


In Solr we define everything in XML files, like all the attributes in 
WordDocumentFilterFactory and the shingle factory. How do we
do this in Elasticsearch?



See the links above, the IRC or the mailing list. I don't want to derail this thread any longer so I'll wrap up by 
pointing to one

of the many resources that pop up on google - a blog post on shingles and a 
post from Found.no on text analysis and shingles

https://www.elastic.co/blog/searching-with-shingles
https://www.found.no/foundation/text-analysis-part-1/#optimizing-phrase-searches-with-shingles

If you need more help, do reach out to the Elasticsearch mailing list:
https://www.elastic.co/community

Cheers,




On 29 April 2015 at 20:03, Costin Leau costin.l...@gmail.com 
mailto:costin.l...@gmail.com wrote:

# disclaimer I'm an employee of Elastic (the company behind Elasticsearch) 
and lead of Elasticsearch Hadoop integration

Some things to clarify on the Elasticsearch side:

1. Elasticsearch is a distributed, real-time search and analytics engine. 
Search is just one aspect of it and it can
work with any type of data (whether it's text, image encoding, etc...): 
Github, Wikipedia, Stackoverflow are popular
examples of known websites that are powered by Elasticsearch. In fact you 
can find plenty of use cases and
information about this on the website [1].

2. Elasticsearch is stand-alone and can be run on the same or separate 
machines as other services. In fact, on the
_same_ machine, one can run _multiple_ Elasticsearch nodes (and thus 
clusters). For best performance, having
dedicated hardware (as Nick suggested) works best.

3. The Elasticsearch Spark integration has been available for over a year 
through Map/Reduce and the native (Scala
and Java) API since q3 last year. There are plenty of features available 
which are fully documented here [2]. Better
yet, there's a talk by yours truly from Spark Summit East [3] that is fully 
focused on exactly this topic.

4. elasticsearch-hadoop is certified by Databricks, Cloudera, Hortonworks 
and MapR and supports both Spark core and
Spark SQL 1.0-1.3. There are binaries for Scala 2.10 and 2.11. And for what 
it's worth, it provided one of the first
(if not the first) implementations of the DataSource API outside Databricks, 
which means not only using Elasticsearch in a
declarative fashion but also having push-down support for operators.

Hopefully these materials will get you started with Spark and Elasticsearch 
and also clarify some of the
misconceptions about Elasticsearch.

Cheers,

[1] https://www.elastic.co/products/elasticsearch
[2] 
http://www.elastic.co/guide/en/elasticsearch/hadoop/master/reference.html
[3] 
http://spark-summit.org/east/2015/talk/using-spark-and-elasticsearch-for-real-time-data-analysis


On 4/28/15 8:16 PM, Nick Pentreath wrote:

Depends on your use case and search volume. Typically you'd have a 
dedicated ES cluster if your app is doing a
lot of
real time indexing and search.

If it's only for spark integration then you could colocate ES and spark

—
Sent from Mailbox https://www.dropbox.com/mailbox


On Tue, Apr 28, 2015 at 6:41 PM, Jeetendra Gangele gangele...@gmail.com 
mailto:gangele...@gmail.com
mailto:gangele...@gmail.com mailto:gangele...@gmail.com wrote:

 Thanks for reply.

 Elastic search index will be within my Cluster? or I need the 
separate host the elastic search?


 On 28 April 2015 at 22:03, Nick Pentreath nick.pentre...@gmail.com 
mailto:nick.pentre...@gmail.com
mailto:nick.pentre...@gmail.com mailto:nick.pentre...@gmail.com 
wrote:

 I haven't used Solr for a long time, and haven't used Solr in 
Spark.

 However, why do you say Elasticsearch is not a good option 
...? ES absolutely supports full-text
search and
 not just filtering and grouping (in fact its original purpose 
was and still is text search, though
filtering,
 grouping and aggregation are heavily used).

http://www.elastic.co/guide/en/elasticsearch/guide/master/full-text-search.html



 On Tue, Apr 28, 2015 at 6:27 PM, Jeetendra Gangele 
gangele...@gmail.com mailto:gangele...@gmail.com
mailto:gangele...@gmail.com mailto:gangele...@gmail.com wrote:

 Does anyone tried using solr inside spark?
 below is the