Re: Kafka client - specify offsets?

2014-06-24 Thread Tobias Pfeiffer
Michael,

apparently, the parameter "auto.offset.reset" has a different meaning
in Spark's Kafka implementation than what is described in the
documentation.

The Kafka docs at 
specify the effect of "auto.offset.reset" as:
> What to do when there is no initial offset in ZooKeeper or if an offset is 
> out of range:
> * smallest : automatically reset the offset to the smallest offset
> * largest : automatically reset the offset to the largest offset
> * anything else: throw exception to the consumer

However, Spark's implementation seems to drop the part "when there is
no initial offset", as can be seen in
https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala#L102
-- it will just wipe the stored offset from Zookeeper. I guess it's
actually a bug, because the parameter's effect is different than what
is documented, but then it's good for you (and me) because it allows
to specify "I want all that I can get" or "I want to start reading
right now", even if there is an offset stored in Zookeeper.

Tobias

On Sun, Jun 15, 2014 at 11:27 PM, Tobias Pfeiffer  wrote:
> Hi,
>
> there are apparently helpers to tell you the offsets
> ,
> but I have no idea how to pass that to the Kafka stream consumer. I am
> interested in that as well.
>
> Tobias
>
> On Thu, Jun 12, 2014 at 5:53 AM, Michael Campbell
>  wrote:
>> Is there a way in the Apache Spark Kafka Utils to specify an offset to start
>> reading?  Specifically, from the start of the queue, or failing that, a
>> specific point?


Changing log level of spark

2014-06-24 Thread Philip Limbeck
Hi!

According to
https://spark.apache.org/docs/0.9.0/configuration.html#configuring-logging,
changing log-level is just a matter of creating a log4j.properties (which
is in the classpath of spark) and changing log level there for the root
logger. I did this steps on every node in the cluster (master and worker
nodes). However, after restart there is still no debug output as desired,
but only the default info log level.


Re: DAGScheduler: Failed to run foreach

2014-06-24 Thread Aaron Davidson
That IntRef problem is very strange, as it's not related to running a spark
job, but rather just interpreting the code in the repl. There are two
possibilities I can think of:
- Spark was compiled with a different version of Scala than you're running
it on. Spark is compiled on Scala 2.10 from Spark 0.9.0 onward, so make
sure that's what you're using.
- You have two different versions of Scala on the classpath of your repl.

Regarding the NotSerializableException, it's exactly what it says. I
suspect that "jc_" is either a com.wcohen.ss.Jaccard or references a
Jaccard, so this line requires that it be serialized:

destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))

You have a few ways around this, in ascending order of complication:
1. Instantiate your Jaccard with Serializable, like this:

val mjc = new Jaccard() with Serializable

This may actually work because Jaccard actually has a zero-arg constructor.

2. Create a new Jaccard in each closure:

destrdd.map(x => new Jaccard().score(str1, new BasicStringWrapper(x)))

In general, if this were an expensive operation, you could amortize it by
using mapPartitions:

destrdd.mapPartitions { part =>
  val jc = new Jaccard()
  part.map(jc.score(str1, new BasicStringWrapper(x)))
}

3. Use Kryo instead of Java serialization, and register a custom Serializer
for Jaccard. This is a good general solution since Kryo is faster than
Java, but may cause further complications. See
http://spark.apache.org/docs/latest/tuning.html#data-serialization.


On Tue, Jun 24, 2014 at 4:36 PM, Sameer Tilak  wrote:

> Dear Aaron,
> Thanks for your help. I am still facing few problems.
>
> I am using a 3rd party library (jar file) under the hood when I call
> jc_->score. Each call to jc_->score will generate a array of doubles. It
> is basically score of the current sentence with every sentence in the
> destrdd generated by loading a file from hdfs. My goal is to then save it
> in a file and repeat it for every sentence/string from sourcerdd.
>
>
>   def calculateScore (sourcerdd: RDD[String], destrdd: RDD[String])  {
>   val jc_ = this.mjc
>   var i : Int = 0
>
>   for (sentence <- sourcerdd.toLocalIterator)
>{
> val str1 = new StringWrapper (sentence)
> var scorevector = destrdd.map(x => jc_.score(str1, new
> BasicStringWrapper(x)))
> val fileName = new
> String("/apps/software/scala-approsstrmatch-sentence" + i)
> scorevector.saveAsTextFile(fileName)
>i += 1
>}
>
>   }
> I get this error message. I am not sure why var i : Int = 0 will throw an
> error?
>
> java.lang.NoSuchMethodError:
> scala.runtime.IntRef.create(I)Lscala/runtime/IntRef;
>  at
> approxstrmatch.JaccardScore.calculateScoreSecond(JaccardScore.scala:77)
> at $iwC$$iwC$$iwC$$iwC.(:19)
>  at $iwC$$iwC$$iwC.(:24)
> at $iwC$$iwC.(:26)
> at $iwC.(:28)
>  at (:30)
> at .(:34)
> at .()
>  at .(:7)
> at .()
> at $print()
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>  at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:601)
>  at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
>  at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
>  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
> at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
>  at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
>  at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
> at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
>  at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
>  at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
>  at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
>  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
> at org.apache.spark.repl.Main$.main(Main.scala:31)
>  at org.apache.spark.repl.Main.main(Main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:601)
> at org.apache.spark.deploy.SparkSubmit

Re: Does PUBLIC_DNS environment parameter really works?

2014-06-24 Thread Andrew Or
Hi Peng,

What you're looking for is SPARK_MASTER_IP, which defaults to the output of
the command "hostname" (see sbin/start-master.sh).

What SPARK_PUBLIC_DNS does is it changes what the Master or the Worker
advertise to others. If this is set, the links on the Master and Worker web
UI will use public addresses instead of private ones, for example. This is
useful if you're browsing through these UIs locally from your machine
outside of the cluster.

Best,
Andrew


2014-06-24 21:01 GMT-07:00 Peng Cheng :

> I'm deploying a cluster to Amazon EC2, trying to override its internal ip
> addresses with public dns
>
> I start a cluster with environment parameter: SPARK_PUBLIC_DNS=[my EC2
> public DNS]
>
> But it doesn't change anything on the web UI, it still shows internal ip
> address
>
> Spark Master at spark://ip-172-31-32-12:7077
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Does-PUBLIC-DNS-environment-parameter-really-works-tp8237.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Does PUBLIC_DNS environment parameter really works?

2014-06-24 Thread Peng Cheng
I'm deploying a cluster to Amazon EC2, trying to override its internal ip
addresses with public dns

I start a cluster with environment parameter: SPARK_PUBLIC_DNS=[my EC2
public DNS]

But it doesn't change anything on the web UI, it still shows internal ip
address

Spark Master at spark://ip-172-31-32-12:7077



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-PUBLIC-DNS-environment-parameter-really-works-tp8237.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Problems running Spark job on mesos in fine-grained mode

2014-06-24 Thread Sébastien Rainville
Hi Mayur,

I use primarily Scala, but I tested with pyspark, and it's working fine too
post the patch.

Thanks,

- Sebastien


On Tue, Jun 24, 2014 at 6:08 PM, Mayur Rustagi 
wrote:

> Hi Sebastien,
> Are you using Pyspark by any chance, is that working for you (post the
> patch?)
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Mon, Jun 23, 2014 at 1:51 PM, Fedechicco  wrote:
>
>> I'm getting the same behavior and it's crucial I get it fixed for an
>> evaluation of Spark + Mesos within my company.
>>
>> I'm bumping +1 for the request of putting this fix in the 1.0.1 if
>> possible!
>>
>> thanks,
>> Federico
>>
>>
>> 2014-06-20 20:51 GMT+02:00 Sébastien Rainville <
>> sebastienrainvi...@gmail.com>:
>>
>> Hi,
>>>
>>> this is just a follow-up regarding this issue. Turns out that it's
>>> caused by a bug in Spark. I created a case for it:
>>> https://issues.apache.org/jira/browse/SPARK-2204 and submitted a patch.
>>>
>>> Any chance this could be included in the 1.0.1 release?
>>>
>>> Thanks,
>>>
>>> - Sebastien
>>>
>>>
>>>
>>> On Tue, Jun 17, 2014 at 2:57 PM, Sébastien Rainville <
>>> sebastienrainvi...@gmail.com> wrote:
>>>
 Hi,

 I'm having trouble running spark on mesos in fine-grained mode. I'm
 running spark 1.0.0 and mesos 0.18.0. The tasks are failing randomly, which
 most of the time, but not always, cause the job to fail. The same code is
 running fine in coarse-grained mode. I see the following exceptions in the
 logs of the spark driver:

 W0617 10:57:36.774382  8735 sched.cpp:901] Attempting to launch task
 21 with an unknown offer 20140416-011500-1369465866-5050-26096-52332715
 W0617 10:57:36.774433  8735 sched.cpp:901] Attempting to launch task
 22 with an unknown offer 20140416-011500-1369465866-5050-26096-52332715
 14/06/17 10:57:36 INFO TaskSetManager: Re-queueing tasks for
 201311011608-1369465866-5050-9189-46 from TaskSet 0.0
 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 22 (task 0.0:2)
 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 19 (task 0.0:0)
 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 21 (task 0.0:1)
 14/06/17 10:57:36 INFO DAGScheduler: Executor lost:
 201311011608-1369465866-5050-9189-46 (epoch 0)
 14/06/17 10:57:36 INFO BlockManagerMasterActor: Trying to remove
 executor 201311011608-1369465866-5050-9189-46 from BlockManagerMaster.
 14/06/17 10:57:36 INFO BlockManagerMaster: Removed
 201311011608-1369465866-5050-9189-46 successfully in removeExecutor
 14/06/17 10:57:36 DEBUG MapOutputTrackerMaster: Increasing epoch to 1
 14/06/17 10:57:36 INFO DAGScheduler: Host added was in lost list
 earlier: ca1-dcc1-0065.lab.mtl

 I don't see any exceptions in the spark executor logs. The only error
 message I found in mesos itself is warnings in the mesos master:

 W0617 10:57:36.816748 26100 master.cpp:1615] Failed to validate task 21
 : Task 21 attempted to use cpus(*):1 combined with already used cpus(*):1;
 mem(*):2048 is greater than offered mem(*):3216; disk(*):98304;
 ports(*):[11900-11919, 1192
 1-11995, 11997-11999]; cpus(*):1
 W0617 10:57:36.819807 26100 master.cpp:1615] Failed to validate task 22
 : Task 22 attempted to use cpus(*):1 combined with already used cpus(*):1;
 mem(*):2048 is greater than offered mem(*):3216; disk(*):98304;
 ports(*):[11900-11919, 1192
 1-11995, 11997-11999]; cpus(*):1
 W0617 10:57:36.932287 26102 master.cpp:1615] Failed to validate task 28
 : Task 28 attempted to use cpus(*):1 combined with already used cpus(*):1;
 mem(*):2048 is greater than offered cpus(*):1; mem(*):3216; disk(*):98304;
 ports(*):[11900-
 11960, 11962-11978, 11980-11999]
 W0617 11:05:52.783133 26098 master.cpp:2106] Ignoring unknown exited
 executor 201311011608-1369465866-5050-9189-46 on slave
 201311011608-1369465866-5050-9189-46 (ca1-dcc1-0065.lab.mtl)
 W0617 11:05:52.787739 26103 master.cpp:2106] Ignoring unknown exited
 executor 201311011608-1369465866-5050-9189-34 on slave
 201311011608-1369465866-5050-9189-34 (ca1-dcc1-0053.lab.mtl)
 W0617 11:05:52.790292 26102 master.cpp:2106] Ignoring unknown exited
 executor 201311011608-1369465866-5050-9189-59 on slave
 201311011608-1369465866-5050-9189-59 (ca1-dcc1-0079.lab.mtl)
 W0617 11:05:52.800649 26099 master.cpp:2106] Ignoring unknown exited
 executor 201311011608-1369465866-5050-9189-18 on slave
 201311011608-1369465866-5050-9189-18 (ca1-dcc1-0027.lab.mtl)
 ... (more of those "Ignoring unknown exited executor")


 I analyzed the difference in between the execution of the same job in
 coarse-grained mode and fine-grained mode, and I noticed that in the
 fine-grained mode the tasks get executed on executors different than the
 ones reported in spark, as if spark and mesos get out of s

Re: How data is distributed while processing in spark cluster?

2014-06-24 Thread srujana
Thanks for the response.

I would also like to know, What happens if a slave node is removed while it
is processing some data. Does master send that data for
re-processing/resume-process to other slave nodes ? And does it happen with
the help of HDFS?

Thanks,
Srujana



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-data-is-distributed-while-processing-in-spark-cluster-tp8160p8235.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: partitions, coalesce() and parallelism

2014-06-24 Thread Nicholas Chammas
Ah, here's a better hypothesis. Everything you are doing minus the save() is
a transformation, not an action. Since nothing is actually triggered until
the save(), Spark may be seeing that the lineage of operations ends with 2
partitions anyway and simplifies accordingly.

Two suggestions you can try:

   1. Remove the coalesce(2) and concatenate the files post-processing to
   get the number of files you want. This will also ensure the save() operation
   can be parallelized fully. I think this is the preferable approach since it
   does not artificially reduce the parallelism of your job at any stage.
   2.

   Another thing you can try is the following:

   val rdd1 = sc.sequenceFile(...)
   val rdd2 = rdd1.coalesce(100)

   val rdd3 = rdd2.map(...).cache() // cache this RDD
   val some_count = rdd3.count() // force the map() to run and
materialize the result

   val rdd4 = rdd3.coalesce(2)
   val rdd5 = rdd4.saveAsTextFile(...) // want only two output files

   rdd3.unpersist()

   This should let the map() run 100 tasks in parallel while giving you
   only 2 output files. You'll get this at the cost of serializing rdd3 to
   memory by running the count().

Nick


On Tue, Jun 24, 2014 at 8:47 PM, Alex Boisvert 
wrote:

> For the skeptics :), here's a version you can easily reproduce at home:
>
> val rdd1 = sc.parallelize(1 to 1000, 100) // force with 100 partitions
> val rdd2 = rdd1.coalesce(100)
> val rdd3 = rdd2 map { _ + 1000 }
> val rdd4 = rdd3.coalesce(2)
> rdd4.collect()
>
> You can see that everything runs as only 2 tasks ... :-/
>
> 2014-06-25 00:43:20,795 INFO org.apache.spark.SparkContext: Starting job:
> collect at :48
> 2014-06-25 00:43:20,811 INFO org.apache.spark.scheduler.DAGScheduler: Got
> job 0 (collect at :48) with 2 output partitions (allowLocal=false)
> 2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler:
> Final stage: Stage 0 (collect at :48)
> 2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler:
> Parents of final stage: List()
> 2014-06-25 00:43:20,821 INFO org.apache.spark.scheduler.DAGScheduler:
> Missing parents: List()
> 2014-06-25 00:43:20,827 INFO org.apache.spark.scheduler.DAGScheduler:
> Submitting Stage 0 (CoalescedRDD[11] at coalesce at :45), which
> has no missing parents
> 2014-06-25 00:43:20,898 INFO org.apache.spark.scheduler.DAGScheduler:
> Submitting 2 missing tasks from Stage 0 (CoalescedRDD[11] at coalesce at
> :45)
> 2014-06-25 00:43:20,901 INFO org.apache.spark.scheduler.TaskSchedulerImpl:
> Adding task set 0.0 with 2 tasks
> 2014-06-25 00:43:20,921 INFO org.apache.spark.scheduler.TaskSetManager:
> Starting task 0.0:0 as TID 0 on executor 2: ip-10-226-98-211.ec2.internal
> (PROCESS_LOCAL)
> 2014-06-25 00:43:20,939 INFO org.apache.spark.scheduler.TaskSetManager:
> Serialized task 0.0:0 as 6632 bytes in 16 ms
> 2014-06-25 00:43:20,943 INFO org.apache.spark.scheduler.TaskSetManager:
> Starting task 0.0:1 as TID 1 on executor 5: ip-10-13-132-153.ec2.internal
> (PROCESS_LOCAL)
> 2014-06-25 00:43:20,951 INFO org.apache.spark.scheduler.TaskSetManager:
> Serialized task 0.0:1 as 6632 bytes in 8 ms
> 2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager:
> Finished TID 0 in 685 ms on ip-10-226-98-211.ec2.internal (progress: 1/2)
> 2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager:
> Finished TID 1 in 662 ms on ip-10-13-132-153.ec2.internal (progress: 2/2)
> 2014-06-25 00:43:21,606 INFO org.apache.spark.scheduler.DAGScheduler:
> Completed ResultTask(0, 0)
> 2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.TaskSchedulerImpl:
> Removed TaskSet 0.0, whose tasks have all completed, from pool
> 2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.DAGScheduler:
> Completed ResultTask(0, 1)
> 2014-06-25 00:43:21,608 INFO org.apache.spark.scheduler.DAGScheduler:
> Stage 0 (collect at :48) finished in 0.693 s
> 2014-06-25 00:43:21,616 INFO org.apache.spark.SparkContext: Job finished:
> collect at :48, took 0.821161249 s
> res7: Array[Int] = Array(1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008,
> 1009, 1010, 1011, 1012, 1013, 1014, 1015, 1016, 1017, 1018, 1019, 1020,
> 1031, 1032, 1033, 1034, 1035, 1036, 1037, 1038, 1039, 1040, 1051, 1052,
> 1053, 1054, 1055, 1056, 1057, 1058, 1059, 1060, 1081, 1082, 1083, 1084,
> 1085, 1086, 1087, 1088, 1089, 1090, 1101, 1102, 1103, 1104, 1105, 1106,
> 1107, 1108, 1109, 1110, 1121, 1122, 1123, 1124, 1125, 1126, 1127, 1128,
> 1129, 1130, 1141, 1142, 1143, 1144, 1145, 1146, 1147, 1148, 1149, 1150,
> 1161, 1162, 1163, 1164, 1165, 1166, 1167, 1168, 1169, 1170, 1181, 1182,
> 1183, 1184, 1185, 1186, 1187, 1188, 1189, 1190, 1201, 1202, 1203, 1204,
> 1205, 1206, 1207, 1208, 1209, 1210, 1221, 1222, 1223, 1224, 1225, 1226,
> 1227, 1228, 1229, 1230, 1241, 1242, 1243, 1244, 1245, 1246, 1247, 1248,
> 1249...
>
>
>
>
> On Tue, Jun 24, 2014 at 5:39 PM, Alex Boisvert 
> wrote:
>
>> Yes.
>>
>> scala> rawLogs.partitions.size
>> res1: Int = 2171
>>
>>
>>
>> On Tue, Jun 24, 2014 at 

Re: partitions, coalesce() and parallelism

2014-06-24 Thread Alex Boisvert
For the skeptics :), here's a version you can easily reproduce at home:

val rdd1 = sc.parallelize(1 to 1000, 100) // force with 100 partitions
val rdd2 = rdd1.coalesce(100)
val rdd3 = rdd2 map { _ + 1000 }
val rdd4 = rdd3.coalesce(2)
rdd4.collect()

You can see that everything runs as only 2 tasks ... :-/

2014-06-25 00:43:20,795 INFO org.apache.spark.SparkContext: Starting job:
collect at :48
2014-06-25 00:43:20,811 INFO org.apache.spark.scheduler.DAGScheduler: Got
job 0 (collect at :48) with 2 output partitions (allowLocal=false)
2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler: Final
stage: Stage 0 (collect at :48)
2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler:
Parents of final stage: List()
2014-06-25 00:43:20,821 INFO org.apache.spark.scheduler.DAGScheduler:
Missing parents: List()
2014-06-25 00:43:20,827 INFO org.apache.spark.scheduler.DAGScheduler:
Submitting Stage 0 (CoalescedRDD[11] at coalesce at :45), which
has no missing parents
2014-06-25 00:43:20,898 INFO org.apache.spark.scheduler.DAGScheduler:
Submitting 2 missing tasks from Stage 0 (CoalescedRDD[11] at coalesce at
:45)
2014-06-25 00:43:20,901 INFO org.apache.spark.scheduler.TaskSchedulerImpl:
Adding task set 0.0 with 2 tasks
2014-06-25 00:43:20,921 INFO org.apache.spark.scheduler.TaskSetManager:
Starting task 0.0:0 as TID 0 on executor 2: ip-10-226-98-211.ec2.internal
(PROCESS_LOCAL)
2014-06-25 00:43:20,939 INFO org.apache.spark.scheduler.TaskSetManager:
Serialized task 0.0:0 as 6632 bytes in 16 ms
2014-06-25 00:43:20,943 INFO org.apache.spark.scheduler.TaskSetManager:
Starting task 0.0:1 as TID 1 on executor 5: ip-10-13-132-153.ec2.internal
(PROCESS_LOCAL)
2014-06-25 00:43:20,951 INFO org.apache.spark.scheduler.TaskSetManager:
Serialized task 0.0:1 as 6632 bytes in 8 ms
2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager:
Finished TID 0 in 685 ms on ip-10-226-98-211.ec2.internal (progress: 1/2)
2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager:
Finished TID 1 in 662 ms on ip-10-13-132-153.ec2.internal (progress: 2/2)
2014-06-25 00:43:21,606 INFO org.apache.spark.scheduler.DAGScheduler:
Completed ResultTask(0, 0)
2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.TaskSchedulerImpl:
Removed TaskSet 0.0, whose tasks have all completed, from pool
2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.DAGScheduler:
Completed ResultTask(0, 1)
2014-06-25 00:43:21,608 INFO org.apache.spark.scheduler.DAGScheduler: Stage
0 (collect at :48) finished in 0.693 s
2014-06-25 00:43:21,616 INFO org.apache.spark.SparkContext: Job finished:
collect at :48, took 0.821161249 s
res7: Array[Int] = Array(1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008,
1009, 1010, 1011, 1012, 1013, 1014, 1015, 1016, 1017, 1018, 1019, 1020,
1031, 1032, 1033, 1034, 1035, 1036, 1037, 1038, 1039, 1040, 1051, 1052,
1053, 1054, 1055, 1056, 1057, 1058, 1059, 1060, 1081, 1082, 1083, 1084,
1085, 1086, 1087, 1088, 1089, 1090, 1101, 1102, 1103, 1104, 1105, 1106,
1107, 1108, 1109, 1110, 1121, 1122, 1123, 1124, 1125, 1126, 1127, 1128,
1129, 1130, 1141, 1142, 1143, 1144, 1145, 1146, 1147, 1148, 1149, 1150,
1161, 1162, 1163, 1164, 1165, 1166, 1167, 1168, 1169, 1170, 1181, 1182,
1183, 1184, 1185, 1186, 1187, 1188, 1189, 1190, 1201, 1202, 1203, 1204,
1205, 1206, 1207, 1208, 1209, 1210, 1221, 1222, 1223, 1224, 1225, 1226,
1227, 1228, 1229, 1230, 1241, 1242, 1243, 1244, 1245, 1246, 1247, 1248,
1249...




On Tue, Jun 24, 2014 at 5:39 PM, Alex Boisvert 
wrote:

> Yes.
>
> scala> rawLogs.partitions.size
> res1: Int = 2171
>
>
>
> On Tue, Jun 24, 2014 at 4:00 PM, Mayur Rustagi 
> wrote:
>
>> To be clear number of map tasks are determined by number of partitions
>> inside the rdd hence the suggestion by Nicholas.
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi 
>>
>>
>>
>> On Wed, Jun 25, 2014 at 4:17 AM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> So do you get 2171 as the output for that command? That command tells
>>> you how many partitions your RDD has, so it’s good to first confirm that
>>> rdd1 has as many partitions as you think it has.
>>> ​
>>>
>>>
>>> On Tue, Jun 24, 2014 at 4:22 PM, Alex Boisvert 
>>> wrote:
>>>
 It's actually a set of 2171 S3 files, with an average size of about
 18MB.


 On Tue, Jun 24, 2014 at 1:13 PM, Nicholas Chammas <
 nicholas.cham...@gmail.com> wrote:

> What do you get for rdd1._jrdd.splits().size()? You might think
> you’re getting > 100 partitions, but it may not be happening.
> ​
>
>
> On Tue, Jun 24, 2014 at 3:50 PM, Alex Boisvert <
> alex.boisv...@gmail.com> wrote:
>
>> With the following pseudo-code,
>>
>> val rdd1 = sc.sequenceFile(...) // has > 100 partitions
>> val rdd2 = rdd1.coalesce(100)
>> val rdd3 = rdd2 map { ... }
>> val rdd4 = rdd3.coalesce(2)
>> val rdd5 = rdd4.saveAsTe

Re: partitions, coalesce() and parallelism

2014-06-24 Thread Alex Boisvert
Yes.

scala> rawLogs.partitions.size
res1: Int = 2171



On Tue, Jun 24, 2014 at 4:00 PM, Mayur Rustagi 
wrote:

> To be clear number of map tasks are determined by number of partitions
> inside the rdd hence the suggestion by Nicholas.
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Wed, Jun 25, 2014 at 4:17 AM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> So do you get 2171 as the output for that command? That command tells
>> you how many partitions your RDD has, so it’s good to first confirm that
>> rdd1 has as many partitions as you think it has.
>> ​
>>
>>
>> On Tue, Jun 24, 2014 at 4:22 PM, Alex Boisvert 
>> wrote:
>>
>>> It's actually a set of 2171 S3 files, with an average size of about 18MB.
>>>
>>>
>>> On Tue, Jun 24, 2014 at 1:13 PM, Nicholas Chammas <
>>> nicholas.cham...@gmail.com> wrote:
>>>
 What do you get for rdd1._jrdd.splits().size()? You might think you’re
 getting > 100 partitions, but it may not be happening.
 ​


 On Tue, Jun 24, 2014 at 3:50 PM, Alex Boisvert >>> > wrote:

> With the following pseudo-code,
>
> val rdd1 = sc.sequenceFile(...) // has > 100 partitions
> val rdd2 = rdd1.coalesce(100)
> val rdd3 = rdd2 map { ... }
> val rdd4 = rdd3.coalesce(2)
> val rdd5 = rdd4.saveAsTextFile(...) // want only two output files
>
> I would expect the parallelism of the map() operation to be 100
> concurrent tasks, and the parallelism of the save() operation to be 2.
>
> However, it appears the parallelism of the entire chain is 2 -- I only
> see two tasks created for the save() operation and those tasks appear to
> execute the map() operation as well.
>
> Assuming what I'm seeing is as-specified (meaning, how things are
> meant to be), what's the recommended way to force a parallelism of 100 on
> the map() operation?
>
> thanks!
>
>
>

>>>
>>
>


Re: problem about cluster mode of spark 1.0.0

2014-06-24 Thread Gino Bustelo
Andrew,

Thanks for your answer. It validates our finding. Unfortunately, client mode 
assumes that I'm running in a "privilege node". What I mean by privilege is a 
node that has net access to all the workers and vice versa. This is a big 
assumption to make and unreasonable in certain circumstances 

I much rather have a single point of contact like a job server (like ooyala's) 
that handles jar uploading and lifecycles drivers. I think these are basic 
requirement for standalone clusters. 

Gino B.

> On Jun 24, 2014, at 1:32 PM, Andrew Or  wrote:
> 
> Hi Randy and Gino,
> 
> The issue is that standalone-cluster mode is not officially supported. Please 
> use standalone-client mode instead, i.e. specify --deploy-mode client in 
> spark-submit, or simply leave out this config because it defaults to client 
> mode.
> 
> Unfortunately, this is not currently documented anywhere, and the existing 
> explanation for the distinction between cluster and client modes is highly 
> misleading. In general, cluster mode means the driver runs on one of the 
> worker nodes, just like the executors. The corollary is that the output of 
> the application is not forwarded to command that launched the application 
> (spark-submit in this case), but is accessible instead through the worker 
> logs. In contrast, client mode means the command that launches the 
> application also launches the driver, while the executors still run on the 
> worker nodes. This means the spark-submit command also returns the output of 
> the application. For instance, it doesn't make sense to run the Spark shell 
> in cluster mode, because the stdin / stdout / stderr will not be redirected 
> to the spark-submit command.
> 
> If you are hosting your own cluster and can launch applications from within 
> the cluster, then there is little benefit for launching your application in 
> cluster mode, which is primarily intended to cut down the latency between the 
> driver and the executors in the first place. However, if you are still intent 
> on using standalone-cluster mode after all, you can use the deprecated way of 
> launching org.apache.spark.deploy.Client directly through bin/spark-class. 
> Note that this is not recommended and only serves as a temporary workaround 
> until we fix standalone-cluster mode through spark-submit.
> 
> I have filed the relevant issues: 
> https://issues.apache.org/jira/browse/SPARK-2259 and 
> https://issues.apache.org/jira/browse/SPARK-2260. Thanks for pointing this 
> out, and we will get to fixing these shortly.
> 
> Best,
> Andrew
> 
> 
> 2014-06-20 6:06 GMT-07:00 Gino Bustelo :
>> I've found that the jar will be copied to the worker from hdfs fine, but it 
>> is not added to the spark context for you. You have to know that the jar 
>> will end up in the driver's working dir, and so you just add a the file name 
>> if the jar to the context in your program.
>> 
>> In your example below, just add to the context "test.jar".
>> 
>> Btw, the context will not have the master URL either, so add that while you 
>> are at it.
>> 
>> This is a big issue. I've posted about it a week ago and no replies. 
>> Hopefully it gets more attention as more people start hitting this. 
>> Basically, spark-submit on standalone cluster with cluster deploy is broken.
>> 
>> Gino B.
>> 
>> > On Jun 20, 2014, at 2:46 AM, randylu  wrote:
>> >
>> > in addition, jar file can be copied to driver node automatically.
>> >
>> >
>> >
>> > --
>> > View this message in context: 
>> > http://apache-spark-user-list.1001560.n3.nabble.com/problem-about-cluster-mode-of-spark-1-0-0-tp7982p7984.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 


RE: Basic Scala and Spark questions

2014-06-24 Thread Sameer Tilak
Hi there,Here is how I specify it during the compilation.
scalac -classpath 
/apps/software/abc.jar:/apps/software/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:spark-assembly-1.0.0-hadoop1.0.4.jar/datanucleus-core-3.2.2.jar
 Score.scala
Then I generate a jar file out of it say myapp.
Finally, to run this I do the following:
 ./spark-shell --jars /apps/software/abc.jar,/apps/software/myapp/myapp.jar

Hope this helps.
From: vmuttin...@ebay.com
To: user@spark.apache.org; u...@spark.incubator.apache.org
Subject: RE: Basic Scala and Spark questions
Date: Tue, 24 Jun 2014 20:06:04 +









Hello Tilak,
1. I get error Not found: type RDD error. Can someone please tell me which jars 
do I need to add as external jars and what dhoulf I add iunder import 
statements so that this error will go
 away. 
Do you not see any issues with the import statements?

Add the spark-assembly-1.0.0-hadoop2.2.0.jar file as a dependency.
You can download Spark from here (http://spark.apache.org/downloads.html). 
You’ll find the above mentioned
 jar in the lib folder. 
Import Statement: import org.apache.spark.rdd.RDD



From: Sameer Tilak [mailto:ssti...@live.com]


Sent: Monday, June 23, 2014 10:38 AM

To: u...@spark.incubator.apache.org

Subject: Basic Scala and Spark questions


 

Hi All,
I am new so Scala and Spark. I have a basic question. I have the following 
import statements in my Scala program. I want to pass my function (printScore)
 to Spark. It will compare a string 
 
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
/* import thirdparty jars */
  
I have the following method in my Scala class:
 
class DistanceClass
{
val ta = new textAnalytics();
 
def printScore(sourceStr: String, rdd:
RDD[String]) 
{
 
// Third party jars have StringWrapper
val
str1 =
new StringWrapper (sourceStr)
val ta_ = this.ta;
 
rdd.map(str1, x => ta_.score(str1, StringWrapper(x))
   
}
 
I am using Eclipse for development. I have the following questions:
1. I get error Not found: type RDD error. Can someone please tell me which jars 
do I need to add as external jars and what dhoulf I add iunder import 
statements so that this error will go
 away. 
2. Also, including StringWrapper(x) inside map, will that be OK? rdd.map(str1, 
x => ta_.score(str1, StringWrapper(x))

 


  

Re: Spark slave fail to start with wierd error information

2014-06-24 Thread Peng Cheng
anyone encounter this situation?

Also, I'm very sure my slave and master are in the same security group, with
port 7077 opened



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-slave-fail-to-start-with-wierd-error-information-tp8203p8227.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: DAGScheduler: Failed to run foreach

2014-06-24 Thread Sameer Tilak
Dear Aaron,Thanks for your help. I am still facing few problems. 
I am using a 3rd party library (jar file) under the hood when I call 
jc_->score. Each call to jc_->score will generate a array of doubles. It is 
basically score of the current sentence with every sentence in the destrdd 
generated by loading a file from hdfs. My goal is to then save it in a file and 
repeat it for every sentence/string from sourcerdd. 

  def calculateScore (sourcerdd: RDD[String], destrdd: RDD[String])  {  val jc_ 
= this.mjc  var i : Int = 0
  for (sentence <- sourcerdd.toLocalIterator)   {val str1 = new 
StringWrapper (sentence)var scorevector = destrdd.map(x => 
jc_.score(str1, new BasicStringWrapper(x)))val fileName = new 
String("/apps/software/scala-approsstrmatch-sentence" + i)
scorevector.saveAsTextFile(fileName)   i += 1   }
  }I get this error message. I am not sure why var i : Int = 0 will throw an 
error?
java.lang.NoSuchMethodError: 
scala.runtime.IntRef.create(I)Lscala/runtime/IntRef;   at 
approxstrmatch.JaccardScore.calculateScoreSecond(JaccardScore.scala:77)  at 
$iwC$$iwC$$iwC$$iwC.(:19) at 
$iwC$$iwC$$iwC.(:24)  at $iwC$$iwC.(:26)   at 
$iwC.(:28)at (:30) at .(:34)   
 at .() at .(:7) at .() at 
$print()at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)   
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601) at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)   at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)   at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645) at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609) at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796) at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841) at 
org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)   at 
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601) at 
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)   at 
org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)  at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
   at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884) 
 at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884) 
 at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
   at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)   
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)   at 
org.apache.spark.repl.Main$.main(Main.scala:31)  at 
org.apache.spark.repl.Main.main(Main.scala)  at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)   
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601) at 
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)   at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)  at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
If I get rid of i from the above code, I still get the following error. I 
incorporated your feedback and there is no call to any method as a part of 
closure now. 
Error message:
14/06/24 11:24:57 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[5] at 
saveAsTextFile at JaccardScore.scala:64), which has no missing parents14/06/24 
11:24:57 INFO DAGScheduler: Failed to run saveAsTextFile at 
JaccardScore.scala:64org.apache.spark.SparkException: Job aborted due to stage 
failure: Task not serializable: java.io.NotSerializableException: 
com.wcohen.ss.Jaccard  at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)  
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)   at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)  at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
  at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
 at 
org.apach

Re: ElasticSearch enrich

2014-06-24 Thread Holden Karau
So I'm giving a talk at the Spark summit on using Spark & ElasticSearch,
but for now if you want to see a simple demo which uses elasticsearch for
geo input you can take a look at my quick & dirty implementation with
TopTweetsInALocation (
https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
). This approach uses the ESInputFormat which avoids the difficulty of
having to manually create ElasticSearch clients.

This approach might not work for your data, e.g. if you need to create a
query for each record in your RDD. If this is the case, you could instead
look at using mapPartitions and setting up your Elasticsearch connection
inside of that, so you could then re-use the client for all of the queries
on each partition. This approach will avoid having to serialize the
Elasticsearch connection because it will be local to your function.

Hope this helps!

Cheers,

Holden :)


On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi 
wrote:

> Its not used as default serializer for some issues with compatibility &
> requirement to register the classes..
>
> Which part are you getting as nonserializable... you need to serialize
> that class if you are sending it to spark workers inside a map, reduce ,
> mappartition or any of the operations on RDD.
>
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng  wrote:
>
>> I'm afraid persisting connection across two tasks is a dangerous act as
>> they
>> can't be guaranteed to be executed on the same machine. Your ES server may
>> think its a man-in-the-middle attack!
>>
>> I think its possible to invoke a static method that give you a connection
>> in
>> a local 'pool', so nothing will sneak into your closure, but its too
>> complex
>> and there should be a better option.
>>
>> Never use kryo before, if its that good perhaps we should use it as the
>> default serializer
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


-- 
Cell : 425-233-8271


Re: Upgrading to Spark 1.0.0 causes NoSuchMethodError

2014-06-24 Thread Robert James
On 6/24/14, Peng Cheng  wrote:
> I got 'NoSuchFieldError' which is of the same type. its definitely a
> dependency jar conflict. spark driver will load jars of itself which in
> recent version get many dependencies that are 1-2 years old. And if your
> newer version dependency is in the same package it will be shaded (Java's
> first come first serve principle) and the new method won't be found. Try
> using:
>
> mvn dependency:tree to find duplicate artifacts
>
> and use maven-shade-plugin to rename the package of your newer library.
> (IntelliJ doesn't officially support this plug-in so it may become quirky,
> if that happens try re-importing the project)
>

I'm using Scala and sbt. How can I do what you recommend (no maven)?

 I tried doing a `sbt clean` but it didn't help.


Re: ElasticSearch enrich

2014-06-24 Thread Mayur Rustagi
Its not used as default serializer for some issues with compatibility &
requirement to register the classes..

Which part are you getting as nonserializable... you need to serialize that
class if you are sending it to spark workers inside a map, reduce ,
mappartition or any of the operations on RDD.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng  wrote:

> I'm afraid persisting connection across two tasks is a dangerous act as
> they
> can't be guaranteed to be executed on the same machine. Your ES server may
> think its a man-in-the-middle attack!
>
> I think its possible to invoke a static method that give you a connection
> in
> a local 'pool', so nothing will sneak into your closure, but its too
> complex
> and there should be a better option.
>
> Never use kryo before, if its that good perhaps we should use it as the
> default serializer
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: ElasticSearch enrich

2014-06-24 Thread Peng Cheng
I'm afraid persisting connection across two tasks is a dangerous act as they
can't be guaranteed to be executed on the same machine. Your ES server may
think its a man-in-the-middle attack!

I think its possible to invoke a static method that give you a connection in
a local 'pool', so nothing will sneak into your closure, but its too complex
and there should be a better option.

Never use kryo before, if its that good perhaps we should use it as the
default serializer



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: ElasticSearch enrich

2014-06-24 Thread boci
I using elastic4s inside my ESWorker class. ESWorker now only contain two
field, host:String, port:Int. Now Inside the "findNearestCity" method I
create ElasticClient (elastic4s) connection. What's wrong with my class? I
need to serialize ElasticClient? mappartition is sounds good but I still
got NotSerializableException, or I must mar kit to transient? and where
come the host and port in this case?

my worker:

class ESWorker(val host: String, val port: Int) {
  def findNearestCity(geo: Position): Option[City] = {
 //Here I create ElasticClient connection and execute queries
  }
  def enrichGeoInternal(data:Data):Data = {
 data.location=findNearestCity(data.position)
  }
  def enrichGeo(ds: DStream[Data]): DStream[Data] = {
 ds.map(enrichGeoInternal)
  }
}



--
Skype: boci13, Hangout: boci.b...@gmail.com


On Wed, Jun 25, 2014 at 1:03 AM, Mayur Rustagi 
wrote:

> Mostly ES client is not serializable for you. You can do 3 workarounds,
> 1. Switch to kryo serialization, register the client in kryo , might solve
> your serialization issue
> 2. Use mappartition for all your data & initialize your client in the
> mappartition code, this will create client for each partition, reduce some
> parallelism & add some overhead of creation of client but prevent
> serialization of esclient & transfer to workers
> 3. Use serializablewrapper to serialize your ESclient manually & send it
> across & deserialize it manually, this may or may not work depending on
> whether your class is safely serializable.
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Wed, Jun 25, 2014 at 4:12 AM, boci  wrote:
>
>> Hi guys,
>>
>> I have a small question. I want to create a "Worker" class which using
>> ElasticClient to make query to elasticsearch. (I want to enrich my data
>> with geo search result).
>>
>> How can I do that? I try to create a worker instance with ES host/port
>> parameter but spark throw an exceptino (my class not serializable).
>>
>> Any idea?
>>
>> Thanks
>> b0c1
>>
>>
>


Re: Upgrading to Spark 1.0.0 causes NoSuchMethodError

2014-06-24 Thread Peng Cheng
I got 'NoSuchFieldError' which is of the same type. its definitely a
dependency jar conflict. spark driver will load jars of itself which in
recent version get many dependencies that are 1-2 years old. And if your
newer version dependency is in the same package it will be shaded (Java's
first come first serve principle) and the new method won't be found. Try
using:

mvn dependency:tree to find duplicate artifacts

and use maven-shade-plugin to rename the package of your newer library.
(IntelliJ doesn't officially support this plug-in so it may become quirky,
if that happens try re-importing the project)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Upgrading-to-Spark-1-0-0-causes-NoSuchMethodError-tp8207p8220.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to Reload Spark Configuration Files

2014-06-24 Thread Peng Cheng
I've read somewhere that in 1.0 there is a bash tool called 'spark-config.sh'
that allows you to propagate your config files to a number of master and
slave nodes. However I haven't use it myself



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-Reload-Spark-Configuration-Files-tp8159p8219.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: ElasticSearch enrich

2014-06-24 Thread Mayur Rustagi
Mostly ES client is not serializable for you. You can do 3 workarounds,
1. Switch to kryo serialization, register the client in kryo , might solve
your serialization issue
2. Use mappartition for all your data & initialize your client in the
mappartition code, this will create client for each partition, reduce some
parallelism & add some overhead of creation of client but prevent
serialization of esclient & transfer to workers
3. Use serializablewrapper to serialize your ESclient manually & send it
across & deserialize it manually, this may or may not work depending on
whether your class is safely serializable.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Wed, Jun 25, 2014 at 4:12 AM, boci  wrote:

> Hi guys,
>
> I have a small question. I want to create a "Worker" class which using
> ElasticClient to make query to elasticsearch. (I want to enrich my data
> with geo search result).
>
> How can I do that? I try to create a worker instance with ES host/port
> parameter but spark throw an exceptino (my class not serializable).
>
> Any idea?
>
> Thanks
> b0c1
>
>


Re: ElasticSearch enrich

2014-06-24 Thread boci
Ok but in this case where can I store the ES connection? Or all document
create new ES connection inside the worker?

--
Skype: boci13, Hangout: boci.b...@gmail.com


On Wed, Jun 25, 2014 at 1:01 AM, Peng Cheng  wrote:

> make sure all queries are called through class methods and wrap your query
> info with a class having only simple properties (strings, collections etc).
> If you can't find such wrapper you can also use SerializableWritable
> wrapper
> out-of-the-box, but its not recommended. (developer-api and make fat
> closures that run slowly)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8214.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: partitions, coalesce() and parallelism

2014-06-24 Thread Mayur Rustagi
To be clear number of map tasks are determined by number of partitions
inside the rdd hence the suggestion by Nicholas.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Wed, Jun 25, 2014 at 4:17 AM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> So do you get 2171 as the output for that command? That command tells you
> how many partitions your RDD has, so it’s good to first confirm that rdd1
> has as many partitions as you think it has.
> ​
>
>
> On Tue, Jun 24, 2014 at 4:22 PM, Alex Boisvert 
> wrote:
>
>> It's actually a set of 2171 S3 files, with an average size of about 18MB.
>>
>>
>> On Tue, Jun 24, 2014 at 1:13 PM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> What do you get for rdd1._jrdd.splits().size()? You might think you’re
>>> getting > 100 partitions, but it may not be happening.
>>> ​
>>>
>>>
>>> On Tue, Jun 24, 2014 at 3:50 PM, Alex Boisvert 
>>> wrote:
>>>
 With the following pseudo-code,

 val rdd1 = sc.sequenceFile(...) // has > 100 partitions
 val rdd2 = rdd1.coalesce(100)
 val rdd3 = rdd2 map { ... }
 val rdd4 = rdd3.coalesce(2)
 val rdd5 = rdd4.saveAsTextFile(...) // want only two output files

 I would expect the parallelism of the map() operation to be 100
 concurrent tasks, and the parallelism of the save() operation to be 2.

 However, it appears the parallelism of the entire chain is 2 -- I only
 see two tasks created for the save() operation and those tasks appear to
 execute the map() operation as well.

 Assuming what I'm seeing is as-specified (meaning, how things are meant
 to be), what's the recommended way to force a parallelism of 100 on the
 map() operation?

 thanks!



>>>
>>
>


Re: ElasticSearch enrich

2014-06-24 Thread Peng Cheng
make sure all queries are called through class methods and wrap your query
info with a class having only simple properties (strings, collections etc).
If you can't find such wrapper you can also use SerializableWritable wrapper
out-of-the-box, but its not recommended. (developer-api and make fat
closures that run slowly)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8214.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Graphx SubGraph

2014-06-24 Thread Ankur Dave
Yes, the subgraph operator takes a vertex predicate and keeps only the
edges where both vertices satisfy the predicate, so it will work as long as
you can express the sublist in terms of a vertex predicate.

If that's not possible, you can still obtain the same effect, but you'll
have to use lower-level operations similar to how subgraph is itself
implemented. I can help out if that's the case.

Ankur 


Re: partitions, coalesce() and parallelism

2014-06-24 Thread Nicholas Chammas
So do you get 2171 as the output for that command? That command tells you
how many partitions your RDD has, so it’s good to first confirm that rdd1
has as many partitions as you think it has.
​


On Tue, Jun 24, 2014 at 4:22 PM, Alex Boisvert 
wrote:

> It's actually a set of 2171 S3 files, with an average size of about 18MB.
>
>
> On Tue, Jun 24, 2014 at 1:13 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> What do you get for rdd1._jrdd.splits().size()? You might think you’re
>> getting > 100 partitions, but it may not be happening.
>> ​
>>
>>
>> On Tue, Jun 24, 2014 at 3:50 PM, Alex Boisvert 
>> wrote:
>>
>>> With the following pseudo-code,
>>>
>>> val rdd1 = sc.sequenceFile(...) // has > 100 partitions
>>> val rdd2 = rdd1.coalesce(100)
>>> val rdd3 = rdd2 map { ... }
>>> val rdd4 = rdd3.coalesce(2)
>>> val rdd5 = rdd4.saveAsTextFile(...) // want only two output files
>>>
>>> I would expect the parallelism of the map() operation to be 100
>>> concurrent tasks, and the parallelism of the save() operation to be 2.
>>>
>>> However, it appears the parallelism of the entire chain is 2 -- I only
>>> see two tasks created for the save() operation and those tasks appear to
>>> execute the map() operation as well.
>>>
>>> Assuming what I'm seeing is as-specified (meaning, how things are meant
>>> to be), what's the recommended way to force a parallelism of 100 on the
>>> map() operation?
>>>
>>> thanks!
>>>
>>>
>>>
>>
>


Re: Questions regarding different spark pre-built packages

2014-06-24 Thread Mayur Rustagi
HDFS driver keeps changing & breaking compatibility, hence all the build
versions. If you dont use HDFS/YARN then you can safely ignore it.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Tue, Jun 24, 2014 at 12:16 PM, Sourav Chandra <
sourav.chan...@livestream.com> wrote:

> Hi,
>
> I am just curious to know what are the difference between the prebuilt
> packages for Hadoop1, 2, CDH etc.
>
> I am using spark standalone cluster and we dont use hadoop at all.
>
> Can we use any one of the pre-buil;t packages OR we have to run
> make-distribution.sh script from the code?
>
> Thanks,
> --
>
> Sourav Chandra
>
> Senior Software Engineer
>
> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>
> sourav.chan...@livestream.com
>
> o: +91 80 4121 8723
>
> m: +91 988 699 3746
>
> skype: sourav.chandra
>
> Livestream
>
> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
> Block, Koramangala Industrial Area,
>
> Bangalore 560034
>
> www.livestream.com
>


Re: How data is distributed while processing in spark cluster?

2014-06-24 Thread Mayur Rustagi
Using HDFS locality. The workers call for the data from hdfs/queue etc.
Unless you use parallelize then its sent from driver (typically on the
master) to the worker nodes.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Tue, Jun 24, 2014 at 11:51 AM, srujana 
wrote:

> Hi,
>
> I am working on auto scaling spark cluster. I would like to know how master
> distributes the data to the slaves for processing in detail.
>
> Any information on this would be helpful.
>
> Thanks,
> Srujana
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-data-is-distributed-while-processing-in-spark-cluster-tp8160.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: How to Reload Spark Configuration Files

2014-06-24 Thread Mayur Rustagi
Not really. You are better off using a cluster manager like Mesos or Yarn
for this.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Tue, Jun 24, 2014 at 11:35 AM, Sirisha Devineni <
sirisha_devin...@persistent.co.in> wrote:

>  Hi All,
>
>
>
> I am working with Spark to add new slaves automatically when there is more
> data to be processed by the cluster. During this process there is question
> arisen, after adding/removing new slave node to/from the spark cluster do
> we need to restart master and other existing slaves in the cluster?
>
>
>
> From my observations:
>
> 1.   If a new slave node details are added in configuration
> files(/root/spark/conf/salves) on master node , running “start-slaves.sh”
> script will add the new slave to cluster without affecting  existing slaves
> or master.
>
> 2.   If a slave details are removed from the configuration file, one
> need to restart master using stop-master.sh and start-master.sh scripts to
> take effect.
>
>
>
> Is there any reload option available in Spark to load the changed
> configuration files without stopping the services. Here stopping the
> service of master or existing salves may lead to outage of services.
>
> You can find the options available to start/stop the services of spark at
> http://spark.apache.org/docs/latest/spark-standalone.html
>
>
>
>
>
> Thanks & Regards,
>
> Sirisha Devineni.
>
> DISCLAIMER == This e-mail may contain privileged and confidential
> information which is the property of Persistent Systems Ltd. It is intended
> only for the use of the individual or entity to which it is addressed. If
> you are not the intended recipient, you are not authorized to read, retain,
> copy, print, distribute or use this message. If you have received this
> communication in error, please notify the sender and delete all copies of
> this message. Persistent Systems Ltd. does not accept any liability for
> virus infected mails.
>


ElasticSearch enrich

2014-06-24 Thread boci
Hi guys,

I have a small question. I want to create a "Worker" class which using
ElasticClient to make query to elasticsearch. (I want to enrich my data
with geo search result).

How can I do that? I try to create a worker instance with ES host/port
parameter but spark throw an exceptino (my class not serializable).

Any idea?

Thanks
b0c1


Re: balancing RDDs

2014-06-24 Thread Mayur Rustagi
This would be really useful. Especially for Shark where shift of
partitioning effects all subsequent queries unless task scheduling time
beats spark.locality.wait. Can cause overall low performance for all
subsequent tasks.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Tue, Jun 24, 2014 at 4:10 AM, Sean McNamara 
wrote:

> We have a use case where we’d like something to execute once on each node
> and I thought it would be good to ask here.
>
> Currently we achieve this by setting the parallelism to the number of
> nodes and use a mod partitioner:
>
> val balancedRdd = sc.parallelize(
> (0 until Settings.parallelism)
> .map(id => id -> Settings.settings)
>   ).partitionBy(new ModPartitioner(Settings.parallelism))
>   .cache()
>
>
> This works great except in two instances where it can become unbalanced:
>
> 1. if a worker is restarted or dies, the partition will move to a
> different node (one of the nodes will run two tasks).  When the worker
> rejoins, is there a way to have a partition move back over to the newly
> restarted worker so that it’s balanced again?
>
> 2. drivers need to be started in a staggered fashion, otherwise one driver
> can launch two tasks on one set of workers, and the other driver will do
> the same with the other set.  Are there any scheduler/config semantics so
> that each driver will take one (and only one) core from *each* node?
>
>
> Thanks
>
> Sean
>
>
>
>
>
>
>


Upgrading to Spark 1.0.0 causes NoSuchMethodError

2014-06-24 Thread Robert James
My app works fine under Spark 0.9.  I just tried upgrading to Spark
1.0, by downloading the Spark distro to a dir, changing the sbt file,
and running sbt assembly, but I get now NoSuchMethodErrors when trying
to use spark-submit.

I copied in the SimpleApp example from
http://spark.apache.org/docs/latest/quick-start.html and get the same
error:

$/usr/local/share/spark/bin/spark-submit --class SimpleApp
target/scala-2.10/myproj-assembly-1.0.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.spark.SparkContext$.$lessinit$greater$default$2()Lscala/collection/Map;
at SimpleApp$.main(SimpleApp.scala:10)
at SimpleApp.main(SimpleApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

How can I migrate to Spark 1.0.0?


Re: Efficiently doing an analysis with Cartesian product (pyspark)

2014-06-24 Thread Mayur Rustagi
How about this..
map it to key,value pair, then reducebykey using max operation
Then in the rdd you can do join with your lookup data & reduce (if you only
wanna lookup 2 values then you canuse lookup directly as well).
PS: these are list of operations in Scala, I am not aware how far pyspark
api is in those.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Tue, Jun 24, 2014 at 3:33 AM, Aaron  wrote:

> Sorry, I got my sample outputs wrong
>
>  (1,1) -> 400
> (1,2) -> 500
> (2,2)-> 600
>
> On Jun 23, 2014, at 4:29 PM, "Aaron Dossett [via Apache Spark User List]" 
> <[hidden
> email] > wrote:
>
>  I am relatively new to Spark and am getting stuck trying to do the
> following:
>
> - My input is integer key, value pairs where the key is not unique.  I'm
> interested in information about all possible distinct key combinations,
> thus the Cartesian product.
> - My first attempt was to create a separate RDD of this cartesian product
> and then use map() to calculate the data.  However, I was trying to pass
> another RDD to the function map was calling, which I eventually figured out
> was causing a run time error, even if the function I called with map did
> nothing.  Here's a simple code example:
>
> ---
> def somefunc(x, y, RDD):
>   return 0
>
> input = sc.parallelize([(1,100), (1,200), (2, 100), (2,300)])
>
> #Create all pairs of keys, including self-pairs
> itemPairs = input.map(lambda x: x[0]).distinct()
> itemPairs = itemPairs.cartesian(itemPairs)
>
> print itemPairs.collect()
>
> TC = itemPairs.map(lambda x: (x, somefunc(x[0], x[1], input)))
>
> print TC.collect()
> --
>
> I'm assuming this isn't working because it isn't a very Spark-like way to
> do things and I could imagine that passing RDDs into other RDD's map
> functions might not make sense.  Could someone suggest to me a way to apply
> transformations and actions to "input" that would produce a mapping of key
> pairs to some information related to the values.
>
> For example, I might want to (1, 2) to map to the sum of the maximum
> values found for each key in the input (500 in my sample data above).
>  Extending that example (1,1) would map to 300 and (2,2) to 400.
>
> Please let me know if I should provide more details or a more robust
> example.
>
> Thank you, Aaron
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Efficiently-doing-an-analysis-with-Cartesian-product-pyspark-tp8144.html
>  This email was sent by Aaron Dossett
> 
> (via Nabble)
> To receive all replies by email, subscribe to this discussion
> 
>
>
> --
> View this message in context: Re: Efficiently doing an analysis with
> Cartesian product (pyspark)
> 
>
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: Serialization problem in Spark

2014-06-24 Thread Peng Cheng
I encounter the same problem with hadoop.fs.Configuration (very complex,
unserializable class)
basically if your closure contains any instance (not constant
object/singleton! they are in the jar, not closure) that doesn't inherit
Serializable, or their properties doesn't inherit Serializable, you are
going to have this error.
My solution is wrap your thing with SerializableWritable, and use that in
your function closure. If its going to be heavily reused, wrap that thingy
again with sc.broadcast. If you read spark source code, you will find a lot
of instances like this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-problem-in-Spark-tp7049p8205.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark slave fail to start with wierd error information

2014-06-24 Thread Peng Cheng
I haven't setup a passwordless login from slave to master node yet (I was
under impression that this is not necessary since they communicate using
port 7077)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-slave-fail-to-start-with-wierd-error-information-tp8203p8204.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark slave fail to start with wierd error information

2014-06-24 Thread Peng Cheng
I'm trying to link a spark slave with an already-setup master, using:

$SPARK_HOME/sbin/start-slave.sh spark://ip-172-31-32-12:7077

However the result shows that it cannot open a log file it is supposed to
create:

failed to launch org.apache.spark.deploy.worker.Worker:
tail: cannot open
'/opt/spark/spark-1.0.0-bin-hadoop1/sbin/../logs/spark-ubuntu-org.apache.spark.deploy.worker.Worker-spark://ip-172-31-32-12:7077-ip-172-31-36-80.out'
for reading: No such file or directory
full log in
/opt/spark/spark-1.0.0-bin-hadoop1/sbin/../logs/spark-ubuntu-org.apache.spark.deploy.worker.Worker-spark://ip-172-31-32-12:7077-ip-172-31-36-80.out
(ignore this line as the log file is not there)

What happened here?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-slave-fail-to-start-with-wierd-error-information-tp8203.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Serialization problem in Spark

2014-06-24 Thread Mayur Rustagi
did you try to register the class in Kryo serializer?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Mon, Jun 23, 2014 at 7:00 PM, rrussell25  wrote:

> Thanks for pointer...tried Kryo and ran into a strange error:
>
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Exception
> while deserializing and fetching task:
> com.esotericsoftware.kryo.KryoException: Unable to find class:
> rg.apache.hadoop.hbase.io.ImmutableBytesWritable
>
> It is strange in that the complaint is for "rg.apache..."   (missing o is
> not a typo).
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-problem-in-Spark-tp7049p8123.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Problems running Spark job on mesos in fine-grained mode

2014-06-24 Thread Mayur Rustagi
Hi Sebastien,
Are you using Pyspark by any chance, is that working for you (post the
patch?)

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Mon, Jun 23, 2014 at 1:51 PM, Fedechicco  wrote:

> I'm getting the same behavior and it's crucial I get it fixed for an
> evaluation of Spark + Mesos within my company.
>
> I'm bumping +1 for the request of putting this fix in the 1.0.1 if
> possible!
>
> thanks,
> Federico
>
>
> 2014-06-20 20:51 GMT+02:00 Sébastien Rainville <
> sebastienrainvi...@gmail.com>:
>
> Hi,
>>
>> this is just a follow-up regarding this issue. Turns out that it's caused
>> by a bug in Spark. I created a case for it:
>> https://issues.apache.org/jira/browse/SPARK-2204 and submitted a patch.
>>
>> Any chance this could be included in the 1.0.1 release?
>>
>> Thanks,
>>
>> - Sebastien
>>
>>
>>
>> On Tue, Jun 17, 2014 at 2:57 PM, Sébastien Rainville <
>> sebastienrainvi...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm having trouble running spark on mesos in fine-grained mode. I'm
>>> running spark 1.0.0 and mesos 0.18.0. The tasks are failing randomly, which
>>> most of the time, but not always, cause the job to fail. The same code is
>>> running fine in coarse-grained mode. I see the following exceptions in the
>>> logs of the spark driver:
>>>
>>> W0617 10:57:36.774382  8735 sched.cpp:901] Attempting to launch task 21
>>> with an unknown offer 20140416-011500-1369465866-5050-26096-52332715
>>> W0617 10:57:36.774433  8735 sched.cpp:901] Attempting to launch task 22
>>> with an unknown offer 20140416-011500-1369465866-5050-26096-52332715
>>> 14/06/17 10:57:36 INFO TaskSetManager: Re-queueing tasks for
>>> 201311011608-1369465866-5050-9189-46 from TaskSet 0.0
>>> 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 22 (task 0.0:2)
>>> 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 19 (task 0.0:0)
>>> 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 21 (task 0.0:1)
>>> 14/06/17 10:57:36 INFO DAGScheduler: Executor lost:
>>> 201311011608-1369465866-5050-9189-46 (epoch 0)
>>> 14/06/17 10:57:36 INFO BlockManagerMasterActor: Trying to remove
>>> executor 201311011608-1369465866-5050-9189-46 from BlockManagerMaster.
>>> 14/06/17 10:57:36 INFO BlockManagerMaster: Removed
>>> 201311011608-1369465866-5050-9189-46 successfully in removeExecutor
>>> 14/06/17 10:57:36 DEBUG MapOutputTrackerMaster: Increasing epoch to 1
>>> 14/06/17 10:57:36 INFO DAGScheduler: Host added was in lost list
>>> earlier: ca1-dcc1-0065.lab.mtl
>>>
>>> I don't see any exceptions in the spark executor logs. The only error
>>> message I found in mesos itself is warnings in the mesos master:
>>>
>>> W0617 10:57:36.816748 26100 master.cpp:1615] Failed to validate task 21
>>> : Task 21 attempted to use cpus(*):1 combined with already used cpus(*):1;
>>> mem(*):2048 is greater than offered mem(*):3216; disk(*):98304;
>>> ports(*):[11900-11919, 1192
>>> 1-11995, 11997-11999]; cpus(*):1
>>> W0617 10:57:36.819807 26100 master.cpp:1615] Failed to validate task 22
>>> : Task 22 attempted to use cpus(*):1 combined with already used cpus(*):1;
>>> mem(*):2048 is greater than offered mem(*):3216; disk(*):98304;
>>> ports(*):[11900-11919, 1192
>>> 1-11995, 11997-11999]; cpus(*):1
>>> W0617 10:57:36.932287 26102 master.cpp:1615] Failed to validate task 28
>>> : Task 28 attempted to use cpus(*):1 combined with already used cpus(*):1;
>>> mem(*):2048 is greater than offered cpus(*):1; mem(*):3216; disk(*):98304;
>>> ports(*):[11900-
>>> 11960, 11962-11978, 11980-11999]
>>> W0617 11:05:52.783133 26098 master.cpp:2106] Ignoring unknown exited
>>> executor 201311011608-1369465866-5050-9189-46 on slave
>>> 201311011608-1369465866-5050-9189-46 (ca1-dcc1-0065.lab.mtl)
>>> W0617 11:05:52.787739 26103 master.cpp:2106] Ignoring unknown exited
>>> executor 201311011608-1369465866-5050-9189-34 on slave
>>> 201311011608-1369465866-5050-9189-34 (ca1-dcc1-0053.lab.mtl)
>>> W0617 11:05:52.790292 26102 master.cpp:2106] Ignoring unknown exited
>>> executor 201311011608-1369465866-5050-9189-59 on slave
>>> 201311011608-1369465866-5050-9189-59 (ca1-dcc1-0079.lab.mtl)
>>> W0617 11:05:52.800649 26099 master.cpp:2106] Ignoring unknown exited
>>> executor 201311011608-1369465866-5050-9189-18 on slave
>>> 201311011608-1369465866-5050-9189-18 (ca1-dcc1-0027.lab.mtl)
>>> ... (more of those "Ignoring unknown exited executor")
>>>
>>>
>>> I analyzed the difference in between the execution of the same job in
>>> coarse-grained mode and fine-grained mode, and I noticed that in the
>>> fine-grained mode the tasks get executed on executors different than the
>>> ones reported in spark, as if spark and mesos get out of sync as to which
>>> executor is responsible for which task. See the following:
>>>
>>>
>>> Coarse-grained mode:
>>>
>>>  Spark Mesos Task IndexTask ID ExecutorStatusTask ID (UI)Task Name Task
>>> ID (logs)ExecutorState 0066SUCCESS 4"Task 4"0 66RUNNING1 159SUCCESS0 "Task
>>> 0"159 RUNNING22 54SUCCE

Re: Kafka Streaming - Error Could not compute split

2014-06-24 Thread Mayur Rustagi
I have seen this when I prevent spilling of shuffle data on disk. Can you
change shuffle memory fraction. Is your data spilling to disk?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Mon, Jun 23, 2014 at 12:09 PM, Kanwaldeep  wrote:

> We are using Spark 1.0.0 deployed on Spark Standalone cluster and I'm
> getting
> the following exception. With previous version I've seen this error occur
> along with OutOfMemory errors which I'm not seeing with Sparks 1.0.
>
> Any suggestions?
>
> Job aborted due to stage failure: Task 3748.0:20 failed 4 times, most
> recent
> failure: Exception failure in TID 225792 on host
> hslave32106.sjc9.service-now.com: java.lang.Exception: Could not compute
> split, block input-0-1403458929600 not found
> org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> org.apache.spark.scheduler.Task.run(Task.scala:51)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> java.lang.Thread.run(Thread.java:662) Driver stacktrace:
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-Error-Could-not-compute-split-tp8112.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Persistent Local Node variables

2014-06-24 Thread Mayur Rustagi
Are you trying to process data as part of the same Job(till same spark
context), then all you have to do is cache the output rdd of your
processing. It'll run your processing once & cache the results for future
tasks, unless your node caching the rdd goes down.
if you are trying to retain it for quite a long time you can

   - Simplistically store it as hdfs & load it each time
   - Either store that in a table & try to pull it with sparksql every
   time(experimental).
   - Use Ooyala Jobserver to cache the data & do all processing using that.

Regards
Mayur


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Mon, Jun 23, 2014 at 11:14 AM, Daedalus 
wrote:

> Will using mapPartitions and creating a new RDD of ParsedData objects avoid
> multiple parsing?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Persistent-Local-Node-variables-tp8104p8107.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


JavaRDD.mapToPair throws NPE

2014-06-24 Thread Mingyu Kim
Hi all,

I¹m trying to use JavaRDD.mapToPair(), but it fails with NPE on the
executor. The PairFunction used in the call is null for some reason. Any
comments/help would be appreciated!

My setup is,
* Java 7
* Spark 1.0.0
* Hadoop 2.0.0-mr1-cdh4.6.0
Here¹s the code snippet.

> import org.apache.spark.SparkConf;
> 
> import org.apache.spark.api.java.JavaPairRDD;
> 
> import org.apache.spark.api.java.JavaRDD;
> 
> import org.apache.spark.api.java.JavaSparkContext;
> 
> import org.apache.spark.api.java.function.PairFunction;
> 
> 
> 
> import scala.Tuple2;
> 
> 
> 
> public class Test {
> 
> public static void main(String[] args) {
> 
> SparkConf conf = new SparkConf()
> 
> .setMaster("spark://mymaster")
> 
> .setAppName("MyApp")
> 
> .setSparkHome("/my/spark/home");
> 
> 
> 
> JavaSparkContext sc = new JavaSparkContext(conf);
> 
> sc.addJar("/path/to/jar"); // ship the jar of this class
> 
> JavaRDD rdd = sc.textFile("/path/to/nums.csv²); // nums.csv
> simply has one integer per line
> 
> JavaPairRDD pairRdd = rdd.mapToPair(new
> MyPairFunction());
> 
> 
> 
> System.out.println(pairRdd.collect());
> 
> }
> 
> 
> 
> private static final class MyPairFunction implements PairFunction Integer, Integer> {
> 
> private static final long serialVersionUID = 1L;
> 
> 
> 
> @Override
> 
> public Tuple2 call(String s) throws Exception {
> 
> return new Tuple2(Integer.parseInt(s),
> Integer.parseInt(s));
> 
> }
> 
> }
> 
> }
> 
> 
Here¹s the stack trace.
> 
> Exception in thread "main" 14/06/24 14:39:01 INFO scheduler.TaskSchedulerImpl:
> Removed TaskSet 0.0, whose tasks have all completed, from pool
> 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0
> failed 4 times, most recent failure: Exception failure in TID 6 on host
> 10.160.24.216: java.lang.NullPointerException
> 
> 
> org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaP
> airRDD.scala:750)
> 
> 
> org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaP
> airRDD.scala:750)
> 
> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> 
> scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 
> 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> 
> 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> 
> 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> 
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> 
> scala.collection.AbstractIterator.to(Iterator.scala:1157)
> 
> 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> 
> scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> 
> 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> 
> scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> 
> org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
> 
> org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
> 
> 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
> 
> 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
> 
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> 
> org.apache.spark.scheduler.Task.run(Task.scala:51)
> 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
> 
> 
> 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145>
)
> 
> 
> 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615>
)
> 
> java.lang.Thread.run(Thread.java:722)
> 
> Driver stacktrace:
> 
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGSchedule
> r$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> 
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGSchedul
> er.scala:1017)
> 
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGSchedul
> er.scala:1015)
> 
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> 
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(D
> AGScheduler.scala:633)
> 
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(D
> AGScheduler.scala:633)
> 
> at scala.Option.foreach(Option.scala:236)
> 
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala
> :633)
> 
> at 
>

Graphx SubGraph

2014-06-24 Thread aymanshalaby
Hi guys,

I am a newbie with Spark/Graphx. We are considering using Graphx in
production.
Our 1st use case is: given a sublist of vertices in the graph, we want to
return the induced edges between the vertices of this sublist.

Please correct me if I am wrong. Is that what does the subgraph function do?

Thanks,
Ayman :) 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-SubGraph-tp8197.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark switch to debug loglevel

2014-06-24 Thread Philip Limbeck
Hi!

According to
https://spark.apache.org/docs/0.9.0/configuration.html#configuring-logging,
changing log-level is just a matter of creating a log4j.properties (which
is in the classpath of spark) and changing log level there for the root
logger. I did this steps on every node in the cluster (master and worker
nodes). However, after restart there is still no debug output as desired,
but only the default info log level.


Re: partitions, coalesce() and parallelism

2014-06-24 Thread Alex Boisvert
It's actually a set of 2171 S3 files, with an average size of about 18MB.


On Tue, Jun 24, 2014 at 1:13 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> What do you get for rdd1._jrdd.splits().size()? You might think you’re
> getting > 100 partitions, but it may not be happening.
> ​
>
>
> On Tue, Jun 24, 2014 at 3:50 PM, Alex Boisvert 
> wrote:
>
>> With the following pseudo-code,
>>
>> val rdd1 = sc.sequenceFile(...) // has > 100 partitions
>> val rdd2 = rdd1.coalesce(100)
>> val rdd3 = rdd2 map { ... }
>> val rdd4 = rdd3.coalesce(2)
>> val rdd5 = rdd4.saveAsTextFile(...) // want only two output files
>>
>> I would expect the parallelism of the map() operation to be 100
>> concurrent tasks, and the parallelism of the save() operation to be 2.
>>
>> However, it appears the parallelism of the entire chain is 2 -- I only
>> see two tasks created for the save() operation and those tasks appear to
>> execute the map() operation as well.
>>
>> Assuming what I'm seeing is as-specified (meaning, how things are meant
>> to be), what's the recommended way to force a parallelism of 100 on the
>> map() operation?
>>
>> thanks!
>>
>>
>>
>


Re: partitions, coalesce() and parallelism

2014-06-24 Thread Nicholas Chammas
What do you get for rdd1._jrdd.splits().size()? You might think you’re
getting > 100 partitions, but it may not be happening.
​


On Tue, Jun 24, 2014 at 3:50 PM, Alex Boisvert 
wrote:

> With the following pseudo-code,
>
> val rdd1 = sc.sequenceFile(...) // has > 100 partitions
> val rdd2 = rdd1.coalesce(100)
> val rdd3 = rdd2 map { ... }
> val rdd4 = rdd3.coalesce(2)
> val rdd5 = rdd4.saveAsTextFile(...) // want only two output files
>
> I would expect the parallelism of the map() operation to be 100 concurrent
> tasks, and the parallelism of the save() operation to be 2.
>
> However, it appears the parallelism of the entire chain is 2 -- I only see
> two tasks created for the save() operation and those tasks appear to
> execute the map() operation as well.
>
> Assuming what I'm seeing is as-specified (meaning, how things are meant to
> be), what's the recommended way to force a parallelism of 100 on the map()
> operation?
>
> thanks!
>
>
>


RE: Basic Scala and Spark questions

2014-06-24 Thread Muttineni, Vinay
Hello Tilak,
1. I get error Not found: type RDD error. Can someone please tell me which jars 
do I need to add as external jars and what dhoulf I add iunder import 
statements so that this error will go away.
Do you not see any issues with the import statements?
Add the spark-assembly-1.0.0-hadoop2.2.0.jar file as a dependency.
You can download Spark from here (http://spark.apache.org/downloads.html). 
You'll find the above mentioned jar in the lib folder.
Import Statement: import org.apache.spark.rdd.RDD
From: Sameer Tilak [mailto:ssti...@live.com]
Sent: Monday, June 23, 2014 10:38 AM
To: u...@spark.incubator.apache.org
Subject: Basic Scala and Spark questions

Hi All,
I am new so Scala and Spark. I have a basic question. I have the following 
import statements in my Scala program. I want to pass my function (printScore) 
to Spark. It will compare a string

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
/* import thirdparty jars */

I have the following method in my Scala class:

class DistanceClass
{
val ta = new textAnalytics();

def printScore(sourceStr: String, rdd: RDD[String])
{

// Third party jars have StringWrapper
val str1 = new StringWrapper (sourceStr)
val ta_ = this.ta;

rdd.map(str1, x => ta_.score(str1, StringWrapper(x))

}

I am using Eclipse for development. I have the following questions:
1. I get error Not found: type RDD error. Can someone please tell me which jars 
do I need to add as external jars and what dhoulf I add iunder import 
statements so that this error will go away.
2. Also, including StringWrapper(x) inside map, will that be OK? rdd.map(str1, 
x => ta_.score(str1, StringWrapper(x))



partitions, coalesce() and parallelism

2014-06-24 Thread Alex Boisvert
With the following pseudo-code,

val rdd1 = sc.sequenceFile(...) // has > 100 partitions
val rdd2 = rdd1.coalesce(100)
val rdd3 = rdd2 map { ... }
val rdd4 = rdd3.coalesce(2)
val rdd5 = rdd4.saveAsTextFile(...) // want only two output files

I would expect the parallelism of the map() operation to be 100 concurrent
tasks, and the parallelism of the save() operation to be 2.

However, it appears the parallelism of the entire chain is 2 -- I only see
two tasks created for the save() operation and those tasks appear to
execute the map() operation as well.

Assuming what I'm seeing is as-specified (meaning, how things are meant to
be), what's the recommended way to force a parallelism of 100 on the map()
operation?

thanks!


Re: How to use K-fold validation in spark-1.0?

2014-06-24 Thread holdingonrobin
Thanks Evan! I think it works!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-K-fold-validation-in-spark-1-0-tp8142p8188.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


LinearRegression giving different weights in Spark 1.0 and Spark 0.9

2014-06-24 Thread fintis
Hi,

Maybe it is I doing something wrong but I suspect the linear regression is
behaving differently in Spark 1.0 as compared to Spark 0.9.

I have the following data points:

23 9515 7 2.58 113 0.77 0.964 9.5 9
22 9830 8 1.15 126 0.38 0.964 9.5 9
14 10130 9 0.81 129 0.74 0.827 9.6 9
10 10250 11 0.95 87 0.15 0.976 9.7 9
16 10390 12 1.02 78 0.24 0.984 9.7 9
19 10500 12 1.69 81 0.61 0.984 9.7 9.1
13 10575 12 1.56 81 0.73 0.984 9.7 9.2
16.6 10840 13 1.63 67 0.38 0.932 9.8 9.3
15.9 10960 13 1.83 65 0.57 0.878 9.8 9.4
15.7 11060 13 2.03 69 0.72 0.878 9.8 9.5
14 11475 15 1.69 77 0.2 0.887 10.3 9.5
13.5 11775 18 2.31 58 0.12 0.852 11.8 10.1
6.2 11940 21 2.26 67 0.2 0.976 15.3 12.4
9.6 12070 22 2.07 84 0.08 0.993 15.7 13
15.5 12315 22 3.11 69 0.4 1.185 16.6 14.4
31.4 12900 23 2.82 85 0.42 1.15 16.7 15.9
42.7 12975 24 3.48 77 0.17 1.221 16.7 16.1
38.6 13055 24 3.29 75 0.29 1.161 16.8 16.2
43.4 13250 24 2.82 76 0.43 1.161 16.8 16.2
12.5 13795 25 1.6 81 0.56 0.272 16.8 16.2
21.1 14010 26 1.04 75 0.46 0.201 16.8 16.2
19 14455 28 1.76 64 0.16 0.748 16.9 16.2
18.7 14695 28 2 76 0.27 0.819 17.1 16.2
20.2 14905 29 2.35 75 0.33 0.419 17.2 16.4
27.1 15350 30 2.12 85 0.31 1.29 17 16.5
14.8 15740 30 2.35 78 0.81 0.802 17.3 16.5
12.6 16155 32 2.47 80 0.12 0.67 17.9 16.5
14.9 16325 32 3.76 81 0.5 0.532 17.5 16.6
13.8 17060 34 3.76 65 0.91 0.748 17.6 16.6
9 20265 40 3.41 60 0.01 0.512 17.7 16.6

Which I used to perform a RidgeRegressionWithSGD in Spark 0.9 ... I have
decided to upgrade to Spark 1.0 and only modified the RDD[Array[Double]] to
use Vecotors as explained in the Spark docs. However, leaving all other
parameters the same, I do not get the same model weights and overall
prediction. In fact with Spark 1.0 I get weights of NAN, NAN, NAN, NAN
NAN

Can somebody help? I really need to get it working as it was or at least
explain if there are some other things I need to change in order to get it
working as before. I am writing an entire master thesis based on Spark.

Here is my Spark 1.0 code below and all that was changed was the LabelPoint
made to accept a Vector rather than the Array[Double] as in 0.9.

import scala.math._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.SquaredL2Updater
import org.apache.spark.mllib.regression._
import org.apache.spark.rdd.RDD

object TestRegression{

  def main(args: Array[String]) {

if (args.length == 0) {
  println("Please supply data/file")
  System.exit(1)
}
val sc = getSparkContext()
val wData = sc.textFile(args(1)) //read in supplied file

val data = wData.map(_.split(" ").map(_.toDouble)).cache() //read one
line at a time tokenizing on spaces
val trainingSet = data.map {
row => //read one array at a time
  val transformedData = BYModel(row)
  LabeledPoint(transformedData.head,
Vectors.dense(transformedData.tail)) //prepare predictor and output for
regression
  }.cache()

println(data.collect)
println("---")
println(trainingSet.collect.mkString)


val logAlg = new RidgeRegressionWithSGD()
   
logAlg.optimizer.setNumIterations(100).setRegParam(0.1).setStepSize(0.1)//parameters
to optimize gradient descent algorithm
val model = logAlg.run(trainingSet)
println("weights: "+model.weights.toArray.mkString(","))
println("Mean Squared Error: "+calculateMeanSquaredError(model,
trainingSet))
calculateROP(model, data)
sc.stop()
  }


  def getSparkContext(): SparkContext = {
//get spark context
//create spark context config
val conf = new SparkConf().
  setMaster("local").
  setAppName("TestRegression").
  setSparkHome("SPARK_HOME").
  set("spark.executor.memory", "512m").
  set("spark.cleaner.ttl", "3600")
new SparkContext(conf)

  }

  def BYModel(params: Array[Double]): Array[Double] = {
import scala.math.log
val y = log(params(0))
val x2 = 8000 - params(1)
val x3 = pow(params(1), 0.69) * (params(8) - 9)
val x4 = params(1) * (params(8) - params(7))
val x5 = log((params(3) - 0.02) / (4 - 0.02))
val x6 = log(params(4) / 60)
val x7 = params(5) * (-1)
val x8 = params(6)
Array(y, x2, x3, x4, x5, x6, x7, x8)

  }

  def calculateROP(model: GeneralizedLinearModel, data: RDD[Array[Double]])
{
import scala.math.log

val constants = model.weights.toArray 
val a1 = model.intercept
val a2 = constants(0)
val a3 = constants(1)
val a4 = constants(2)
val a5 = constants(3)
val a6 = constants(4)
val a7 = constants(5)
val a8 = constants(6)

val rop = data.map {
  row =>

exp(a1 + (a2 * (8000 - row(1))) 
  + (a3 * (pow(row(1), 0.69) * (row(8) - 9))) 
  + (a4 * (row(1) * (row(8) - row(7 
  + a5 * log((row(3) - 0.02) / (4 - 0.02)) 
  + a6 * log(row(4) / 60) 
  + a7 * (row(5) * (-1

Re: spark streaming, kafka, SPARK_CLASSPATH

2014-06-24 Thread Andrew Or
Hi all,

The short answer is that standalone-cluster mode through spark-submit is
broken (and in fact not officially supported). Please use standalone-client
mode instead.
The long answer is provided here:
http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3cCAMJOb8m6gF9B3W=p12hi88mexkoon15-1jkvs8pblmuwh9r...@mail.gmail.com%3e

Andrew


2014-06-19 12:00 GMT-07:00 lannyripple :

> Gino,
>
> I can confirm that your solution of assembling with spark-streaming-kafka
> but excluding spark-core and spark-streaming has me working with basic
> spark-submit.  As mentioned you must specify the assembly jar in the
> SparkConfig as well as to spark-submit.
>
> When I see the error you are now experiencing I just restart my cluster
> (sbin/stop-all.sh; sleep 6; sbin/start-all.sh).  My thought is a resource
> leak somewhere but I haven't tried to chase it down since restarting is
> nice
> and quick.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-SPARK-CLASSPATH-tp7356p7941.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Centralized Spark Logging solution

2014-06-24 Thread Robert James
We need a centralized spark logging solution.  Ideally, it should:

* Allow any Spark process to log at multiple levels (info, warn,
debug) using a single line, similar to log4j
* All logs should go to a central location - so, to read the logs, we
don't need to check each worker by itself
* Ideally, it should be configurable so that when the code is run
standalone (not on spark) to use a different (local) log (this last
point is point is optional, because we could add it with a wrapper)

Can you recommend something? How do you handle logging and debugging
spark applications? Do you go through the logs on each machine?


Re: problem about cluster mode of spark 1.0.0

2014-06-24 Thread Andrew Or
Hi Randy and Gino,

The issue is that standalone-cluster mode is not officially supported.
Please use standalone-client mode instead, i.e. specify --deploy-mode
client in spark-submit, or simply leave out this config because it defaults
to client mode.

Unfortunately, this is not currently documented anywhere, and the existing
explanation for the distinction between cluster and client modes is highly
misleading. In general, cluster mode means the driver runs on one of the
worker nodes, just like the executors. The corollary is that the output of
the application is not forwarded to command that launched the application
(spark-submit in this case), but is accessible instead through the worker
logs. In contrast, client mode means the command that launches the
application also launches the driver, while the executors still run on the
worker nodes. This means the spark-submit command also returns the output
of the application. For instance, it doesn't make sense to run the Spark
shell in cluster mode, because the stdin / stdout / stderr will not be
redirected to the spark-submit command.

If you are hosting your own cluster and can launch applications from within
the cluster, then there is little benefit for launching your application in
cluster mode, which is primarily intended to cut down the latency between
the driver and the executors in the first place. However, if you are still
intent on using standalone-cluster mode after all, you can use the
deprecated way of launching org.apache.spark.deploy.Client directly through
bin/spark-class. Note that this is not recommended and only serves as a
temporary workaround until we fix standalone-cluster mode through
spark-submit.

I have filed the relevant issues:
https://issues.apache.org/jira/browse/SPARK-2259 and
https://issues.apache.org/jira/browse/SPARK-2260. Thanks for pointing this
out, and we will get to fixing these shortly.

Best,
Andrew


2014-06-20 6:06 GMT-07:00 Gino Bustelo :

> I've found that the jar will be copied to the worker from hdfs fine, but
> it is not added to the spark context for you. You have to know that the jar
> will end up in the driver's working dir, and so you just add a the file
> name if the jar to the context in your program.
>
> In your example below, just add to the context "test.jar".
>
> Btw, the context will not have the master URL either, so add that while
> you are at it.
>
> This is a big issue. I've posted about it a week ago and no replies.
> Hopefully it gets more attention as more people start hitting this.
> Basically, spark-submit on standalone cluster with cluster deploy is broken.
>
> Gino B.
>
> > On Jun 20, 2014, at 2:46 AM, randylu  wrote:
> >
> > in addition, jar file can be copied to driver node automatically.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/problem-about-cluster-mode-of-spark-1-0-0-tp7982p7984.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Setting user permissions for Spark and Shark

2014-06-24 Thread ajatix
Hi

I am currently running a private mesos cluster of 1+3 machines for running
Spark and Shark applications on it. I've currently installed everything from
an admin account. I now want to run them from another account restricting
access to the configuration settings. Any suggestions on how to go about it?

I am able to run Spark in standalone mode but not in cluster mode when
accessed from a test account. Also, I only have read permissions with HDFS. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Setting-user-permissions-for-Spark-and-Shark-tp8180.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Integrate Spark Editor with Hue for source compiled installation of spark/spark-jobServer

2014-06-24 Thread Sunita Arvind
Hello Experts,

I am attempting to integrate Spark Editor with Hue on CDH5.0.1. I have the
spark installation build manually from the sources for spark1.0.0. I am
able to integrate this with cloudera manager.

Background:
---
We have a 3 node VM cluster with CDH5.0.1
We requried spark1.0.0 due to some features in it, so I did a

 "yum remove spark-core spark-master spark-worker spark-python"

 of the default spark0.9.0 and compiled spark1.0.0 from source:

Downloaded the spark-trunk from

git clone https://github.com/apache/spark.git
cd spark
SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true ./sbt/sbt assembly

The spark-assembly-1.0.0-SNAPSHOT-hadoop2.2.0.jar was built and spark by
itself seems to work well. I was even able to run a text file count.

Current attempt:

Referring to this article - http://gethue.com/a-new-spark-web-ui-spark-app/
Now I am trying to add the Spark editor to Hue. AFAIK, this requires
git clone https://github.com/ooyala/spark-jobserver.git
cd spark-jobserver
sbt
re-start

This was successful after lot of struggle with the proxy settings. However,
is this the job Server itself? Will that mean the job Server has to be
manually started. I intend to have the spark editor show up in hue web UI
and I am no way close. Can some one please help?

Note, the 3 VMs are Linux CentOS. Not sure if setting something like can be
expected to work.:

[desktop]
app_blacklist=


Also, I have made the changes to vim .
/job-server/src/main/resources/application.conf as recommended, however, I
do not expect this to impact hue in any way.

Also, I intend to let the editor stay available, not spawn it everytime it
is required.


Thanks in advance.

regards


Re: How to use K-fold validation in spark-1.0?

2014-06-24 Thread Evan R. Sparks
There is a method in org.apache.spark.mllib.util.MLUtils called "kFold"
which will automatically partition your dataset for you into k train/test
splits at which point you can build k different models and aggregate the
results.

For example (a very rough sketch - assuming I want to do 10-fold cross
validation on a binary classification model on a file with 1000 features in
it):

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.util.LabelParsers
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD


val dat = MLUtils.loadLibSVMFile(sc, "path/to/data", false, 1000)

val cvdat = kFold(dat, 10, 42)

val modelErrrors = cvdat.map { case (train, test) => {
   val model = LogisticRegressionWithSGD.train(train, 100, 0.1, 1.0)
   val error = computeError(model, test)
(model, error)}}

//Average error:
val avgError = modelErrors.map(_._2).reduce(_ + _) / modelErrors.length

Here, I'm assuming you've got some "computeError" function defined. Note
that many of these APIs are marked "experimental" and thus might change in
a future spark release.


On Tue, Jun 24, 2014 at 6:44 AM, Eustache DIEMERT 
wrote:

> I'm interested in this topic too :)
>
> Are the MLLib core devs on this list ?
>
> E/
>
>
> 2014-06-24 14:19 GMT+02:00 holdingonrobin :
>
> Anyone knows anything about it? Or should I actually move this topic to a
>> MLlib specif mailing list? Any information is appreciated! Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-K-fold-validation-in-spark-1-0-tp8142p8172.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Streaming aggregation

2014-06-24 Thread john levingston
I have a use case where I cannot figure out the spark streaming way to do
it.

Given two kafka topics corresponding to two different types of events A and
B.  For each element  from topic A correspond an element from topic B.
Unfortunately elements can arrive separately by hours.


The aggregation operation is not deterministic so I do not have a common
key but instead a list of rules which will select the best candidates
already arrived (most commonly one or two elements). The other candidates
should not be discarded as they have also a corresponding elements. Once
the two elements are aggregated they will be published to a kafka topic.

Is the spark streaming way to do it, a group by key over all the RDD and
then applying our aggregation function?

How would I remove the aggregate elements with a tag filtering?  will that
be costly?

Same question how would I remove candidates older than two days?

If a worker fails for a short time and then come back on line what would
happen?

Thank you.


Re: Using Spark as web app backend

2014-06-24 Thread Koert Kuipers
run your spark app in client mode together with a spray rest service, that
the front end can talk to


On Tue, Jun 24, 2014 at 3:12 AM, Jaonary Rabarisoa 
wrote:

> Hi all,
>
> So far, I run my spark jobs with spark-shell or spark-submit command. I'd
> like to go further and I wonder how to use spark as a backend of a web
> application. Specificaly, I want a frontend application ( build with nodejs
> )  to communicate with spark on the backend, so that every query from the
> frontend is rooted to spark. And the result from Spark are sent back to the
> frontend.
> Does some of you already experiment this kind of architecture ?
>
>
> Cheers,
>
>
> Jaonary
>


Re: How to use K-fold validation in spark-1.0?

2014-06-24 Thread Eustache DIEMERT
I'm interested in this topic too :)

Are the MLLib core devs on this list ?

E/


2014-06-24 14:19 GMT+02:00 holdingonrobin :

> Anyone knows anything about it? Or should I actually move this topic to a
> MLlib specif mailing list? Any information is appreciated! Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-K-fold-validation-in-spark-1-0-tp8142p8172.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: How to use K-fold validation in spark-1.0?

2014-06-24 Thread holdingonrobin
Anyone knows anything about it? Or should I actually move this topic to a
MLlib specif mailing list? Any information is appreciated! Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-K-fold-validation-in-spark-1-0-tp8142p8172.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: broadcast not working in yarn-cluster mode

2014-06-24 Thread Christophe Préaud
Hi again,

I've finally solved the problem below, it was due to an old 1.0.0-rc3 spark jar 
lying around in my .m2 directory which was used when I compiled  my spark 
applications (with maven).

Christophe.

On 20/06/2014 18:13, Christophe Préaud wrote:
> Hi,
>
> Since I migrated to spark 1.0.0, a couple of applications that used to work 
> in 0.9.1 now fail when broadcasting a variable.
> Those applications are run on a YARN cluster in yarn-cluster mode (and used 
> to run in yarn-standalone mode in 0.9.1)
>
> Here is an extract of the error log:
>
> Exception in thread "Thread-3" java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:186)
> Caused by: java.lang.NoSuchMethodError: 
> org.apache.spark.SparkContext.broadcast(Ljava/lang/Object;)Lorg/apache/spark/broadcast/Broadcast;
> at 
> kelkoo.MerchantOffersPerformance$.main(MerchantOffersPerformance.scala:289)
> at 
> kelkoo.MerchantOffersPerformance.main(MerchantOffersPerformance.scala)
>
> Has anyone any idea how to solve this problem?
>
> Thanks,
> Christophe.
>
> Kelkoo SAS
> Société par Actions Simplifiée
> Au capital de € 4.168.964,30
> Siège social : 8, rue du Sentier 75002 Paris
> 425 093 069 RCS Paris
>
> Ce message et les pièces jointes sont confidentiels et établis à l'attention 
> exclusive de leurs destinataires. Si vous n'êtes pas le destinataire de ce 
> message, merci de le détruire et d'en avertir l'expéditeur.


Kelkoo SAS
Société par Actions Simplifiée
Au capital de € 4.168.964,30
Siège social : 8, rue du Sentier 75002 Paris
425 093 069 RCS Paris

Ce message et les pièces jointes sont confidentiels et établis à l'attention 
exclusive de leurs destinataires. Si vous n'êtes pas le destinataire de ce 
message, merci de le détruire et d'en avertir l'expéditeur.


Re: Prediction using Classification with text attributes in Apache Spark MLLib

2014-06-24 Thread Sean Owen
On Tue, Jun 24, 2014 at 12:28 PM, Ulanov, Alexander
 wrote:
> You need to convert your text to vector space model: 
> http://en.wikipedia.org/wiki/Vector_space_model
> and then pass it to SVM. As far as I know, in previous versions of MLlib 
> there was a special class for doing this: 
> https://github.com/amplab/MLI/blob/master/src/main/scala/feat/NGrams.scala. 
> It is not compatible with Spark 1.0.
> I wonder why MLLib folks didn't include it in newer versions of Spark.

(PS that is a class from MLI, not MLlib)


RE: Prediction using Classification with text attributes in Apache Spark MLLib

2014-06-24 Thread Ulanov, Alexander
Hi Imk,

There is a number of libraries and scripts to convert text to libsvm format, if 
you just type " libsvm format converter" in search engine. Unfortunately I 
cannot recommend a specific one, except the one that is built in Weka. I use it 
for test purposes, and for big experiments it is easier to write your own 
converter. Format is simple enough. However, I hope that such tool will be 
implemented in Spark MLLib someday, because it will benefit from parallel 
processing.

Best regards, Alexander

-Original Message-
From: lmk [mailto:lakshmi.muralikrish...@gmail.com] 
Sent: Tuesday, June 24, 2014 3:41 PM
To: u...@spark.incubator.apache.org
Subject: RE: Prediction using Classification with text attributes in Apache 
Spark MLLib

Hi Alexander,
Thanks for your prompt response. Earlier I was executing this Prediction using 
Weka only. But now we are moving to a huge dataset and hence to Apache Spark 
MLLib. Is there any other way to convert to libSVM format? Or is there any 
other simpler algorithm that I can use in mllib?

Thanks,
lmk



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Prediction-using-Classification-with-text-attributes-in-Apache-Spark-MLLib-tp8166p8168.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Prediction using Classification with text attributes in Apache Spark MLLib

2014-06-24 Thread lmk
Hi Alexander,
Thanks for your prompt response. Earlier I was executing this Prediction
using Weka only. But now we are moving to a huge dataset and hence to Apache
Spark MLLib. Is there any other way to convert to libSVM format? Or is there
any other simpler algorithm that I can use in mllib?

Thanks,
lmk



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Prediction-using-Classification-with-text-attributes-in-Apache-Spark-MLLib-tp8166p8168.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Prediction using Classification with text attributes in Apache Spark MLLib

2014-06-24 Thread Ulanov, Alexander
Hi,

You need to convert your text to vector space model: 
http://en.wikipedia.org/wiki/Vector_space_model
and then pass it to SVM. As far as I know, in previous versions of MLlib there 
was a special class for doing this: 
https://github.com/amplab/MLI/blob/master/src/main/scala/feat/NGrams.scala. It 
is not compatible with Spark 1.0.
I wonder why MLLib folks didn't include it in newer versions of Spark.

As a workaround, you could use a separate tool to convert your data to LibSVM 
format http://stats.stackexchange.com/questions/61328/libsvm-data-format, and 
then load it with MLUtils.loadLibSVMFile. For example, you could use Weka 
http://www.cs.waikato.ac.nz/ml/weka/  (it has friendly UI but doesn't handle 
big datasets) to convert your file.

Best regards, Alexander

-Original Message-
From: lmk [mailto:lakshmi.muralikrish...@gmail.com] 
Sent: Tuesday, June 24, 2014 3:17 PM
To: u...@spark.incubator.apache.org
Subject: Prediction using Classification with text attributes in Apache Spark 
MLLib

Hi,
I am trying to predict an attribute with binary value (Yes/No) using SVM.
All my attributes which belong to the training set are text attributes. 
I understand that I have to convert my outcome as double (0.0/1.0). But I donot 
understand how to deal with my explanatory variables which are also text.
Please let me know how I can do this.

Thanks.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Prediction-using-Classification-with-text-attributes-in-Apache-Spark-MLLib-tp8166.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Prediction using Classification with text attributes in Apache Spark MLLib

2014-06-24 Thread lmk
Hi,
I am trying to predict an attribute with binary value (Yes/No) using SVM.
All my attributes which belong to the training set are text attributes. 
I understand that I have to convert my outcome as double (0.0/1.0). But I
donot understand how to deal with my explanatory variables which are also
text.
Please let me know how I can do this.

Thanks.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Prediction-using-Classification-with-text-attributes-in-Apache-Spark-MLLib-tp8166.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: pyspark-Failed to run first

2014-06-24 Thread angel2014
It's ... kind of weird  if I try to execute this

cotizas = sc.textFile("A_ko")
print cotizas.take(10)

it doesn't work, but if I remove only one "A" character from this file ...
 it's all OK ...

At first I thought it was due to the number of splits or something like
that ... but I downloaded this file

http://www.briandunning.com/sample-data/uk-500.zip

and it also works OK. This file is larger in number of lines (501 lines
over 50 lines) and in size (96KB over 14KB).





2014-06-23 18:28 GMT+02:00 Congrui Yi [via Apache Spark User List] <
ml-node+s1001560n8128...@n3.nabble.com>:

> So it does not work for files on HDFS either? That is really a problem.
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-Failed-to-run-first-tp7691p8128.html
>  To unsubscribe from pyspark-Failed to run first, click here
> 
> .
> NAML
> 
>


A_ko (18K) 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-Failed-to-run-first-tp7691p8165.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Using Spark as web app backend

2014-06-24 Thread Jörn Franke
Hi,

You could use sock.js / websockets on the front end, so you can notify the
user when the job is finished. You can regularly poll the URL of the job to
check its status from.your node.js app  - at the moment I do not know an
out of the box solution.

Nicer would be if your job sends a message via rabbitmq, kafka etc. once it
finishes and your node.js app receives this message and notifies via
websocket/ sock.js  the front end (i.e. the Javascript running in the HTML
page of your users browser). Perhaps you can also then add stomp.js - so
you can make sure the user receives the message in the browser. If I will
find time I will write an example app, because I think it is a relevant use
case...

Best regards,

Jörn
Le 24 juin 2014 09:13, "Jaonary Rabarisoa"  a écrit :

> Hi all,
>
> So far, I run my spark jobs with spark-shell or spark-submit command. I'd
> like to go further and I wonder how to use spark as a backend of a web
> application. Specificaly, I want a frontend application ( build with nodejs
> )  to communicate with spark on the backend, so that every query from the
> frontend is rooted to spark. And the result from Spark are sent back to the
> frontend.
> Does some of you already experiment this kind of architecture ?
>
>
> Cheers,
>
>
> Jaonary
>


Using Spark as web app backend

2014-06-24 Thread Jaonary Rabarisoa
Hi all,

So far, I run my spark jobs with spark-shell or spark-submit command. I'd
like to go further and I wonder how to use spark as a backend of a web
application. Specificaly, I want a frontend application ( build with nodejs
)  to communicate with spark on the backend, so that every query from the
frontend is rooted to spark. And the result from Spark are sent back to the
frontend.
Does some of you already experiment this kind of architecture ?


Cheers,


Jaonary