Re: Optimizing reduce for 'huge' aggregated outputs.

2014-06-10 Thread Nick Pentreath
Can you key your RDD by some key and use reduceByKey? In fact, if you are
merging a bunch of maps, you can create a set of (k, v) pairs in your mapPartitions and
then reduceByKey using some merge function. The reduce will happen in parallel
on multiple nodes in this case. You'll end up with just a single set of (k, v) pairs
per partition, which you can reduce or collect and merge on the driver.
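
For illustration, a minimal sketch of this pattern, assuming the per-partition work
already builds a Map[K, V] and that merge(a, b) combines two values for the same key
(computePartialMap and merge are hypothetical helpers):

  import org.apache.spark.SparkContext._    // pair-RDD functions such as reduceByKey (Spark 1.0)

  val merged = rdd
    .mapPartitions { iter =>
      val partial = computePartialMap(iter)   // hypothetical: builds one Map[K, V] per partition
      partial.iterator                        // emit individual (k, v) pairs instead of one big map
    }
    .reduceByKey((a, b) => merge(a, b))       // merging runs in parallel across the cluster
  val result = merged.collect()               // only the final (k, v) pairs reach the driver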




—
Sent from Mailbox

On Tue, Jun 10, 2014 at 1:05 AM, Sung Hwan Chung coded...@cs.stanford.edu
wrote:

 I suppose what I want is the memory efficiency of toLocalIterator and the
 speed of collect. Is there any such thing?
 On Mon, Jun 9, 2014 at 3:19 PM, Sung Hwan Chung coded...@cs.stanford.edu
 wrote:
 Hello,

 I noticed that the final reduce function happens in the driver node with a
 code that looks like the following.

 val outputMap = mapPartition(doSomething).reduce((a: Map, b: Map) => {
  a.merge(b)
 })

 Individual outputs from the mappers are small, but over time the
 aggregated result outputMap could be huge (say, hundreds of millions
 of keys and values, reaching gigabytes).

 I noticed that, even if we have a lot of memory in the driver node, this
 process eventually becomes really slow (say we have 100+ partitions: the
 first reduce is fast, but it becomes progressively slower as more and
 more partition outputs get aggregated). Is this because the intermediate
 reduce output gets serialized and then deserialized every time?

 Ideally, since the reduce is taking place on the same machine
 anyway, there would be no need for any serialization and deserialization;
 the incoming results would just be merged into the final aggregation. Is this
 possible?


Re: Optimizing reduce for 'huge' aggregated outputs.

2014-06-10 Thread DB Tsai
Hi Nick,

How does reduce work? I thought that after reducing in the executor, it
would reduce in parallel between multiple executors instead of pulling
everything to the driver and reducing there.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Mon, Jun 9, 2014 at 11:07 PM, Nick Pentreath
nick.pentre...@gmail.com wrote:
 Can you key your RDD by some key and use reduceByKey? In fact, if you are
 merging a bunch of maps, you can create a set of (k, v) pairs in your mapPartitions
 and then reduceByKey using some merge function. The reduce will happen in
 parallel on multiple nodes in this case. You'll end up with just a single
 set of (k, v) pairs per partition, which you can reduce or collect and merge on the
 driver.


 —
 Sent from Mailbox


 On Tue, Jun 10, 2014 at 1:05 AM, Sung Hwan Chung coded...@cs.stanford.edu
 wrote:

 I suppose what I want is the memory efficiency of toLocalIterator and the
 speed of collect. Is there any such thing?


 On Mon, Jun 9, 2014 at 3:19 PM, Sung Hwan Chung coded...@cs.stanford.edu
 wrote:

 Hello,

 I noticed that the final reduce function happens in the driver node with
 a code that looks like the following.

 val outputMap = mapPartition(doSomething).reduce((a: Map, b: Map) => {
  a.merge(b)
 })

 Individual outputs from the mappers are small, but over time the
 aggregated result outputMap could be huge (say, hundreds of millions
 of keys and values, reaching gigabytes).

 I noticed that, even if we have a lot of memory in the driver node, this
 process eventually becomes really slow (say we have 100+ partitions: the
 first reduce is fast, but it becomes progressively slower as more and
 more partition outputs get aggregated). Is this because the intermediate
 reduce output gets serialized and then deserialized every time?

 Ideally, since the reduce is taking place on the same
 machine anyway, there would be no need for any serialization and deserialization;
 the incoming results would just be merged into the final aggregation. Is this
 possible?





Writing data to HBase using Spark

2014-06-10 Thread Vibhor Banga
Hi,

I am reading data from an HBase table into an RDD, and then using foreach on that
RDD I am doing some processing on every Result of the HBase table. After this
processing I want to store the processed data back to another HBase table.

How can I do that? If I use the standard Hadoop and HBase classes to write
data to HBase, I run into serialization issues.

How should I write data to HBase in this case?

Thanks,
-Vibhor


Re: Writing data to HBase using Spark

2014-06-10 Thread Kanwaldeep
Please see sample code attached at
https://issues.apache.org/jira/browse/SPARK-944. 
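
For illustration, a minimal sketch of the usual way around the serialization issue:
create the HBase connection inside foreachPartition so it never has to be shipped with
a closure. The HBase 0.94-era client API is assumed, processedRdd is assumed to be an
RDD[(String, String)], and the table/column names are placeholders:

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.client.{HTable, Put}
  import org.apache.hadoop.hbase.util.Bytes

  processedRdd.foreachPartition { partition =>
    // Build the non-serializable HBase objects on the worker, once per partition.
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "output_table")
    partition.foreach { case (rowKey, value) =>
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
      table.put(put)
    }
    table.close()
  }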




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Writing-data-to-HBase-using-Spark-tp7304p7305.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Shark over Spark-Streaming

2014-06-10 Thread praveshjain1991
Is it possible to use Shark over streaming data?
I did not find any mention of that on the website. When you run Shark, it
gives you a shell to run your queries over stored data. Is there any way to
do the same over streaming data?

--
Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shark-over-Spark-Streaming-tp7307.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Is Spark-1.0.0 not backward compatible with Shark-0.9.1 ?

2014-06-10 Thread Cheng, Hao
And if you want to use the SQL CLI (based on Catalyst) as it works in Shark,
you can also check out https://github.com/amplab/shark/pull/337 :)

This preview version doesn’t require Hive to be set up in the cluster.
(Don’t forget to also put the hive-site.xml under SHARK_HOME/conf.)

Cheng Hao

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Saturday, June 07, 2014 2:22 AM
To: user@spark.apache.org
Subject: Re: Is Spark-1.0.0 not backward compatible with Shark-0.9.1 ?

There is not an official updated version of Shark for Spark-1.0 (though you
might check out the untested spark-1.0 branch on GitHub).

You can also check out the preview release of Shark that runs on Spark SQL: 
https://github.com/amplab/shark/tree/sparkSql

Michael

On Fri, Jun 6, 2014 at 6:02 AM, bijoy deb 
bijoy.comput...@gmail.commailto:bijoy.comput...@gmail.com wrote:
Hi,

I am trying to build Shark-0.9.1 from source, with Spark-1.0.0 as its
dependency, using the sbt package command. But I am getting the below error during
the build, which makes me think that perhaps Spark-1.0.0 is not compatible with
Shark-0.9.1:

[info]   Compilation completed in 9.046 s
[error] /vol1/shark/src/main/scala/shark/api/JavaTableRDD.scala:57: 
org.apache.spark.api.java.function.Function[shark.api.Row,Boolean] does not 
take parameters
[error] wrapRDD(rdd.filter((x => f(x).booleanValue())))
[error]   ^
[error] /vol1/shark/src/main/scala/shark/execution/CoGroupedRDD.scala:84: type 
mismatch;
[error]  found   : String
[error]  required: org.apache.spark.serializer.Serializer
[error] new ShuffleDependency[Any, Any](rdd, part, 
SharkEnv.shuffleSerializerName)
[error] ^
[error] /vol1/shark/src/main/scala/shark/execution/CoGroupedRDD.scala:120: 
value serializerManager is not a member of org.apache.spark.SparkEnv
[error] val serializer = 
SparkEnv.get.serializerManager.get(SharkEnv.shuffleSerializerName, 
SparkEnv.get.conf)
[error]   ^
[warn] /vol1/shark/src/main/scala/shark/execution/ExtractOperator.scala:111: 
non-variable type argument (shark.execution.ReduceKey, Any) in type pattern 
org.apache.spark.rdd.RDD[(shark.execution.ReduceKey, Any)] is unchecked since 
it is eliminated by erasure
[warn]   case r: RDD[(ReduceKey, Any)] => RDDUtils.sortByKey(r)
[warn]   ^
[error] 
/vol1/shark/src/main/scala/shark/execution/GroupByPostShuffleOperator.scala:204:
 type mismatch;
[error]  found   : String
[error]  required: org.apache.spark.serializer.Serializer
[error]   .setSerializer(SharkEnv.shuffleSerializerName)
[error]   ^
.
...
Can you please suggest if there is any way to use Shark with the new
Spark-1.0.0 version?
Thanks
Bijoy



Re: Spark 1.0.0 Maven dependencies problems.

2014-06-10 Thread toivoa
Thanks for the hint.

I removed the signature info from the same jar and the JVM is happy now.

But the problem remains: several copies of the same jar with different versions, which is not good.

Spark itself is very, very promising; I am very excited.


Thank you all
toivo



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-Maven-dependencies-problems-tp7247p7309.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Problem in Spark Streaming

2014-06-10 Thread nilmish
I am running a Spark Streaming job to count the top 10 hashtags over the last 5-minute
window, querying every 1 sec.

It takes approximately 1.4 sec (end-to-end delay) to answer most queries,
but there are a few instances in between when it takes considerably more
time (around 15 sec), which also increases the response time of
subsequent queries.  I am not able to debug the reason for
such spikes.  The data rate is nearly constant, so the spikes are
not due to a sudden increase in the data rate.
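
For reference, a minimal sketch of this kind of windowed query (5-minute window,
1-second slide), assuming a hashtags DStream and a StreamingContext ssc already exist;
the checkpoint path is a placeholder:

  import org.apache.spark.SparkContext._            // for sortByKey on pair RDDs
  import org.apache.spark.streaming.{Minutes, Seconds}

  ssc.checkpoint("checkpoint-dir")                  // required by the windowed counting below
  val counts = hashtags.countByValueAndWindow(Minutes(5), Seconds(1))
  val sorted = counts
    .map(_.swap)                                    // (hashtag, count) -> (count, hashtag)
    .transform(_.sortByKey(ascending = false))      // sort each batch RDD by count
  sorted.print()                                    // prints the head of each batch (the top hashtags)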

Also, is there any way I can put a bound on the time taken by a
particular query? For example, if a particular query takes more than, say, 2 sec, it
should be killed and we should move on to the next query, so that a
slow query does not affect future queries.

Thanx,
Nilesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Problem in Spark Streaming

2014-06-10 Thread Yingjun Wu
Hi Nilmish,

I am facing the same problem. I am wondering how you measure the latency?

Regards,
Yingjun



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7311.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Problem in Spark Streaming

2014-06-10 Thread nilmish
You can measure the latency from the logs. Search for phrases like "Total delay"
in the logs. This denotes the total end-to-end delay for a particular query.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7312.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


pmml with augustus

2014-06-10 Thread filipus
hello guys,

does anybody have experience with the Augustus library as a serializer for
scoring models?

it looks very promising, and I even found a hint about the connection between Augustus
and Spark

all the best



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pmml-with-augustus-tp7313.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: pmml with augustus

2014-06-10 Thread Sean Owen
It's worth mentioning that Augustus is a Python-based library. On a
related note, in Java-land, I have had good experiences with jpmml's
projects:


On Tue, Jun 10, 2014 at 7:52 AM, filipus floe...@gmail.com wrote:
 hello guys,

 does anybody have experience with the Augustus library as a serializer for
 scoring models?

 it looks very promising, and I even found a hint about the connection between Augustus
 and Spark

 all the best



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/pmml-with-augustus-tp7313.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Performance of Akka or TCP Socket input sources vs HDFS: Data locality in Spark Streaming

2014-06-10 Thread Nilesh Chakraborty
Hello!

Spark Streaming supports HDFS as input source, and also Akka actor
receivers, or TCP socket receivers.

For my use case I think it's probably more convenient to read the data
directly from Actors, because I already need to set up a multi-node Akka
cluster (on the same nodes that Spark runs on) and write some actors to
perform some parallel operations. Writing actor receivers to consume the
results of my business-logic actors and then feed into Spark is pretty
seamless. Note that the actors generate a large amount of data (a few GBs to
tens of GBs).

The other option would be to set up HDFS on the same cluster as Spark, write
the data from the Actors to HDFS, and then use HDFS as the input source for
Spark Streaming. Does this result in better performance due to data locality
(with HDFS data replication turned on)? I think performance should be almost
the same with actors, since Spark workers local to the worker actors should
get the data fast, and I assume some optimization like this is definitely
done?

I suppose the only benefit with HDFS would be better fault tolerance, and
the ability to checkpoint and recover even if the master fails.

Cheers,
Nilesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Performance-of-Akka-or-TCP-Socket-input-sources-vs-HDFS-Data-locality-in-Spark-Streaming-tp7317.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Calling JavaPairRDD.first after calling JavaPairRDD.groupByKey results in NullPointerException

2014-06-10 Thread Gaurav Jain
I am getting a strange null pointer exception when trying to list the first
entry of a JavaPairRDD after calling groupByKey on it. Following is my code:


JavaPairRDD<Tuple3<String, String, String>, List<String>> KeyToAppList =
        KeyToApp.distinct().groupByKey();
// System.out.println("First member of the key-val list: " + KeyToAppList.first());
// Above call to .first causes a null pointer exception
JavaRDD<Integer> KeyToAppCount = KeyToAppList.map(
        new Function<Tuple2<Tuple3<String, String, String>, List<String>>, Integer>() {
            @Override
            public Integer call(Tuple2<Tuple3<String, String, String>, List<String>> tupleOfTupAndList)
                    throws Exception {
                List<String> apps = tupleOfTupAndList._2;
                Set<String> uniqueApps = new HashSet<String>(apps);
                return uniqueApps.size();
            }
        });
System.out.println("First member of the key-val list: " + KeyToAppCount.first());
// Above call to .first prints the first element all right.


The first call to first() results in a null pointer exception. However,
if I comment out that call and instead proceed to apply
the map function, the later call to first() doesn't raise any
exception. Why the null pointer exception immediately after applying
groupByKey?

The null pointer exception looks as follows:
Exception in thread "main" org.apache.spark.SparkException: Job aborted:
Exception while deserializing and fetching task:
java.lang.NullPointerException
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Calling-JavaPairRDD-first-after-calling-JavaPairRDD-groupByKey-results-in-NullPointerException-tp7318.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Performance of Akka or TCP Socket input sources vs HDFS: Data locality in Spark Streaming

2014-06-10 Thread Michael Cutler
Hey Nilesh,

Great to hear you're using Spark Streaming. In my opinion the crux of your
question comes down to what you want to do with the data in the future
and/or whether there is utility in using it from more than one Spark/Streaming
job.

1). *One-time-use, fire and forget* - as you rightly point out, hooking up
to the Akka actors makes sense if the usefulness of the data is short-lived
and you don't need the ability to readily go back into archived data.

2). *Fault tolerance & multiple uses* - consider using a message queue like
Apache Kafka [1]: write messages from your Akka Actors into a Kafka topic
with multiple partitions and replication.  Then use Spark Streaming job(s)
to read from Kafka.  You can tune Kafka to keep the last *N* days of data
online, so if your Spark Streaming job dies it can pick up at the point it
left off.

3). *Keep indefinitely* - files in HDFS, 'nuff said.

We're currently using (2) Kafka & (3) HDFS to process around 400M web
clickstream events a week.  Everything is written into Kafka and kept
'online' for 7 days, and also written out to HDFS in compressed
date-sequential files.

We use several Spark Streaming jobs to process the real-time events
straight from Kafka.  Kafka supports multiple consumers, so each job sees
its own view of the message queue and all its events.  If any of the
Streaming jobs die or are restarted, they continue consuming from Kafka from
the last processed message without affecting any of the other consumer
processes.

Best,

MC


[1] http://kafka.apache.org/
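
For illustration, a minimal sketch of the consuming side of option (2) using Spark
Streaming's Kafka receiver (1.0 API); sparkConf is assumed to exist, and the ZooKeeper
host, consumer group and topic are placeholders:

  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val ssc = new StreamingContext(sparkConf, Seconds(1))
  // Each Streaming job uses its own consumer group, so it sees the whole topic
  // independently of the other consumers.
  val events = KafkaUtils.createStream(ssc, "zk-host:2181", "clickstream-job-1", Map("clicks" -> 2))
  events.map(_._2).count().print()   // _._2 is the message payload
  ssc.start()
  ssc.awaitTermination()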



On 10 June 2014 13:05, Nilesh Chakraborty nil...@nileshc.com wrote:

 Hello!

 Spark Streaming supports HDFS as input source, and also Akka actor
 receivers, or TCP socket receivers.

 For my use case I think it's probably more convenient to read the data
 directly from Actors, because I already need to set up a multi-node Akka
 cluster (on the same nodes that Spark runs on) and write some actors to
 perform some parallel operations. Writing actor receivers to consume the
 results of my business-logic actors and then feed into Spark is pretty
 seamless. Note that the actors generate a large amount of data (a few GBs
 to
 tens of GBs).

 The other option would be to setup HDFS on the same cluster as Spark, write
 the data from the Actors to HDFS, and then use HDFS as input source for
 Spark Streaming. Does this result in better performance due to data
 locality
 (with HDFS data replication turned on)? I think performance should be
 almost
 the same with actors, since Spark workers local to the worker actors should
 get the data fast, and some optimization like this is definitely done I
 assume?

 I suppose the only benefit with HDFS would be better fault tolerance, and
 the ability to checkpoint and recover even if master fails.

 Cheers,
 Nilesh



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Performance-of-Akka-or-TCP-Socket-input-sources-vs-HDFS-Data-locality-in-Spark-Streaming-tp7317.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Problem in Spark Streaming

2014-06-10 Thread Boduo Li
Hi Nilmish,

What's the data rate/node when you see the high latency? (It seems the
latency keeps increasing.) Do you still see it if you lower the data rate or
the frequency of the windowed query?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7321.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Problem in Spark Streaming

2014-06-10 Thread nilmish
How can I measure the data rate/node?

I am feeding the data through the Kafka API. I only know the total inflow data
rate, which remains almost constant. How can I figure out how much
data is distributed to the nodes in my cluster?

The latency does not keep increasing indefinitely. It goes up for a few instants
and then drops back down to the normal level. I want to get rid of
these spikes in between.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7325.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: abnormal latency when running Spark Streaming

2014-06-10 Thread Boduo Li
Hi Yingjun,

Do you see a stable latency, or does the latency keep increasing? And could you
provide some details about the input data rate/node, batch interval,
windowDuration and slideDuration when you see the high latency?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/abnormal-latency-when-running-Spark-Streaming-tp7315p7324.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark Streaming socketTextStream

2014-06-10 Thread fredwolfinger
Good morning,

I have taken the socketTextStream example and, instead of running on a local
Spark instance, I have pushed it to my Spark cluster in AWS (1 master with 5
slave nodes). I am getting the following error, which appears to indicate that
all the slaves are trying to read from localhost: when all I really want
is the single master node to read from its localhost: and batch up what
it receives. Can anyone help me with what I might be missing in the way I
am submitting the job?

14/06/10 13:12:49 INFO scheduler.ReceiverTracker: Registered receiver for
stream 0 from akka.tcp://spark@SLAVE-INTERNAL-IP:39710
14/06/10 13:12:49 ERROR scheduler.ReceiverTracker: Deregistered receiver for
stream 0: Restarting receiver with delay 2000ms: Error connecting to
localhost: - java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at java.net.Socket.connect(Socket.java:528)
at java.net.Socket.<init>(Socket.java:425)
at java.net.Socket.<init>(Socket.java:208)
at
org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:71)
at
org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:57)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-socketTextStream-tp7326.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Problem in Spark Streaming

2014-06-10 Thread Boduo Li
Oh, I mean the average data rate/node.

But when I want to know the input activity on each node (I use a custom
receiver instead of Kafka), I usually search for these records in the logs to get a
sense: BlockManagerInfo: Added input ... on [hostname:port] (size: xxx KB)

I also see some spikes in latency as I posted earlier:
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-achieve-reasonable-performance-on-Spark-Streaming-tp7262.html
It's even worse as the spikes cause the latency to increase infinitely when
the data rate is a little high, although the machines are underutilized. I
can't explain it either. I'm not sure if the cause is the same as yours.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7327.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming socketTextStream

2014-06-10 Thread Akhil Das
You can use the master's IP address (Or whichever machine you chose to run
the nc command) instead of localhost.
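
For illustration, a minimal sketch of that change, assuming an existing
StreamingContext ssc; the host name and port are placeholders:

  // Connect to the host that is actually running `nc`, not localhost.
  val lines = ssc.socketTextStream("MASTER-INTERNAL-IP", 9999)
  lines.count().print()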


Re: Spark Streaming socketTextStream

2014-06-10 Thread fredwolfinger
Worked! Thanks so much!

Fred

Fred Wolfinger
Research Staff Member, CyberPoint Labs
direct +1 410 779 6741
mobile +1 443 655 3322

CyberPoint International

621 East Pratt Street, Suite 300

Baltimore MD 21202-3140

phone +1 410 779 6700

www.cyberpointllc.com http://www.cyberpointllc.com/







From:  Akhil Das-2 [via Apache Spark User List]
ml-node+s1001560n7328...@n3.nabble.com
Date:  Tuesday, June 10, 2014 10:16 AM
To:  Fred Wolfinger fwolfin...@cyberpointllc.com
Subject:  Re: Spark Streaming socketTextStream

You can use the master's IP address (Or whichever machine you chose to run
the nc command) instead of localhost.











--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-socketTextStream-tp7326p7330.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: pmml with augustus

2014-06-10 Thread Evan R. Sparks
I should point out that if you don't want to take a polyglot approach to
languages and reside solely in the JVM, then you can just use plain old
java serialization on the Model objects that come out of MLlib's APIs from
Java or Scala and load them up in another process and call the relevant
.predict() method when it comes time to serve. The same approach would
probably also work for models trained via MLlib's python APIs, but I
haven't tried that.

Native PMML serialization would be a nice feature to add to MLlib as a
mechanism to transfer models to other environments for further
analysis/serving. There's a JIRA discussion about this here:
https://issues.apache.org/jira/browse/SPARK-1406
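
For illustration, a minimal sketch of the plain JVM-serialization approach, assuming
the MLlib model object is java.io.Serializable; the helper names, model type and path
are hypothetical:

  import java.io._

  def saveModel(model: Serializable, path: String): Unit = {
    val out = new ObjectOutputStream(new FileOutputStream(path))
    try out.writeObject(model) finally out.close()
  }

  def loadModel[T](path: String): T = {
    val in = new ObjectInputStream(new FileInputStream(path))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // e.g. saveModel(trainedModel, "model.bin") in the training job, then in the
  // serving process: loadModel[LogisticRegressionModel]("model.bin").predict(features)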


On Tue, Jun 10, 2014 at 10:53 AM, filipus floe...@gmail.com wrote:

 Thank you very much

 the Cascading project I didn't recognize at all till now

 this project is very interesting

 also I got the idea of using Scala as a language for Spark - because
 I can integrate JVM-based libraries very easily/naturally, if I got it right

 mh... but I could also use Spark as a model engine, Augustus for the
 serializer and a third party product for the prediction engine, like using
 jpmml

 mh... got the feeling that I need to do Java, Scala and Python at the same
 time...

 first things first - Augustus for a PMML output from Spark :-)





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/pmml-with-augustus-tp7313p7335.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



HDFS Server/Client IPC version mismatch while trying to access HDFS files using Spark-0.9.1

2014-06-10 Thread bijoy deb
Hi all,

I have built Shark-0.9.1 with sbt using the below command:

*SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.6.0 sbt/sbt assembly*

My Hadoop cluster is also having version 2.0.0-mr1-cdh4.6.0.

But when I try to execute the below command from the Spark shell, which reads a
file from HDFS, I get an IPC version mismatch ("IPC version 7 on server
versus IPC version 4 on client") error from the org.apache.hadoop.hdfs.DFSClient
class.











scala> val s = sc.textFile("hdfs://host:port/test.txt")
scala> s.count()
14/06/10 23:42:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
14/06/10 23:42:59 WARN snappy.LoadSnappy: Snappy native library not loaded
org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4
        at org.apache.hadoop.ipc.Client.call(Client.java:1070)
        at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
        at com.sun.proxy.$Proxy9.getProtocolVersion(Unknown Source)
        at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
        at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
at
org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)


Apparently this error is because of a version mismatch of the hadoop-hdfs jar
between the client (the one referenced by Spark) and the server (the Hadoop cluster). But what
I don't understand is why there is a mismatch, since I had built Spark with
the correct Hadoop version.

Any suggestions would be highly appreciated.

Thanks
Bijoy


Re: Can't find pyspark when using PySpark on YARN

2014-06-10 Thread Andrew Or
Hi Qi Ping,

You don't have to distribute these files; they are automatically packaged
in the assembly jar, which is already shipped to the worker nodes.

Other people have run into the same issue. See if the instructions here are
of any help:
http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3ccamjob8mr1+ias-sldz_rfrke_na2uubnmhrac4nukqyqnun...@mail.gmail.com%3e

As described in the link, the last resort is to try building your assembly
jar with JAVA_HOME set to Java 6. This usually fixes the problem (more
details in the link provided).

Cheers,
Andrew


2014-06-10 6:35 GMT-07:00 李奇平 qiping@alibaba-inc.com:

 Dear all,

 When I submit a pyspark application using this command:

 ./bin/spark-submit --master yarn-client
 examples/src/main/python/wordcount.py hdfs://...

 I get the following exception:

 Error from python worker:
 Traceback (most recent call last):
 File /usr/ali/lib/python2.5/runpy.py, line 85, in run_module
 loader = get_loader(mod_name)
 File /usr/ali/lib/python2.5/pkgutil.py, line 456, in get_loader
 return find_loader(fullname)
 File /usr/ali/lib/python2.5/pkgutil.py, line 466, in find_loader
 for importer in iter_importers(fullname):
 File /usr/ali/lib/python2.5/pkgutil.py, line 422, in iter_importers
 __import__(pkg)
 ImportError: No module named pyspark
 PYTHONPATH was:

 /home/xxx/spark/python:/home/xxx/spark_on_yarn/python/lib/py4j-0.8.1-src.zip:/disk11/mapred/tmp/usercache//filecache/11/spark-assembly-1.0.0-hadoop2.0.0-ydh2.0.0.jar

 Maybe `pyspark/python` and `py4j-0.8.1-src.zip` are not included on the
 YARN worker.
 How can I distribute these files with my application? Can I use `--pyfiles
 python.zip, py4j-0.8.1-src.zip `?
 Or how can I package the pyspark modules into a .egg file?






Re: Spark Logging

2014-06-10 Thread Surendranauth Hiraman
Event logs are different from writing using a logger, like log4j. The event
logs are the type of data showing up in the history server.

For my team, we use com.typesafe.scalalogging.slf4j.Logging. Our logs show
up in /etc/spark/work/app-id/executor-id/stderr and stdout.

All of our logging seems to show up in stderr.
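
For illustration, a minimal sketch of that setup (scalalogging 1.x slf4j API); the
object name and message are just examples:

  import com.typesafe.scalalogging.slf4j.Logging

  object MyJob extends Logging {
    def run(): Unit = {
      // Application logging like this ends up in the driver/executor stderr,
      // not in the event logs used by the history server.
      logger.info("starting job")
    }
  }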

-Suren




On Tue, Jun 10, 2014 at 2:56 PM, coderxiang shuoxiang...@gmail.com wrote:

 By default, the logs are available at `/tmp/spark-events`. You can specify
 the log directory via spark.eventLog.dir, see  this configuration page
 http://spark.apache.org/docs/latest/configuration.html  .



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Logging-tp7340p7343.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


getting started with mllib.recommendation.ALS

2014-06-10 Thread Sandeep Parikh
Question on the input and output for ALS.train() and
MatrixFactorizationModel.predict().

My input is a list of Ratings(user_id, product_id, rating) and my ratings are
on a scale of 1-5 (inclusive). When I compute predictions over the
superset of all (user_id, product_id) pairs, the ratings produced are on a
different scale.

The question is this: do I need to normalize the data coming out of
predict() to my own scale or does the input need to be different?

Thanks!


Re: NoSuchMethodError in KafkaReciever

2014-06-10 Thread mpieck
Hi,

I have the same problem when running a Kafka to Spark Streaming pipeline from
Java with explicitly specified message decoders. I had thought that it was
related to the Eclipse environment, as suggested here, but that's not the case. I
have coded an example based on this class:

https://github.com/apache/spark/blob/branch-0.9/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java

and have built a shaded uber jar with all the deps and tried to run it from
the command line. When I use the createStream method from the example class like
this:

KafkaUtils.createStream(jssc, "zookeeper:port", "test", topicMap);

everything is working fine, but when I explicitly specify the message decoder
classes used in this method with another overloaded createStream method:

KafkaUtils.createStream(jssc, String.class, String.class,
StringDecoder.class, StringDecoder.class, props, topicMap,
StorageLevels.MEMORY_AND_DISK_2);

the application stops with an error:

14/06/10 22:28:06 ERROR kafka.KafkaReceiver: Error receiving data
java.lang.NoSuchMethodException:
java.lang.Object.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Unknown Source)
at java.lang.Class.getConstructor(Unknown Source)
at
org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:108)
at
org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126)

I have tried Spark versions 0.9.0-incubating, 0.9.0 and 0.9.1, but the error
occurs everywhere. Kafka's StringDecoder class has the constructor with a
VerifiableProperties parameter, and all required classes are in the same uber
jar, so it is strange that Scala/Java cannot find it with the reflection API.
Maybe there is some problem with the Manifest/ClassTag usage in the KafkaUtils or
KafkaInputDStream classes, but I'm not a Scala expert and cannot be sure
about it. The problematic code is the same from version 0.9 to the current
one, so it's still there. The unit test from the Spark project works fine
with every KafkaUtils method, because the test does not try to register the
Kafka stream, it only checks the interface.

Currently it is possible to use a Kafka to Spark Streaming pipeline from Java
only with the default String message decoders, which makes this tool almost
useless (unless you are a great JSON fan).




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-in-KafkaReciever-tp2209p7347.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: NoSuchMethodError in KafkaReciever

2014-06-10 Thread Michael Chang
I had this same problem as well.  I ended up just adding the necessary code
in KafkaUtils and compiling my own Spark jar.  Something like this for the
raw stream:

  def createRawStream(
      jssc: JavaStreamingContext,
      kafkaParams: JMap[String, String],
      topics: JMap[String, JInt]
    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
    new KafkaInputDStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
      jssc.ssc, kafkaParams.toMap,
      Map(topics.mapValues(_.intValue()).toSeq: _*),
      StorageLevel.MEMORY_AND_DISK_SER_2)
  }


On Tue, Jun 10, 2014 at 2:15 PM, mpieck mpi...@gazeta.pl wrote:

 Hi,

 I have the same problem when running a Kafka to Spark Streaming pipeline from
 Java with explicitly specified message decoders. I had thought that it
 was
 related to the Eclipse environment, as suggested here, but that's not the case. I
 have coded an example based on this class:


 https://github.com/apache/spark/blob/branch-0.9/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java

 and have built a shaded uber jar with all the deps and tried to run it from
 the command line. When I use the createStream method from the example class
 like
 this:

 KafkaUtils.createStream(jssc, "zookeeper:port", "test", topicMap);

 everything is working fine, but when I explicitly specify the message decoder
 classes used in this method with another overloaded createStream method:

 KafkaUtils.createStream(jssc, String.class, String.class,
 StringDecoder.class, StringDecoder.class, props, topicMap,
 StorageLevels.MEMORY_AND_DISK_2);

 the application stops with an error:

 14/06/10 22:28:06 ERROR kafka.KafkaReceiver: Error receiving data
 java.lang.NoSuchMethodException:
 java.lang.Object.<init>(kafka.utils.VerifiableProperties)
 at java.lang.Class.getConstructor0(Unknown Source)
 at java.lang.Class.getConstructor(Unknown Source)
 at

 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:108)
 at

 org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126)

 I have tried Spark versions 0.9.0-incubating, 0.9.0 and 0.9.1, but the
 error
 occurs everywhere. Kafka StringDecoder class has the constructor with
 VerifiableProperties parameter and all required classes are in the same
 uber
 jar, so it is strange that scala/java cannot find it with reflection api.
 Maybe there is some problem with Manifest/ClassTag usage in KafkaUtils or
 KafkaInputDStream classes, but I'm not a Scala expert and cannot be sure
 about it. The problematic code is the same from version 0.9 to the current
 one, so it's still there. Unit test from the Spark project is working fine
 with every KafkaUtils method, because the test does not try to register the
 kafka stream, only checks the interface.

 Currently it is possible to use Kafka to Spark Streaming pipeline from Java
 only with the default String message decoders, which makes this tool almost
 useless (unless you are a great JSON fan).




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-in-KafkaReciever-tp2209p7347.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: getting started with mllib.recommendation.ALS

2014-06-10 Thread Sean Owen
For trainImplicit(), the output is an approximation of a matrix of 0s
and 1s, so the values are generally (but not always) in [0,1].

But for train(), you should be predicting the original input matrix
as-is, as I understand. You should get output in about the same range
as the input but again not necessarily 1-5. If it's really different,
you could be underfitting. Try less lambda, more features?
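
For illustration, a minimal sketch of that tuning in Spark 1.0 MLlib terms, assuming
data is an RDD of (userId, productId, stars) triples; the rank, iteration count and
lambda values are illustrative only:

  import org.apache.spark.mllib.recommendation.{ALS, Rating}

  val ratings = data.map { case (userId, productId, stars) => Rating(userId, productId, stars) }
  val model = ALS.train(ratings, 20 /* rank (features) */, 10 /* iterations */, 0.01 /* lambda */)
  val predictions = model.predict(ratings.map(r => (r.user, r.product)))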

On Tue, Jun 10, 2014 at 4:59 PM, Sandeep Parikh sand...@clusterbeep.org wrote:
 Question on the input and output for ALS.train() and
 MatrixFactorizationModel.predict().

 My input is a list of Ratings(user_id, product_id, rating) and my ratings are
 on a scale of 1-5 (inclusive). When I compute predictions over the superset
 of all (user_id, product_id) pairs, the ratings produced are on a different
 scale.

 The question is this: do I need to normalize the data coming out of
 predict() to my own scale or does the input need to be different?

 Thanks!



Re: NoSuchMethodError in KafkaReciever

2014-06-10 Thread Sean Owen
I added https://issues.apache.org/jira/browse/SPARK-2103 to track
this. I also ran into it. I don't have a fix, but, somehow I think
someone with more understanding of Scala and Manifest objects might
see the easy fix.

On Tue, Jun 10, 2014 at 5:15 PM, mpieck mpi...@gazeta.pl wrote:
 Hi,

 I have the same problem when running a Kafka to Spark Streaming pipeline from
 Java with explicitly specified message decoders. I had thought that it was
 related to the Eclipse environment, as suggested here, but that's not the case. I
 have coded an example based on this class:

 https://github.com/apache/spark/blob/branch-0.9/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java

 and have built a shaded uber jar with all the deps and tried to run it from
 the command line. When I use the createStream method from the example class like
 this:

 KafkaUtils.createStream(jssc, "zookeeper:port", "test", topicMap);

 everything is working fine, but when I explicitly specify the message decoder
 classes used in this method with another overloaded createStream method:

 KafkaUtils.createStream(jssc, String.class, String.class,
 StringDecoder.class, StringDecoder.class, props, topicMap,
 StorageLevels.MEMORY_AND_DISK_2);

 the application stops with an error:

 14/06/10 22:28:06 ERROR kafka.KafkaReceiver: Error receiving data
 java.lang.NoSuchMethodException:
 java.lang.Object.<init>(kafka.utils.VerifiableProperties)
 at java.lang.Class.getConstructor0(Unknown Source)
 at java.lang.Class.getConstructor(Unknown Source)
 at
 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:108)
 at
 org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126)

 I have tried Spark versions 0.9.0-incubating, 0.9.0 and 0.9.1, but the error
 occurs everywhere. Kafka StringDecoder class has the constructor with
 VerifiableProperties parameter and all required classes are in the same uber
 jar, so it is strange that scala/java cannot find it with reflection api.
 Maybe there is some problem with Manifest/ClassTag usage in KafkaUtils or
 KafkaInputDStream classes, but I'm not a Scala expert and cannot be sure
 about it. The problematic code is the same from version 0.9 to the current
 one, so it's still there. Unit test from the Spark project is working fine
 with every KafkaUtils method, because the test does not try to register the
 kafka stream, only checks the interface.

 Currently it is possible to use Kafka to Spark Streaming pipeline from Java
 only with the default String message decoders, which makes this tool almost
 useless (unless you are a great JSON fan).




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-in-KafkaReciever-tp2209p7347.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Information on Spark UI

2014-06-10 Thread coderxiang
The executors shown as "CANNOT FIND ADDRESS" are not listed in the Executors tab
at the top of the Spark UI.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Information-on-Spark-UI-tp7354p7355.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


spark streaming, kafka, SPARK_CLASSPATH

2014-06-10 Thread lannyripple
I am using Spark 1.0.0 compiled with Hadoop 1.2.1.

I have a toy spark-streaming-kafka program.  It reads from a kafka queue and
does

stream
  .map { case (k, v) => (v, 1) }
  .reduceByKey(_ + _)
  .print()

using a 1 second interval on the stream.

The docs say to make Spark and Hadoop jars 'provided' but this breaks for
spark-streaming.  Including spark-streaming (and spark-streaming-kafka) as
'compile' to sweep them into our assembly gives collisions on javax.*
classes.  To work around this I modified
$SPARK_HOME/bin/compute-classpath.sh to include spark-streaming,
spark-streaming-kafka, and zkclient.  (Note that kafka is included as
'compile' in my project and picked up in the assembly.)

I have set up conf/spark-env.sh as needed.  I have copied my assembly to
/tmp/myjar.jar on all spark hosts and to my hdfs /tmp/jars directory.  I am
running spark-submit from my spark master.  I am guided by the information
here https://spark.apache.org/docs/latest/submitting-applications.html

Well, at this point I was going to detail all the ways spark-submit fails to
follow its own documentation.  If I do not invoke sparkContext.setJars(),
then it just fails to find the driver class.  This is using various
combinations of absolute path, file:, hdfs: (Warning: Skip remote jar)??,
and local: prefixes on the application-jar and --jars arguments.

If I invoke sparkContext.setJars() and include my assembly jar, I get
further.  At this point I get a failure from
kafka.consumer.ConsumerConnector not being found.  I suspect this is because
spark-streaming-kafka needs the Kafka dependency but my assembly jar is
too late in the classpath.

At this point I try setting spark.files.userClassPathFirst to 'true' but
this causes more things to blow up.

I finally found something that works, namely setting the environment variable
SPARK_CLASSPATH=/tmp/myjar.jar.  But silly me, this is deprecated and I'm
helpfully informed to

  Please instead use:
   - ./spark-submit with --driver-class-path to augment the driver classpath
   - spark.executor.extraClassPath to augment the executor classpath

which when put into a file and introduced with --properties-file does not
work.  (Also tried spark.files.userClassPathFirst here.)  These fail with
the kafka.consumer.ConsumerConnector error.

At a guess what's going on is that using SPARK_CLASSPATH I have my assembly
jar in the classpath at SparkSubmit invocation 

  Spark Command: java -cp
/tmp/myjar.jar::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.0.0-hadoop1.2.1.jar:/opt/spark/lib/spark-streaming_2.10-1.0.0.jar:/opt/spark/lib/spark-streaming-kafka_2.10-1.0.0.jar:/opt/spark/lib/zkclient-0.4.jar
-XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m
org.apache.spark.deploy.SparkSubmit --class me.KafkaStreamingWC
/tmp/myjar.jar

but using --properties-file then the assembly is not available for
SparkSubmit.

I think the root cause is either spark-submit not handling the
spark-streaming libraries so they can be 'provided', or the inclusion of
org.eclipse.jetty.orbit in the streaming libraries, which causes

  [error] (*:assembly) deduplicate: different file contents found in the
following:
  [error]
/Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.transaction/orbits/javax.transaction-1.1.1.v201105210645.jar:META-INF/ECLIPSEF.RSA
  [error]
/Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.servlet/orbits/javax.servlet-3.0.0.v201112011016.jar:META-INF/ECLIPSEF.RSA
  [error]
/Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.mail.glassfish/orbits/javax.mail.glassfish-1.4.1.v201005082020.jar:META-INF/ECLIPSEF.RSA
  [error]
/Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.activation/orbits/javax.activation-1.1.0.v201105071233.jar:META-INF/ECLIPSEF.RSA

I've tried applying mergeStrategy in assembly in my assembly.sbt, but then I
get

  Invalid signature file digest for Manifest main attributes

If anyone knows the magic to get this working a reply would be greatly
appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-SPARK-CLASSPATH-tp7356.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


groupBy question

2014-06-10 Thread SK
After doing a groupBy operation, I have the following result:

 val res = 
(ID1,ArrayBuffer((145804601,ID1,japan)))
(ID3,ArrayBuffer((145865080,ID3,canada),
(145899640,ID3,china)))
(ID2,ArrayBuffer((145752760,ID2,usa),
(145934200,ID2,usa)))

Now I need to output for each group, the size of each group and the max of
the first field, which is a timestamp.
So, I tried the following:

1) res.map(group => (group._2.size, group._2._1.max))
But I got an error : value _1 is not a member of Iterable[(Long, String,
String)]

2) I also tried: res.map(group => (group._2.size, group._2[1].max)), but got
an error for that as well.

What is the right way to get the max of the timestamp field (the first field
in the ArrayBuffer) for each group?
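
For reference, a minimal sketch of one way to do this: the grouped values form an
Iterable of tuples, so map over them to pull out the timestamps before taking the max:

  val sizeAndLatest = res.map { case (id, records) =>
    (id, records.size, records.map(_._1).max)   // group size and max timestamp
  }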


thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-question-tp7357.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Monitoring spark dis-associated workers

2014-06-10 Thread Allen Chang
We're running into an issue where periodically the master loses connectivity
with workers in the spark cluster. We believe this issue tends to manifest
when the cluster is under heavy load, but we're not entirely sure when it
happens. I've seen one or two other messages to this list about this issue,
but no one seems to have a clue as to the actual bug.

So, to work around the issue, we'd like to programmatically monitor the
number of workers connected to the master and restart the cluster when the
master loses track of some of its workers. Any ideas on how to
programmatically write such a health check?

Thanks,
Allen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Monitoring-spark-dis-associated-workers-tp7358.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


problem starting the history server on EC2

2014-06-10 Thread zhen
I created a Spark 1.0 cluster on EC2 using the provided scripts. However, I
do not seem to be able to start the history server on the master node. I
used the following command:

./start-history-server.sh /root/spark_log


The error message says that the logging directory /root/spark_log does not
exist. But I have definitely created the directory and made sure everyone
can read/write/execute in the directory.

Can you tell me why it  does not work?

Thank you

Zhen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problem-starting-the-history-server-on-EC2-tp7361.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: problem starting the history server on EC2

2014-06-10 Thread bc Wong
What's the permission on /root itself?
On Jun 10, 2014 6:29 PM, zhen z...@latrobe.edu.au wrote:

 I created a Spark 1.0 cluster on EC2 using the provided scripts. However, I
 do not seem to be able to start the history server on the master node. I
 used the following command:

 ./start-history-server.sh /root/spark_log


 The error message says that the logging directory /root/spark_log does not
 exist. But I have definitely created the directory and made sure everyone
 can read/write/execute in the directory.

 Can you tell me why it  does not work?

 Thank you

 Zhen



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/problem-starting-the-history-server-on-EC2-tp7361.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



output tuples in CSV format

2014-06-10 Thread SK
My output is a set of tuples and when I output it using saveAsTextFile, my
file looks as follows:

(field1_tup1, field2_tup1, field3_tup1,...)
(field1_tup2, field2_tup2, field3_tup2,...)

In Spark, is there some way I can simply have it output in CSV format as
follows (i.e. without the parentheses)?
field1_tup1, field2_tup1, field3_tup1,...
field1_tup2, field2_tup2, field3_tup2,...

I could write a script to remove the parentheses, but it would be easier if I
could omit them in the first place. I did not find a saveAsCsvFile in Spark.

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/output-tuples-in-CSV-format-tp7363.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Using Spark on Data size larger than Memory size

2014-06-10 Thread Allen Chang
Thanks for the clarification.

What is the proper way to configure RDDs when your aggregate data size
exceeds your available working memory size? In particular, in addition to
typical operations, I'm performing cogroups, joins, and coalesces/shuffles.

I see that the default storage level for RDDs is MEMORY_ONLY. Do I just need
to set the storage level for all of my RDDs to something like
MEMORY_AND_DISK? Do I need to do anything else to get graceful behavior in
the presence of coalesces/shuffles, cogroups, and joins?
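
For reference, a minimal sketch of what an explicit storage level looks like on a
shuffled result, assuming two pair RDDs named left and right; whether MEMORY_AND_DISK
is the right choice depends on the workload:

  import org.apache.spark.SparkContext._         // pair-RDD functions such as cogroup
  import org.apache.spark.storage.StorageLevel

  val grouped = left.cogroup(right).persist(StorageLevel.MEMORY_AND_DISK)
  grouped.count()                                // first action materializes it, spilling to disk if needed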

Thanks,
Allen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-on-Data-size-larger-than-Memory-size-tp6589p7364.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Information on Spark UI

2014-06-10 Thread Neville Li
We are seeing this issue as well.
We run on YARN and see logs about lost executors. It looks like some stages had
to be re-run to compute RDD partitions lost on the executor.

We were able to complete 20 iterations with 20% full matrix but not beyond
that (total  100GB).


On Tue, Jun 10, 2014 at 8:32 PM, coderxiang shuoxiang...@gmail.com wrote:

 The executors shown as "CANNOT FIND ADDRESS" are not listed in the Executors
 tab
 at the top of the Spark UI.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Information-on-Spark-UI-tp7354p7355.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: output tuples in CSV format

2014-06-10 Thread Mikhail Strebkov
you can just use something like this:
  myRdd.map(_.productIterator.mkString(",")).saveAsTextFile(...)


On Tue, Jun 10, 2014 at 6:34 PM, SK skrishna...@gmail.com wrote:

 My output is a set of tuples and when I output it using saveAsTextFile, my
 file looks as follows:

 (field1_tup1, field2_tup1, field3_tup1,...)
 (field1_tup2, field2_tup2, field3_tup2,...)

 In Spark, is there some way I can simply have it output in CSV format as
 follows (i.e. without the parentheses):
 field1_tup1, field2_tup1, field3_tup1,...
 field1_tup2, field2_tup2, field3_tup2,...

 I could write a script to remove the parentheses, but it would be easier if I
 could omit the parentheses. I did not find a saveAsCsvFile in Spark.

 thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/output-tuples-in-CSV-format-tp7363.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



RE: output tuples in CSV format

2014-06-10 Thread Shao, Saisai
It would be better to add one more transformation step before saveAsTextFile,
like:

rdd.map(tuple => "%s,%s,%s".format(tuple._1, tuple._2,
tuple._3)).saveAsTextFile(...)

That way you manually convert to the format you want, and then write to HDFS.

Thanks
Jerry

-Original Message-
From: SK [mailto:skrishna...@gmail.com] 
Sent: Wednesday, June 11, 2014 9:34 AM
To: u...@spark.incubator.apache.org
Subject: output tuples in CSV format

My output is a set of tuples and when I output it using saveAsTextFile, my file 
looks as follows:

(field1_tup1, field2_tup1, field3_tup1,...) (field1_tup2, field2_tup2, 
field3_tup2,...)

In Spark, is there some way I can simply have it output in CSV format as
follows (i.e. without the parentheses):
field1_tup1, field2_tup1, field3_tup1,...
field1_tup2, field2_tup2, field3_tup2,...

I could write a script to remove the parentheses, but it would be easier if I
could omit the parentheses. I did not find a saveAsCsvFile in Spark.

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/output-tuples-in-CSV-format-tp7363.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to process multiple classification with SVM in MLlib

2014-06-10 Thread littlebird
Thanks. Now I know how to broadcast the dataset, but I still wonder, after
broadcasting the dataset, how I can apply my algorithm to train the model
in the workers. To describe my question in detail, the following code is used
to train an LDA (Latent Dirichlet Allocation) model with JGibbLDA on a single
machine; it iterates to sample the topics and train the model. After
broadcasting the dataset, how can I keep this code running in Spark? Thank
you.
        LDACmdOption ldaOption = new LDACmdOption();  // to set the parameters of LDA
        ldaOption.est = true;
        ldaOption.estc = false;
        ldaOption.modelName = "model-final";  // the name of the output file
        ldaOption.dir = "/usr/Java";
        ldaOption.dfile = "newDoc.dat";  // this is the input data file
        ldaOption.alpha = 0.5;
        ldaOption.beta = 0.1;
        ldaOption.K = 10;  // the number of topics
        ldaOption.niters = 1000;  // the number of iterations
        topicNum = ldaOption.K;
        Estimator estimator = new Estimator();
        estimator.init(ldaOption);
        estimator.estimate();
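
(To make my question more concrete, here is the generic shape of the pattern I have in mind, sketched in Scala for brevity; the corpus below is a placeholder. Whether this pattern makes sense for JGibbLDA's Estimator, which reads its input from ldaOption.dfile rather than from memory, is exactly what I am unsure about:)

    val docs = Seq("first document text", "second document text")   // placeholder corpus
    val bcDocs = sc.broadcast(docs)
    val results = sc.parallelize(1 to 4).mapPartitions { _ =>
      val localDocs = bcDocs.value   // every worker can read the full broadcast corpus
      // this is where I would need to run estimator.estimate() over localDocs,
      // but Estimator reads its input from a file, not from memory
      Iterator(localDocs.size)
    }
    results.collect()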



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-multiple-classification-with-SVM-in-MLlib-tp7174p7368.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: problem starting the history server on EC2

2014-06-10 Thread zhen
I checked the permission on root and it is the following:

drwxr-xr-x 20 root root  4096 Jun 11 01:05 root

So anyway, I changed to use /tmp/spark_log instead and this time I made sure
that all permissions are given to /tmp and /tmp/spark_log like below. But it
still does not work:

drwxrwxrwt  8 root root  4096 Jun 11 02:08 tmp
drwxrwxrwx 2 root root   4096 Jun 11 02:08 spark_log

Thanks

Zhen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problem-starting-the-history-server-on-EC2-tp7361p7370.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: problem starting the history server on EC2

2014-06-10 Thread Andrew Or
Can you try file:/root/spark_log?


2014-06-10 19:22 GMT-07:00 zhen z...@latrobe.edu.au:

 I checked the permission on root and it is the following:

 drwxr-xr-x 20 root root  4096 Jun 11 01:05 root

 So anyway, I changed to use /tmp/spark_log instead and this time I made
 sure
 that all permissions are given to /tmp and /tmp/spark_log like below. But
 it
 still does not work:

 drwxrwxrwt  8 root root  4096 Jun 11 02:08 tmp
 drwxrwxrwx 2 root root   4096 Jun 11 02:08 spark_log

 Thanks

 Zhen



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/problem-starting-the-history-server-on-EC2-tp7361p7370.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: How to process multiple classification with SVM in MLlib

2014-06-10 Thread littlebird
Someone suggested that I use Mahout, but I'm not familiar with it, and in that
case using Mahout would add difficulties to my program. I'd like to run the
algorithm in Spark. I'm a beginner; can you give me some suggestions?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-multiple-classification-with-SVM-in-MLlib-tp7174p7372.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Question about RDD cache, unpersist, materialization

2014-06-10 Thread innowireless TaeYun Kim
Hi,

What I (seem to) know about the RDD persistence API is as follows:
- cache() and persist() are not actions. They only mark the RDD for caching.
- unpersist() is also not an action. It only removes the marking, but if the
RDD is already in memory, it is unloaded.

And there seems to be no API to forcefully materialize an RDD without
requesting its data through an action method, for example first().

So, I am faced with the following scenario.

{
    JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>());  // create empty for merging
    for (int i = 0; i < 10; i++)
    {
        JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
        rdd.cache();  // Since it will be used twice, cache.
        rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]);  // Transform and save, rdd materializes
        rddUnion = rddUnion.union(rdd.map(...).filter(...));  // Do another transform to T and merge by union
        rdd.unpersist();  // Now it seems not needed. (But needed actually)
    }
    // Here, rddUnion actually materializes, and needs all 10 rdds that already unpersisted.
    // So, rebuilding all 10 rdds will occur.
    rddUnion.saveAsTextFile(mergedFileName);
}

If rddUnion could be materialized and cache()d before the rdd.unpersist()
line, the rdds in the loop would not be needed when
rddUnion.saveAsTextFile() runs.

Now what is the best strategy?
- Do not unpersist all 10 rdds in the loop.
- Materialize rddUnion in the loop by calling a 'light' action API, like
first().
- Give up and just rebuild/reload all 10 rdds when saving rddUnion.

Is there some misunderstanding?

Thanks.




Re: getting started with mllib.recommendation.ALS

2014-06-10 Thread Sandeep Parikh
Thanks, Sean. I realized that I was supplying train() with a very low rank,
so I will retry with something higher and then play with lambda as needed.
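
(For reference, roughly what I plan to try; the rank/iterations/lambda values below are just starting points to experiment with, not recommendations:)

    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    val ratings = sc.textFile("ratings.csv").map { line =>   // placeholder input: "user,product,rating"
      val Array(user, product, rating) = line.split(",")
      Rating(user.toInt, product.toInt, rating.toDouble)
    }
    val model = ALS.train(ratings, 50, 10, 0.01)   // rank = 50, iterations = 10, lambda = 0.01
    val predictions = model.predict(ratings.map(r => (r.user, r.product)))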


On Tue, Jun 10, 2014 at 4:58 PM, Sean Owen so...@cloudera.com wrote:

 For trainImplicit(), the output is an approximation of a matrix of 0s
 and 1s, so the values are generally (not always) in [0,1]

 But for train(), you should be predicting the original input matrix
 as-is, as I understand. You should get output in about the same range
 as the input but again not necessarily 1-5. If it's really different,
 you could be underfitting. Try less lambda, more features?

 On Tue, Jun 10, 2014 at 4:59 PM, Sandeep Parikh sand...@clusterbeep.org
 wrote:
  Question on the input and output for ALS.train() and
  MatrixFactorizationModel.predict().
 
  My input is list of Ratings(user_id, product_id, rating) and my ratings
 are
  one a scale of 1-5 (inclusive). When I compute predictions over the
 superset
  of all (user_id, product_id) pairs, the ratings produced are on a
 different
  scale.
 
  The question is this: do I need to normalize the data coming out of
  predict() to my own scale or does the input need to be different?
 
  Thanks!
 



RE: Question about RDD cache, unpersist, materialization

2014-06-10 Thread innowireless TaeYun Kim
BTW, it is possible that rdd.first() does not compute all of the partitions.
So, first() cannot be used for the situation below.
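
A rough Scala sketch of the alternative I have in mind (the Java API is analogous; the file names and the map/filter bodies are placeholders). count() is a full action that visits every partition, so unlike first() it forces the whole RDD into the cache:

    var rddUnion = sc.parallelize(Seq.empty[String])
    for (i <- 0 until 10) {
      val rdd = sc.textFile(s"input-$i.txt")
      rdd.cache()
      rdd.map(_.trim).filter(_.nonEmpty).saveAsTextFile(s"output-$i")
      rddUnion = rddUnion.union(rdd.map(_.toUpperCase).filter(_.nonEmpty))
      rddUnion.cache()
      rddUnion.count()   // full action: unlike first(), it computes (and caches) every partition
      rdd.unpersist()    // safe now, as long as the cached union partitions are not evicted later
    }
    rddUnion.saveAsTextFile("merged")

The downside is that every intermediate rddUnion stays cached unless the previous one is unpersisted as well, so this trades extra memory for avoiding the rebuild.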

-Original Message-
From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] 
Sent: Wednesday, June 11, 2014 11:40 AM
To: user@spark.apache.org
Subject: Question about RDD cache, unpersist, materialization

Hi,

What I (seem to) know about the RDD persistence API is as follows:
- cache() and persist() are not actions. They only mark the RDD for caching.
- unpersist() is also not an action. It only removes the marking, but if the
RDD is already in memory, it is unloaded.

And there seems to be no API to forcefully materialize an RDD without
requesting its data through an action method, for example first().

So, I am faced with the following scenario.

{
    JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>());  // create empty for merging
    for (int i = 0; i < 10; i++)
    {
        JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
        rdd.cache();  // Since it will be used twice, cache.
        rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]);  // Transform and save, rdd materializes
        rddUnion = rddUnion.union(rdd.map(...).filter(...));  // Do another transform to T and merge by union
        rdd.unpersist();  // Now it seems not needed. (But needed actually)
    }
    // Here, rddUnion actually materializes, and needs all 10 rdds that already unpersisted.
    // So, rebuilding all 10 rdds will occur.
    rddUnion.saveAsTextFile(mergedFileName);
}

If rddUnion could be materialized and cache()d before the rdd.unpersist()
line, the rdds in the loop would not be needed when
rddUnion.saveAsTextFile() runs.

Now what is the best strategy?
- Do not unpersist all 10 rdds in the loop.
- Materialize rddUnion in the loop by calling a 'light' action API, like
first().
- Give up and just rebuild/reload all 10 rdds when saving rddUnion.

Is there some misunderstanding?

Thanks.




Re: Problem in Spark Streaming

2014-06-10 Thread Ashish Rangole
Have you considered the garbage collection impact, and whether it coincides with
your latency spikes? You can enable GC logging by changing the Spark
configuration for your job.
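
For example, one way to do it is to set spark.executor.extraJavaOptions on the SparkConf before creating the streaming context (a minimal sketch; the exact JVM flags are up to you):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("streaming-gc-logging")   // placeholder app name
      .set("spark.executor.extraJavaOptions",
           "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
    // the GC lines should then appear in each executor's stdout/stderr logs
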
Hi, when I search for the keyword Total delay in the console log, the delay
keeps increasing. I am not sure what this total delay means. For example, if I
perform a windowed wordcount with windowSize=1ms and slidingStep=2000ms, is
the delay measured from the 10th second?

A sample log is shown as follows:
Total delay: 136.983 s for time 1402409331000 ms (execution: 1.711s) --what
is execution time?
Finished TID 490 in 14 ms on  (progress: 1/6) --what is TID? and what
is the progress?



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7329.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: problem starting the history server on EC2

2014-06-10 Thread Andrew Or
No, I meant pass the path to the history server start script.
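
For example, something along the lines of:

    ./start-history-server.sh file:/root/spark_log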


2014-06-10 19:33 GMT-07:00 zhen z...@latrobe.edu.au:

 Sure here it is:

 drwxrwxrwx  2 1000 root 4096 Jun 11 01:05 spark_logs

 Zhen



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/problem-starting-the-history-server-on-EC2-tp7361p7373.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: How to specify executor memory in EC2 ?

2014-06-10 Thread Matei Zaharia
It might be that conf/spark-env.sh on EC2 is configured to set it to 512, and 
is overriding the application’s settings. Take a look in there and delete that 
line if possible.

Matei

On Jun 10, 2014, at 2:38 PM, Aliaksei Litouka aliaksei.lito...@gmail.com 
wrote:

 I am testing my application in an EC2 cluster of m3.medium machines. By default,
 only 512 MB of memory on each machine is used. I want to increase this amount,
 and I'm trying to do it by passing the --executor-memory 2G option to the
 spark-submit script, but it doesn't seem to work - each machine still uses only
 512 MB instead of 2 gigabytes. What am I doing wrong? How do I increase the
 amount of memory?



Re: problem starting the history server on EC2

2014-06-10 Thread Krishna Sankar
Yep, it gives tons of errors. I was able to make it work with sudo. Looks
like an ownership issue.
Cheers
k/


On Tue, Jun 10, 2014 at 6:29 PM, zhen z...@latrobe.edu.au wrote:

 I created a Spark 1.0 cluster on EC2 using the provided scripts. However, I
 do not seem to be able to start the history server on the master node. I
 used the following command:

 ./start-history-server.sh /root/spark_log


 The error message says that the logging directory /root/spark_log does not
 exist. But I have definitely created the directory and made sure everyone
 can read/write/execute in the directory.

 Can you tell me why it  does not work?

 Thank you

 Zhen



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/problem-starting-the-history-server-on-EC2-tp7361.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.