Master dies and worker registration fails with duplicated worker id

2015-10-19 Thread ZhuGe
Hi all:
We met a series of weird problems in our standalone cluster with 2 masters (ZK election agent).

Q1: First, we find that the active master loses leadership at some point and shuts itself down.

[INFO 2015-10-17 13:00:15 (ClientCnxn.java:1083)] Client session timed out, have not heard from server in 27132ms for sessionid 0x14fef0664190018, closing socket connection and attempting reconnect
[INFO 2015-10-17 13:00:15 (ConnectionStateManager.java:194)] State change: SUSPENDED
[INFO 2015-10-17 13:00:15 (Logging.scala:59)] We have lost leadership
[ERROR 2015-10-17 13:00:15 (Logging.scala:75)] Leadership has been revoked -- master shutting down.
[INFO 2015-10-17 13:00:15 (Logging.scala:59)] Shutdown hook called

From the log, it seems that the ZK session timed out, and the ZK election agent then revoked the master. We suspected a long full GC hung the process, but after monitoring GC we found that after 3+ days of running there were 200+ minor GCs and no full GC. We dumped the heap and found that JobProgressListener consumes a lot of memory. Is it a bug or a configuration problem?
123 instances of "org.apache.spark.ui.jobs.JobProgressListener", loaded by
"sun.misc.Launcher$AppClassLoader @ 0xbffa2420", occupy 338.25 MB (86.70%) bytes.

Biggest instances:
• org.apache.spark.ui.jobs.JobProgressListener @ 0xc5e4de80 - 18.42 MB (4.72%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xc7298f70 - 18.42 MB (4.72%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xc870d960 - 18.42 MB (4.72%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xc498bc50 - 18.42 MB (4.72%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xc09a69b0 - 18.42 MB (4.72%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xca84dbc0 - 14.57 MB (3.74%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xc2850b10 - 14.53 MB (3.73%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xd7118058 - 14.50 MB (3.72%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xd02fbba8 - 14.41 MB (3.69%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xd3b81ef0 - 12.07 MB (3.09%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xd90546d0 - 11.43 MB (2.93%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xd1935b30 - 11.43 MB (2.93%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xd9d50e48 - 6.90 MB (1.77%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xc3971b90 - 6.90 MB (1.77%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xcc78c918 - 6.90 MB (1.77%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xd27d6ce0 - 6.90 MB (1.77%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xcbdfa0a0 - 6.32 MB (1.62%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xd6736f90 - 4.52 MB (1.16%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xdc8be4a0 - 4.51 MB (1.16%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xdbca8e78 - 4.43 MB (1.13%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xca0c5d78 - 4.42 MB (1.13%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xc2101c40 - 4.41 MB (1.13%)
• org.apache.spark.ui.jobs.JobProgressListener @ 0xcfa67e38 - 4.41 MB (1.13%)

These instances are referenced from one instance of
"org.spark-project.jetty.server.Handler[]", loaded by
"sun.misc.Launcher$AppClassLoader @ 0xbffa2420".

Keywords: org.apache.spark.ui.jobs.JobProgressListener, org.spark-project.jetty.server.Handler[], sun.misc.Launcher$AppClassLoader @ 0xbffa2420
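(A hedged guess at the configuration angle, for anyone hitting the same thing: a standalone master keeps a rebuilt web UI, and therefore a JobProgressListener, for each completed application it retains when event logging is enabled, so settings like the following in spark-defaults.conf on the master should bound that memory. The values below are placeholders, not recommendations.)

# spark-defaults.conf on the master (placeholder values)
# completed applications whose rebuilt UI the master keeps (default 200)
spark.deploy.retainedApplications  20
# per-UI history kept by each JobProgressListener (defaults 1000 each)
spark.ui.retainedJobs    200
spark.ui.retainedStages  200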



Q2: When the active master dies and the backup master gets leadership, another problem pops up: some workers fail to register with the new master. As a consequence, the new master loses some workers. Log shown below:

[INFO 2015-10-19 15:01:45 (Logging.scala:59)] Master has changed, new master is at spark://10.12.201.163:7071
[INFO 2015-10-19 15:02:57 (Logging.scala:59)] Master with url spark://10.12.201.162:7071 requested this worker to reconnect.
[INFO 2015-10-19 15:02:57 (Logging.scala:59)] Connecting to master akka.tcp://sparkMaster@10.12.201.162:7071/user/Master...
[INFO 2015-10-19 15:02:57 (Logging.scala:59)] Master with url spark://10.12.201.162:7071 requested this worker to reconnect.
[INFO 2015-10-19 15:02:57 (Logging.scala:59)] Not spawning another attempt to register with the master, since there is an attempt scheduled already.
[INFO 2015-10-19 15:02:57 (Logging.scala:59)] Master with url spark://10.12.201.162:7071 requested this worker to reconnect.
[INFO 2015-10-19 15:02:57 (Logging.scala:59)] Not spawning another attempt to register with the master, since there is an attempt scheduled already.
[WARN 2015-10-19 15:02:57 (Slf4jLogger.scala:71)] Association with remote system [akka.tcp://sparkMaster@10.12.201.162:7071] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
[INFO 2015-10-19 15:03:01 (Logging.scala:59)] Asked to launch executor app-20151019150302-/8 for

Re: Spark handling parallel requests

2015-10-19 Thread Akhil Das
Yes, there are Kafka consumers/producers for almost all languages; you can
read more here:
https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-PHP
Here's a repo for the PHP version: https://github.com/EVODelavega/phpkafka

Thanks
Best Regards
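On the Spark side, a minimal sketch of consuming such a Kafka topic with the direct stream API (Spark 1.3+). This is only an illustration: the topic name, broker list and batch interval are placeholders, and it assumes the spark-streaming-kafka artifact is on the classpath.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object RequestConsumer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("php-requests")
    val ssc = new StreamingContext(conf, Seconds(5))

    // placeholders: point these at your brokers and the topic PHP publishes to
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topics = Set("requests")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // each batch arrives as an RDD of (key, message) pairs
    stream.map(_._2).foreachRDD { rdd =>
      // replace this with the real per-request processing
      println(s"received ${rdd.count()} requests in this batch")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}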

On Sun, Oct 18, 2015 at 12:58 PM,  wrote:

> hi Akhlis
>
> It's a must to push data to a socket, as I am using PHP as a web service to
> push data to a socket; then Spark catches the data on that socket and
> processes it. Is there a way to push data from PHP to Kafka directly?
>
> --  Best Regards, -- Tarek Abouzeid
>
>
>
> On Sunday, October 18, 2015 10:26 AM, "tarek.abouzei...@yahoo.com" <
> tarek.abouzei...@yahoo.com> wrote:
>
>
> hi Xiao,
> 1- requests are not similar at all, but they use Solr and sometimes do commits
> 2- no caching is required
> 3- yes, the throughput must be very high; the requests are tiny but the
> system may receive 100 requests/sec
> does Kafka support listening to a socket?
>
> --  Best Regards, -- Tarek Abouzeid
>
>
>
> On Monday, October 12, 2015 10:50 AM, Xiao Li 
> wrote:
>
>
> Hi, Tarek,
>
> It is hard to answer your question. Are these requests similar? Caching
> your results or intermediate results in your applications? Or does that
> mean your throughput requirement is very high? Throttling the number of
> concurrent requests? ...
>
> As Akhil said, Kafka might help in your case. Otherwise, you need to read
> the designs or even source codes of Kafka and Spark Streaming.
>
>  Best wishes,
>
> Xiao Li
>
>
> 2015-10-11 23:19 GMT-07:00 Akhil Das :
>
> Instead of pushing your requests to the socket, why don't you push them to
> a Kafka or any other message queue and use spark streaming to process them?
>
> Thanks
> Best Regards
>
> On Mon, Oct 5, 2015 at 6:46 PM, 
> wrote:
>
> Hi,
> I am using Scala, doing a socket program to catch multiple requests at the
> same time and then call a function which uses Spark to handle each request.
> I have a multi-threaded server to handle the multiple requests and pass
> each to Spark, but there's a bottleneck as Spark doesn't initialize a
> sub-task for the new request. Is it even possible to do parallel
> processing using a single Spark job?
> Best Regards,
>
> --  Best Regards, -- Tarek Abouzeid
>
>
>
>
>
>
>
>


Re: In-memory computing and cache() in Spark

2015-10-19 Thread Igor Berman
Do your iterations really submit a job? I don't see any action there.
On Oct 17, 2015 00:03, "Jia Zhan"  wrote:

> Hi all,
>
> I am running Spark locally in one node and trying to sweep the memory size
> for performance tuning. The machine has 8 CPUs and 16G main memory, the
> dataset in my local disk is about 10GB. I have several quick questions and
> appreciate any comments.
>
> 1. Spark performs in-memory computing, but without using RDD.cache(), will
> anything be cached in memory at all? My guess is that, without RDD.cache(),
> only a small amount of data will be stored in OS buffer cache, and every
> iteration of computation will still need to fetch most data from disk every
> time, is that right?
>
> 2. To evaluate how caching helps with iterative computation, I wrote a
> simple program as shown below, which basically consists of one saveAsText()
> and three reduce() actions/stages. I specify "spark.driver.memory" to
> "15g", others by default. Then I run three experiments.
>
> val conf = new SparkConf().setAppName("wordCount")
>
> val sc = new SparkContext(conf)
>
> val input = sc.textFile("/InputFiles")
>
> val words = input.flatMap(line => line.split(" ")).map(word => (word, 1))
>   .reduceByKey(_ + _).saveAsTextFile("/OutputFiles")
>
> val ITERATIONS = 3
>
> for (i <- 1 to ITERATIONS) {
>   val totallength = input.filter(line => line.contains("the"))
>     .map(s => s.length).reduce((a, b) => a + b)
> }
>
> (I) The first run: no caching at all. The application finishes in ~12
> minutes (2.6min+3.3min+3.2min+3.3min)
>
> (II) The second run, I modified the code so that the input will be cached:
>   val input = sc.textFile("/InputFiles").cache()
> The application finishes in ~11 mins!! (5.4min+1.9min+1.9min+2.0min)!
> The storage page in the Web UI shows 48% of the dataset is cached, which
> makes sense due to large Java object overhead, and
> spark.storage.memoryFraction is 0.6 by default.
>
> (III) However, the third run, same program as the second one, but I
> changed "spark.driver.memory" to be "2g".
>The application finishes in just 3.6 minutes (3.0min + 9s + 9s + 9s)!!
> And UI shows 6% of the data is cached.
>
> From the results we can see the reduce stages finish in seconds. How
> could that happen with only 6% cached? Can anyone explain?
>
> I am new to Spark and would appreciate any help on this. Thanks!
>
> Jia
>
>
>
>


best way to generate per key auto increment numerals after sorting

2015-10-19 Thread fahad shah
Hi

I wanted to ask what's the best way to achieve per-key auto-incrementing
numerals after sorting, e.g.:

raw file:

1,a,b,c,1,1
1,a,b,d,0,0
1,a,b,e,1,0
2,a,e,c,0,0
2,a,f,d,1,0

post-output (the last column is the position number after grouping on
first three fields and reverse sorting on last two values)

1,a,b,c,1,1,1
1,a,b,d,0,0,3
1,a,b,e,1,0,2
2,a,e,c,0,0,2
2,a,f,d,1,0,1

I am using a solution that uses groupByKey, but it is running into some
issues (possibly a bug with PySpark/Spark?). I'm wondering if there is a
better way to achieve this.

My solution:

A = sc.textFile("train.csv").filter(lambda x: not isHeader(x)) \
        .map(split).map(parse_train).filter(lambda x: x is not None)

B = A.map(lambda k: ((k.first_field, k.second_field, k.third_field),
                     k[0:5])).groupByKey()

B.map(sort_n_set_position).flatMap(lambda line: line)

where sort_n_set_position iterates over each group's iterator, performs the
sorting, and adds the last (position) column.

best fahad
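A hedged Scala sketch of the same group-sort-enumerate pattern (the code above is PySpark; this is just the shape of the approach, and the field positions are assumptions based on the sample rows):

// group on the first three fields, reverse-sort on the last two,
// then append a 1-based position per group
val raw = sc.textFile("train.csv").map(_.split(","))

val ranked = raw
  .map(f => ((f(0), f(1), f(2)), f))
  .groupByKey()
  .flatMap { case (_, rows) =>
    rows.toSeq
      .sortBy(r => (r(4), r(5)))(Ordering[(String, String)].reverse)
      .zipWithIndex
      .map { case (r, i) => r :+ (i + 1).toString }
  }

ranked.map(_.mkString(",")).take(5).foreach(println)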

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4

2015-10-19 Thread Jeff Zhang
Stacktrace would be helpful if you can provide that.



On Mon, Oct 19, 2015 at 1:42 PM, fahad shah  wrote:

>  Hi
>
> I am trying to do pair RDDs: group by the key and assign an id based on the
> key. I am using PySpark with Spark 1.3. For some reason, I am getting this
> error that I am unable to figure out - any help is much appreciated.
>
> Things I tried (but to no effect),
>
> 1. make sure I am not doing any conversions on the strings
> 2. make sure that the fields used in the key are all there  and not
> empty string (or else I toss the row out)
>
> My code is along following lines (split is using stringio to parse
> csv, header removes the header row and parse_train is putting the 54
> fields into named tuple after whitespace/quote removal):
>
> #Error for string argument is thrown on the BB.take(1) where the
> groupbykey is evaluated
>
> A = sc.textFile("train.csv").filter(lambda x:not
> isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is
> None)
>
> A.count()
>
> B = A.map(lambda k:
>
> ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count,
>  k.srch_children_count,k.srch_room_count),
> (k[0:54])))
> BB = B.groupByKey()
> BB.take(1)
>
>
> best fahad
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4

2015-10-19 Thread fahad shah
Thanks Jeff, please find the stack trace below:

Py4JJavaError Traceback (most recent call last)
 in ()
  1 BB = B.groupByKey()
> 2 BB.take(1)
C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\rdd.py in take(self, num)
   1222
   1223 p = range(partsScanned, min(partsScanned +
numPartsToTry, totalParts))
-> 1224 res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1225
   1226 items += res
C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\context.py in
runJob(self, rdd, partitionFunc, partitions, allowLocal)
840 mappedRDD = rdd.mapPartitions(partitionFunc)
841 port = self._jvm.PythonRDD.runJob(self._jsc.sc(),
mappedRDD._jrdd, javaPartitions,
--> 842   allowLocal)
843 return list(_load_from_socket(port,
mappedRDD._jrdd_deserializer))
844
C:\apps\dist\IPython-3.1.0.0.0.0.0-0001\lib\site-packages\py4j\java_gateway.pyc
in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
C:\apps\dist\IPython-3.1.0.0.0.0.0-0001\lib\site-packages\py4j\protocol.pyc
in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 4 in stage 19.0 failed 4 times, most recent failure: Lost task
4.3 in stage 19.0 (TID 95,
workernode15.expediademocluster.j1.internal.cloudapp.net):
org.apache.spark.api.python.PythonException: Traceback (most recent
call last):
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\worker.py",
line 101, in main
process()
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\worker.py",
line 96, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\serializers.py",
line 125, in dump_stream
for obj in iterator:
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\rdd.py",
line 1626, in add_shuffle_key
for k, v in iterator:
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\shuffle.py",
line 383, in _external_items
False)
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\shuffle.py",
line 288, in mergeCombiners
for k, v in iterator:
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\serializers.py",
line 131, in load_stream
yield self._read_with_length(stream)
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\serializers.py",
line 148, in _read_with_length
length = read_int(stream)
  File "C:\apps\dist\spark-1.3.1.2.2.7.1-0004\python\pyspark\serializers.py",
line 529, in read_int
return struct.unpack("!i", length)[0]
error: unpack requires a string argument of length 4
 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
 at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:311)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
 at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
 at 

spark streaming failing to replicate blocks

2015-10-19 Thread Eugen Cepoi
Hi,

I am running spark streaming 1.4.1 on EMR (AMI 3.9) over YARN.
The job is reading data from Kinesis and the batch interval is 30s (I used
the same value for the Kinesis checkpointing).
In the executor logs, every 5 seconds I see a sequence of stack traces
indicating that block replication failed. I am using the default
storage level MEMORY_AND_DISK_SER_2.
Neither the WAL nor checkpointing is enabled (the checkpoint dir is configured
for the Spark context but not for the streaming context).

Here is an example of those logs for ip-10-63-160-18. They occur in every
executor while trying to replicate to any other executor.


15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to
[ip-10-63-160-18.ec2.internal/10.63.160.18:50929]
15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing
connection to ip-10-63-160-18.ec2.internal/10.63.160.18:50929
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
at 
org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/19 03:11:55 ERROR nio.ConnectionManager: Exception while sending message.
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
at 
org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/19 03:11:55 INFO nio.ConnectionManager: Notifying
ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
15/10/19 03:11:55 INFO nio.ConnectionManager: Handling connection
error on connection to
ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
15/10/19 03:11:55 WARN storage.BlockManager: Failed to replicate
input-1-144524231 to BlockManagerId(3,
ip-10-159-151-22.ec2.internal, 50929), failure #0
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
at 
org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/19 03:11:55 INFO nio.ConnectionManager: Removing
SendingConnection to
ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to
[ip-10-63-160-18.ec2.internal/10.63.160.18:39506]
15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing
connection to ip-10-63-160-18.ec2.internal/10.63.160.18:39506
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
at 
org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/19 03:11:55 ERROR nio.ConnectionManager: Exception while sending message.
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
at 
org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/19 03:11:55 INFO 

Spark executor on Mesos - how to set effective user id?

2015-10-19 Thread Eugene Chepurniy
Hi everyone!
While we are trying to utilize a Spark on Mesos cluster, we are facing an
issue related to the effective Linux user id being used to start executors on
Mesos slaves: all executors try to start on the Mesos slaves under the
driver's Linux user id.
Let me explain in detail: the Spark driver program (which is going to spawn
Spark on Mesos in coarse mode) is started as an unprivileged Linux user, for
example 'user1'. We have the Spark distribution unpacked and ready to use on
every Mesos slave (placed at /opt/spark; 'spark.mesos.executor.home' is
pointing to this folder). Every executor then fails to start, with an error
log saying user 'user1' is not available. And it is really true - there is no
'user1' present on the Mesos slaves.
So my question is: how can I control the effective user id which will be used
to start executors on Mesos?
I tried setting SPARK_USER=nobody on every slave, but it didn't help.
Thanks for any advice.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-executor-on-Mesos-how-to-set-effective-user-id-tp25118.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Is one batch created by Streaming Context always equal to one RDD?

2015-10-19 Thread vaibhavrtk




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-one-batch-created-by-Streaming-Context-always-equal-to-one-RDD-tp25117.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to take user jars precedence over Spark jars

2015-10-19 Thread YiZhi Liu
I'm trying to read a Thrift object from SequenceFile, using
elephant-bird's ThriftWritable. My code looks like

val rawData = sc.sequenceFile[BooleanWritable,
ThriftWritable[TrainingSample]](input)
val samples = rawData.map { case (key, value) => {
  value.setConverter(classOf[TrainingSample])
  val conversion = if (key.get) 1 else 0
  val sample = value.get
  (conversion, sample)
}}

When I spark-submit in local mode, it failed with

(Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times,
most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost):
java.lang.AbstractMethodError:
org.apache.thrift.TUnion.standardSchemeReadValue(Lorg/apache/thrift/protocol/TProtocol;Lorg/apache/thrift/protocol/TField;)Ljava/lang/Object;
... ...

I'm pretty sure it is caused by the conflict of libthrift, I use
thrift-0.6.1 while spark uses 0.9.2, which requires TUnion object to
implement the abstract 'standardSchemeReadValue' method.

But when I set spark.files.userClassPathFirst=true, it failed even earlier:

(Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times,
most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost):
java.lang.ClassCastException: cannot assign instance of scala.None$ to
field org.apache.spark.scheduler.Task.metrics of type scala.Option in
instance of org.apache.spark.scheduler.ResultTask
at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

It seems I introduced more conflict, but I couldn't figure out which
one caused this failure.

Interestingly, when I ran mvn test in my project, which tests the Spark job
in local mode, everything worked fine.

So what is the right way to take user jars precedence over Spark jars?

-- 
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark Streaming - use the data in different jobs

2015-10-19 Thread Ewan Leith
Storing the data in HBase, Cassandra, or similar is possibly the right answer;
the other option that can work well is re-publishing the data back into a second
queue on RabbitMQ, to be read again by the next job.

Thanks,
Ewan

From: Oded Maimon [mailto:o...@scene53.com]
Sent: 18 October 2015 12:49
To: user 
Subject: Spark Streaming - use the data in different jobs

Hi,
we've built a Spark Streaming process that gets data from a pub/sub (RabbitMQ in
our case).

Now we want the streamed data to be used in different Spark jobs (also in
realtime if possible).

What options do we have for doing that?


  *   can the streaming process and different spark jobs share/access the same 
RDD's?
  *   can the streaming process create a sparkSQL table and other jobs read/use 
it?
  *   can a spark streaming process trigger other spark jobs and send the
data (in memory)?
  *   can a spark streaming process cache the data in memory and other 
scheduled jobs access same rdd's?
  *   should we keep the data to hbase and read it from other jobs?
  *   other ways?

I believe that the answer will be using external db/storage..  hoping to have a 
different solution :)

Thanks.


Regards,
Oded Maimon
Scene53.


This email and any files transmitted with it are confidential and intended 
solely for the use of the individual or entity to whom they are addressed. 
Please note that any disclosure, copying or distribution of the content of this 
information is strictly forbidden. If you have received this email message in 
error, please destroy it immediately and notify its sender.


Re: Incrementally add/remove vertices in GraphX

2015-10-19 Thread mas
Dear All,

Any update regarding graph streaming? I want to update the graph, i.e., add
vertices and edges after the graph has been created.

Any suggestions or recommendations to do that.

Thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Incrementally-add-remove-vertices-in-GraphX-tp2227p25116.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Should I convert json into parquet?

2015-10-19 Thread Ewan Leith
As Jörn says, Parquet and ORC will get you really good compression and can be
much faster. There are also some nice additions around predicate pushdown, which
can be great if you've got wide tables.

Parquet is obviously easier to use, since it's bundled into Spark. Using ORC is 
described here 
http://hortonworks.com/blog/bringing-orc-support-into-apache-spark/

Thanks,
Ewan
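A minimal sketch of the conversion itself, assuming the Spark 1.4+ DataFrame API in the spark-shell; the paths are placeholders:

// read one day's JSON events and append them to a Parquet copy
val events = sqlContext.read.json("hdfs:///events/2015-10-19/*.json")

events.write
  .mode("append")                              // new days land next to the old ones
  .parquet("hdfs:///warehouse/events_parquet")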

-Original Message-
From: Jörn Franke [mailto:jornfra...@gmail.com] 
Sent: 19 October 2015 06:32
To: Gavin Yue 
Cc: user 
Subject: Re: Should I convert json into parquet?



Good formats are Parquet or ORC. Both can be useful with compression, such as
Snappy. They are much faster than JSON. However, the table structure is up to
you and depends on your use case.

> On 17 Oct 2015, at 23:07, Gavin Yue  wrote:
> 
> I have JSON files which contain timestamped events. Each event is associated
> with a user id.
> 
> Now I want to group by user id, so it converts from
> 
> Event1 -> UserIDA;
> Event2 -> UserIDA;
> Event3 -> UserIDB;
> 
> To intermediate storage. 
> UserIDA -> (Event1, Event2...)
> UserIDB-> (Event3...)
> 
> Then I will label positives and featurize the Events Vector in many different 
> ways, fit each of them into the Logistic Regression. 
> 
> I want to save intermediate storage permanently since it will be used many 
> times.  And there will new events coming every day. So I need to update this 
> intermediate storage every day. 
> 
> Right now I store intermediate storage using Json files.  Should I use 
> Parquet instead?  Or is there better solutions for this use case?
> 
> Thanks a lot !
> 
> 
> 
> 
> 
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Should I convert json into parquet?

2015-10-19 Thread Adrian Tanase
For general data access of the pre-computed aggregates (group by) you’re better 
off with Parquet. I’d only choose JSON if I needed interop with another app 
stack / language that has difficulty accessing parquet (E.g. Bulk load into 
document db…).

On a strategic level, both JSON and parquet are similar since neither give you 
good random access, so you can’t simply “update specific user Ids on new data 
coming in”. Your strategy will probably be to re-process all the users by 
loading new data and current aggregates, joining and writing a new version of 
the aggregates…

If you’re worried about update performance then you probably need to look at a 
DB that offers random write access (Cassandra, Hbase..)

-adrian




On 10/19/15, 12:31 PM, "Ewan Leith"  wrote:

>As Jörn says, Parquet and ORC will get you really good compression and can be 
>much faster. There also some nice additions around predicate pushdown which 
>can be great if you've got wide tables.
>
>Parquet is obviously easier to use, since it's bundled into Spark. Using ORC 
>is described here 
>http://hortonworks.com/blog/bringing-orc-support-into-apache-spark/
>
>Thanks,
>Ewan
>
>-Original Message-
>From: Jörn Franke [mailto:jornfra...@gmail.com] 
>Sent: 19 October 2015 06:32
>To: Gavin Yue 
>Cc: user 
>Subject: Re: Should I convert json into parquet?
>
>
>
>Good Formats are Parquet or ORC. Both can be useful with compression, such as 
>Snappy.   They are much faster than JSON. however, the table structure is up 
>to you and depends on your use case.
>
>> On 17 Oct 2015, at 23:07, Gavin Yue  wrote:
>> 
>> I have json files which contains timestamped events.  Each event associate 
>> with a user id. 
>> 
>> Now I want to group by user id. So converts from
>> 
>> Event1 -> UserIDA;
>> Event2 -> UserIDA;
>> Event3 -> UserIDB;
>> 
>> To intermediate storage. 
>> UserIDA -> (Event1, Event2...)
>> UserIDB-> (Event3...)
>> 
>> Then I will label positives and featurize the Events Vector in many 
>> different ways, fit each of them into the Logistic Regression. 
>> 
>> I want to save intermediate storage permanently since it will be used many 
>> times.  And there will new events coming every day. So I need to update this 
>> intermediate storage every day. 
>> 
>> Right now I store intermediate storage using Json files.  Should I use 
>> Parquet instead?  Or is there better solutions for this use case?
>> 
>> Thanks a lot !
>> 
>> 
>> 
>> 
>> 
>> 
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
>commands, e-mail: user-h...@spark.apache.org
>
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Spark executor on Mesos - how to set effective user id?

2015-10-19 Thread Jerry Lam
Can you try setting SPARK_USER at the driver? It is used to impersonate users
at the executor. So if you have a user set up for launching Spark jobs on the
executor machines, simply set SPARK_USER to that user name. There is also a
configuration that prevents jobs from being launched with any user other than
the one that is configured; I don't remember its name, but it is in the
documentation.


Sent from my iPhone

> On 19 Oct, 2015, at 8:14 am, Eugene Chepurniy  wrote:
> 
> Hi everyone!
> While we are trying to utilize Spark On Mesos cluster, we are facing an
> issue related to effective linux user id being used to start executors on
> Mesos slaves: all executors are trying to use driver's linux user id to
> start on Mesos slaves. 
> Let me explain in detail: spark driver program (which is going to spawn
> Spark on Mesos in coarse mode) is started as unprivileged linux user, for
> example 'user1'. We have Spark distribution unpacked and ready-to-use on
> every mesos slave (placed at /opt/spark, 'spark.mesos.executor.home' is
> pointing to this folder). And after attempt to run every executor fails to
> start with error log telling user 'user1' is not available. And it is really
> true - there is no 'user1' present on Mesos slaves. 
> So my question is: how can I control effective user id which will be used
> for start executors on Mesos?
> Actually I was trying to setup SPARK_USER=nobody on every slave but it
> wasn't useful. 
> Thanks for advice if any.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-executor-on-Mesos-how-to-set-effective-user-id-tp25118.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming - use the data in different jobs

2015-10-19 Thread Adrian Tanase
+1 for re-publishing to pubsub if there is only transient value in the data. If 
you need to query the intermediate representation then you will need to use a 
database.

Sharing RDDs in memory should be possible with projects like Spark Job Server,
but I think that's overkill in this scenario.

Lastly, if there is no strong requirement to have different jobs, you might
consider collapsing the 2 jobs into one, and simply have multiple stages that
execute in the same job.

-adrian

From: Ewan Leith
Date: Monday, October 19, 2015 at 12:34 PM
To: Oded Maimon, user
Subject: RE: Spark Streaming - use the data in different jobs

Storing the data in HBase, Cassandra, or similar is possibly the right answer, 
the other option that can work well is re-publishing the data back into second 
queue on RabbitMQ, to be read again by the next job.

Thanks,
Ewan

From: Oded Maimon [mailto:o...@scene53.com]
Sent: 18 October 2015 12:49
To: user >
Subject: Spark Streaming - use the data in different jobs

Hi,
we've build a spark streaming process that get data from a pub/sub (rabbitmq in 
our case).

now we want the streamed data to be used in different spark jobs (also in 
realtime if possible)

what options do we have for doing that ?


  *   can the streaming process and different spark jobs share/access the same 
RDD's?
  *   can the streaming process create a sparkSQL table and other jobs read/use 
it?
  *   can a spark streaming process trigger other spark jobs and send the
data (in memory)?
  *   can a spark streaming process cache the data in memory and other 
scheduled jobs access same rdd's?
  *   should we keep the data to hbase and read it from other jobs?
  *   other ways?

I believe that the answer will be using external db/storage..  hoping to have a 
different solution :)

Thanks.


Regards,
Oded Maimon
Scene53.


This email and any files transmitted with it are confidential and intended 
solely for the use of the individual or entity to whom they are addressed. 
Please note that any disclosure, copying or distribution of the content of this 
information is strictly forbidden. If you have received this email message in 
error, please destroy it immediately and notify its sender.


Re: Spark executor on Mesos - how to set effective user id?

2015-10-19 Thread SLiZn Liu
Hi Jerry,

I think you are referring to --no-switch_user. =)





chiling...@gmail.com wrote on Mon, Oct 19, 2015 at 21:05:

> Can you try setting SPARK_USER at the driver? It is used to impersonate
> users at the executor. So if you have user setup for launching spark jobs
> on the executor machines, simply set it to that user name for SPARK_USER.
> There is another configuration that will prevents jobs being launched with
> a different user except the one that is configured. I don't remember the
> name of it but it is in the documentation.
>
>
> Sent from my iPhone
>
> > On 19 Oct, 2015, at 8:14 am, Eugene Chepurniy 
> wrote:
> >
> > Hi everyone!
> > While we are trying to utilize Spark On Mesos cluster, we are facing an
> > issue related to effective linux user id being used to start executors on
> > Mesos slaves: all executors are trying to use driver's linux user id to
> > start on Mesos slaves.
> > Let me explain in detail: spark driver program (which is going to spawn
> > Spark on Mesos in coarse mode) is started as unprivileged linux user, for
> > example 'user1'. We have Spark distribution unpacked and ready-to-use on
> > every mesos slave (placed at /opt/spark, 'spark.mesos.executor.home' is
> > pointing to this folder). And after attempt to run every executor fails
> to
> > start with error log telling user 'user1' is not available. And it is
> really
> > true - there is no 'user1' present on Mesos slaves.
> > So my question is: how can I control effective user id which will be used
> > for start executors on Mesos?
> > Actually I was trying to setup SPARK_USER=nobody on every slave but it
> > wasn't useful.
> > Thanks for advice if any.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-executor-on-Mesos-how-to-set-effective-user-id-tp25118.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How does shuffle work in spark ?

2015-10-19 Thread shahid
@all I did partitionBy using the default hash partitioner on data of the form
[(1, data), (2, data), ..., (n, data)]. The total data was approx 3.5; it showed
a shuffle write of 50G, and on the next action, e.g. count, it shows a shuffle
read of 50G. I don't understand this behaviour, and I think performance is
getting slow with so much shuffle read in the following transformation
operations.
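For what it's worth, a matching shuffle write/read pair is expected here: the partitionBy stage writes the shuffle files and the next stage (e.g. the count) reads them. If the partitioned RDD feeds several later jobs, the usual advice is to persist it right after partitionBy so later actions reuse the partitioned copy. A minimal sketch in the spark-shell; the data and partition count are placeholders:

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

// toy stand-in for the keyed data set described above
val keyed = sc.parallelize(1 to 1000000).map(i => (i % 100, s"data-$i"))

val partitioned = keyed
  .partitionBy(new HashPartitioner(200))         // 200 is an arbitrary placeholder
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

partitioned.count()   // first action pays the shuffle (write + read) once
partitioned.count()   // subsequent actions read the persisted partitions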



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-shuffle-work-in-spark-tp584p25119.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to take user jars precedence over Spark jars

2015-10-19 Thread YiZhi Liu
Hi Ted,

Unfortunately these two options cause following failure in my environment:

(java.lang.RuntimeException: class
org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not
org.apache.hadoop.security.GroupMappingServiceProvider,java.lang.RuntimeException:
java.lang.RuntimeException: class
org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not
org.apache.hadoop.security.GroupMappingServiceProvider)

2015-10-19 22:23 GMT+08:00 Ted Yu :
> Have you tried the following options ?
>
> --conf spark.driver.userClassPathFirst=true --conf
> spark.executor.userClassPathFirst=true
>
> Cheers
>
> On Mon, Oct 19, 2015 at 5:07 AM, YiZhi Liu  wrote:
>>
>> I'm trying to read a Thrift object from SequenceFile, using
>> elephant-bird's ThriftWritable. My code looks like
>>
>> val rawData = sc.sequenceFile[BooleanWritable,
>> ThriftWritable[TrainingSample]](input)
>> val samples = rawData.map { case (key, value) => {
>>   value.setConverter(classOf[TrainingSample])
>>   val conversion = if (key.get) 1 else 0
>>   val sample = value.get
>>   (conversion, sample)
>> }}
>>
>> When I spark-submit in local mode, it failed with
>>
>> (Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times,
>> most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost):
>> java.lang.AbstractMethodError:
>>
>> org.apache.thrift.TUnion.standardSchemeReadValue(Lorg/apache/thrift/protocol/TProtocol;Lorg/apache/thrift/protocol/TField;)Ljava/lang/Object;
>> ... ...
>>
>> I'm pretty sure it is caused by the conflict of libthrift, I use
>> thrift-0.6.1 while spark uses 0.9.2, which requires TUnion object to
>> implement the abstract 'standardSchemeReadValue' method.
>>
>> But when I set spark.files.userClassPathFirst=true, it failed even
>> earlier:
>>
>> (Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times,
>> most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost):
>> java.lang.ClassCastException: cannot assign instance of scala.None$ to
>> field org.apache.spark.scheduler.Task.metrics of type scala.Option in
>> instance of org.apache.spark.scheduler.ResultTask
>> at
>> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
>> at
>> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
>> at
>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> It seems I introduced more conflict, but I couldn't figure out which
>> one caused this failure.
>>
>> Interestingly, when I ran mvn test in my project, which test spark job
>> in locally mode, all worked fine.
>>
>> So what is the right way to take user jars precedence over Spark jars?
>>
>> --
>> Yizhi Liu
>> Senior Software Engineer / Data Mining
>> www.mvad.com, Shanghai, China
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>



-- 
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to take user jars precedence over Spark jars

2015-10-19 Thread Ted Yu
Have you tried the following options ?

--conf spark.driver.userClassPathFirst=true --conf spark.executor.
userClassPathFirst=true

Cheers

On Mon, Oct 19, 2015 at 5:07 AM, YiZhi Liu  wrote:

> I'm trying to read a Thrift object from SequenceFile, using
> elephant-bird's ThriftWritable. My code looks like
>
> val rawData = sc.sequenceFile[BooleanWritable,
> ThriftWritable[TrainingSample]](input)
> val samples = rawData.map { case (key, value) => {
>   value.setConverter(classOf[TrainingSample])
>   val conversion = if (key.get) 1 else 0
>   val sample = value.get
>   (conversion, sample)
> }}
>
> When I spark-submit in local mode, it failed with
>
> (Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times,
> most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost):
> java.lang.AbstractMethodError:
>
> org.apache.thrift.TUnion.standardSchemeReadValue(Lorg/apache/thrift/protocol/TProtocol;Lorg/apache/thrift/protocol/TField;)Ljava/lang/Object;
> ... ...
>
> I'm pretty sure it is caused by the conflict of libthrift, I use
> thrift-0.6.1 while spark uses 0.9.2, which requires TUnion object to
> implement the abstract 'standardSchemeReadValue' method.
>
> But when I set spark.files.userClassPathFirst=true, it failed even earlier:
>
> (Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times,
> most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost):
> java.lang.ClassCastException: cannot assign instance of scala.None$ to
> field org.apache.spark.scheduler.Task.metrics of type scala.Option in
> instance of org.apache.spark.scheduler.ResultTask
> at
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> It seems I introduced more conflict, but I couldn't figure out which
> one caused this failure.
>
> Interestingly, when I ran mvn test in my project, which test spark job
> in locally mode, all worked fine.
>
> So what is the right way to take user jars precedence over Spark jars?
>
> --
> Yizhi Liu
> Senior Software Engineer / Data Mining
> www.mvad.com, Shanghai, China
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark SQL Thriftserver and Hive UDF in Production

2015-10-19 Thread Deenar Toraskar
Reece

You can do the following. Start the spark-shell. Register the UDFs in the
shell using sqlContext, then start the Thrift Server using startWithContext
from the spark shell: https://github.com/apache/spark/blob/master/sql/hive-
thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver
/HiveThriftServer2.scala#L56



Regards
Deenar
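A minimal sketch of that flow, assuming the spark-shell was launched from a distribution built with Hive and the thrift-server classes; the toy UDF is only a placeholder:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

// the shell's sqlContext is a HiveContext when Spark is built with Hive
val hiveContext = sqlContext.asInstanceOf[HiveContext]

// register whatever UDFs Tableau needs; this one is just an example
hiveContext.udf.register("myfunc", (s: String) => s.toUpperCase)

// expose the same context over JDBC/ODBC so the Simba driver sees the UDFs
HiveThriftServer2.startWithContext(hiveContext)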

On 19 October 2015 at 04:42, Mohammed Guller  wrote:

> Have you tried registering the function using the Beeline client?
>
> Another alternative would be to create a Spark SQL UDF and launch the
> Spark SQL Thrift server programmatically.
>
> Mohammed
>
> -Original Message-
> From: ReeceRobinson [mailto:re...@therobinsons.gen.nz]
> Sent: Sunday, October 18, 2015 8:05 PM
> To: user@spark.apache.org
> Subject: Spark SQL Thriftserver and Hive UDF in Production
>
> Does anyone have some advice on the best way to deploy a Hive UDF for use
> with a Spark SQL Thriftserver where the client is Tableau using Simba ODBC
> Spark SQL driver.
>
> I have seen the hive documentation that provides an example of creating
> the function using a hive client ie: CREATE FUNCTION myfunc AS 'myclass'
> USING JAR 'hdfs:///path/to/jar';
>
> However using Tableau I can't run this create function statement to
> register my UDF. Ideally there is a configuration setting that will load my
> UDF jar and register it at start-up of the thriftserver.
>
> Can anyone tell me what the best option if it is possible?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Thriftserver-and-Hive-UDF-in-Production-tp25114.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


[Spark MLlib] How to apply spark ml given models for questions with general background

2015-10-19 Thread Zhiliang Zhu
Dear All,

I am new to Spark ML. I have a project with a given math model, and I would like
to get its optimized solution. It is very similar to a Spark MLlib application.
However, the key problem for me is that the given math model does not obviously
belong to the models (classification, regression, clustering, collaborative
filtering, dimensionality reduction) provided in Spark ML...

For a specific application, I think the most important thing is to find the
proper model for it among the known MLlib models; then everything follows the
usual steps, since the optimizer is already under MLlib.

However, my question is: generally, how would it go if the specific application
does not exactly fit the given models in MLlib? Is it generally convenient to
split up the specific background and convert it into one of the given models?
What is the general way to apply MLlib to specific problem backgrounds?

I would appreciate your help very much!

Thank you,
Zhiliang


Re: How to take user jars precedence over Spark jars

2015-10-19 Thread Ted Yu
Can you use:
https://maven.apache.org/plugins/maven-shade-plugin/

to shade the dependencies unique to your project ?

On Mon, Oct 19, 2015 at 7:47 AM, YiZhi Liu  wrote:

> Hi Ted,
>
> Unfortunately these two options cause following failure in my environment:
>
> (java.lang.RuntimeException: class
> org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not
>
> org.apache.hadoop.security.GroupMappingServiceProvider,java.lang.RuntimeException:
> java.lang.RuntimeException: class
> org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not
> org.apache.hadoop.security.GroupMappingServiceProvider)
>
> 2015-10-19 22:23 GMT+08:00 Ted Yu :
> > Have you tried the following options ?
> >
> > --conf spark.driver.userClassPathFirst=true --conf
> > spark.executor.userClassPathFirst=true
> >
> > Cheers
> >
> > On Mon, Oct 19, 2015 at 5:07 AM, YiZhi Liu  wrote:
> >>
> >> I'm trying to read a Thrift object from SequenceFile, using
> >> elephant-bird's ThriftWritable. My code looks like
> >>
> >> val rawData = sc.sequenceFile[BooleanWritable,
> >> ThriftWritable[TrainingSample]](input)
> >> val samples = rawData.map { case (key, value) => {
> >>   value.setConverter(classOf[TrainingSample])
> >>   val conversion = if (key.get) 1 else 0
> >>   val sample = value.get
> >>   (conversion, sample)
> >> }}
> >>
> >> When I spark-submit in local mode, it failed with
> >>
> >> (Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times,
> >> most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost):
> >> java.lang.AbstractMethodError:
> >>
> >>
> org.apache.thrift.TUnion.standardSchemeReadValue(Lorg/apache/thrift/protocol/TProtocol;Lorg/apache/thrift/protocol/TField;)Ljava/lang/Object;
> >> ... ...
> >>
> >> I'm pretty sure it is caused by the conflict of libthrift, I use
> >> thrift-0.6.1 while spark uses 0.9.2, which requires TUnion object to
> >> implement the abstract 'standardSchemeReadValue' method.
> >>
> >> But when I set spark.files.userClassPathFirst=true, it failed even
> >> earlier:
> >>
> >> (Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times,
> >> most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost):
> >> java.lang.ClassCastException: cannot assign instance of scala.None$ to
> >> field org.apache.spark.scheduler.Task.metrics of type scala.Option in
> >> instance of org.apache.spark.scheduler.ResultTask
> >> at
> >>
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
> >> at
> >> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
> >> at
> >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
> >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> >> at
> >>
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> >> at
> >>
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
> >> at
> >>
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
> >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
> >> at
> >>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >> at
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >> at java.lang.Thread.run(Thread.java:745)
> >>
> >> It seems I introduced more conflict, but I couldn't figure out which
> >> one caused this failure.
> >>
> >> Interestingly, when I ran mvn test in my project, which test spark job
> >> in locally mode, all worked fine.
> >>
> >> So what is the right way to take user jars precedence over Spark jars?
> >>
> >> --
> >> Yizhi Liu
> >> Senior Software Engineer / Data Mining
> >> www.mvad.com, Shanghai, China
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
>
>
>
> --
> Yizhi Liu
> Senior Software Engineer / Data Mining
> www.mvad.com, Shanghai, China
>


flattening a JSON data structure

2015-10-19 Thread nunomrc
Hi, I am fairly new to Spark and I am trying to flatten the following
structure:

 |-- provider: struct (nullable = true)
 ||-- accountId: string (nullable = true)
 ||-- contract: array (nullable = true)

And then provider is:
root
 |-- accountId: string (nullable = true)
 |-- contract: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- details: struct (nullable = true)
 ||||-- contractId: string (nullable = true)
 ||||-- countryCode: string (nullable = true)
 ||||-- endDate: string (nullable = true)
 ||||-- noticePeriod: long (nullable = true)
 ||||-- startDate: string (nullable = true)
 |||-- endDate: string (nullable = true)
 |||-- startDate: string (nullable = true)
 |||-- other: struct (nullable = true)
 ||||-- type: string (nullable = true)
 ||||-- values: array (nullable = true)
 |||||-- element: struct (containsNull = true)
 ||||||-- key: string (nullable = true)
 ||||||-- value: long (nullable = true)


I am trying the following:

dataFrame.map { case Row(., provider: Row, .) =>
   val list = provider.getAs[Array[Row]]("contract")

At this point, I get the following exception:
[info]   org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in
stage 4.0 (TID 9, localhost): java.lang.ClassCastException:
scala.collection.mutable.WrappedArray$ofRef cannot be cast to
[Lorg.apache.spark.sql.Row;
[info]  at com.mycode.Deal$$anonfun$flattenDeals$1.apply(Deal.scala:62)

I tried many different variations of this and tried to get the actual data
type of the elements of the array, without any success.
This kind of method to flatten JSON data structures was working for me with
previous versions of Spark, but I am now trying to upgrade from 1.4.1 to
1.5.1 and started getting this error.

What am I doing wrong?
Any help would be appreciated.

Thanks,
Nuno







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/flattening-a-JSON-data-structure-tp25120.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



k-prototypes in MLLib?

2015-10-19 Thread Fernando Velasco
Hi everyone!

I am a data scientist new to Spark and I am interested in clustering of
mixed variables. I am more used to R, where there are implementations like
daisy, PAM, etc. It is true that dummy variables along with K-Means can
do a nice job of clustering mixed variables, but I find this is not a
completely correct treatment for the categorical ones. So, my question is
whether there is any K-modes/k-prototypes implementation planned to be included
in MLlib in the future.

I have been able to find this
https://issues.apache.org/jira/browse/SPARK-4510 but it seems PAM is not
completely scalable. Perhaps K-prototypes could fit better.

Regards,


Re: Spark SQL Thriftserver and Hive UDF in Production

2015-10-19 Thread Todd Nist
From Tableau, you should be able to use the Initial SQL option to support
this:

So in Tableau add the following to the “Initial SQL”

create function myfunc AS 'myclass'
using jar 'hdfs:///path/to/jar';



HTH,
Todd
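
For reference, a rough sketch of the startWithContext route Deenar describes
below, assuming a spark-shell from a build that includes the Hive Thrift server
(the UDF name and body are placeholders):

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

// in spark-shell, sqlContext is already a HiveContext when Spark is built with Hive
val hiveContext = sqlContext.asInstanceOf[HiveContext]

// register a Spark SQL UDF (replace the body with your own logic)
hiveContext.udf.register("myfunc", (s: String) => s.toUpperCase)

// expose the same context over JDBC/ODBC so Tableau / the Simba driver sees the UDF
HiveThriftServer2.startWithContext(hiveContext)

Anything registered on that context should then be visible to JDBC/ODBC clients
without any Initial SQL.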


On Mon, Oct 19, 2015 at 11:22 AM, Deenar Toraskar  wrote:

> Reece
>
> You can do the following. Start the spark-shell. Register the UDFs in the
> shell using sqlContext, then start the Thrift Server using startWithContext
> from the spark shell:
> https://github.com/apache/spark/blob/master/sql/hive-thriftserver
> /src/main/scala/org/apache/spark/sql/hive/thriftserver
> /HiveThriftServer2.scala#L56
>
>
>
> Regards
> Deenar
>
> On 19 October 2015 at 04:42, Mohammed Guller 
> wrote:
>
>> Have you tried registering the function using the Beeline client?
>>
>> Another alternative would be to create a Spark SQL UDF and launch the
>> Spark SQL Thrift server programmatically.
>>
>> Mohammed
>>
>> -Original Message-
>> From: ReeceRobinson [mailto:re...@therobinsons.gen.nz]
>> Sent: Sunday, October 18, 2015 8:05 PM
>> To: user@spark.apache.org
>> Subject: Spark SQL Thriftserver and Hive UDF in Production
>>
>> Does anyone have advice on the best way to deploy a Hive UDF for use
>> with a Spark SQL Thriftserver where the client is Tableau using the Simba ODBC
>> Spark SQL driver?
>>
>> I have seen the Hive documentation that provides an example of creating
>> the function using a Hive client, i.e.: CREATE FUNCTION myfunc AS 'myclass'
>> USING JAR 'hdfs:///path/to/jar';
>>
>> However, using Tableau I can't run this CREATE FUNCTION statement to
>> register my UDF. Ideally there would be a configuration setting that loads my
>> UDF jar and registers it at start-up of the thriftserver.
>>
>> Can anyone tell me what the best option is, if this is possible?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Thriftserver-and-Hive-UDF-in-Production-tp25114.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
>> commands, e-mail: user-h...@spark.apache.org
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark handling parallel requests

2015-10-19 Thread Adrian Tanase
To answer your specific question, you can't push data to Kafka through a socket;
you need a smart client library, as the cluster setup is pretty advanced (it also
requires ZooKeeper).

I bet there are PHP libraries for Kafka, although after a quick search it seems
they're still pretty young. Also, Kafka shines at larger deployments and
throughput (tens of thousands to millions of events per second) and may be
overkill for 100 events/sec.

Here are some other ideas:

  *   Use a lighter weight message broker like Rabbit MQ or MQTT – both have 
good integrations with spark and should be simpler to integrate with PHP
  *   Instead of doing a socket call, log the event on disk – this opens up 2 
strategies
 *   If you have access to shared storage, spark could read the files 
directly
 *   Otherwise, you could rely on something like 
Flume that can poll your logs and forward them to 
spark (There is a default integration in the spark external package)
  *   Lastly, why not try to build on one of the custom 
receivers? 
There are plenty of code samples in the docs and examples
 *   This may not be a good choice if you can’t afford to lose any messages 
– in this case your life is harder as you’ll need to also use WAL based 
implementation

Hope this helps,
-adrian
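
Picking up the last bullet, here is a minimal sketch of a custom receiver that
accepts line-delimited text over TCP and hands each line to Spark Streaming; the
class name and port handling are illustrative only, and you would still need the
reliable/WAL-based variant if you cannot afford to lose messages:

import java.io.{BufferedReader, InputStreamReader}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LineReceiver(port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // run the blocking accept/read loop on its own thread so onStart returns quickly
    new Thread("line-receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // nothing to do here: the loop below checks isStopped() and exits on its own
  }

  private def receive(): Unit = {
    val server = new ServerSocket(port)
    try {
      while (!isStopped()) {
        // note: accept() blocks; a production receiver would use a timeout
        val socket = server.accept()
        val reader = new BufferedReader(
          new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
        var line = reader.readLine()
        while (line != null && !isStopped()) {
          store(line)                 // hand each record to Spark Streaming
          line = reader.readLine()
        }
        socket.close()
      }
    } catch {
      case e: Exception => restart("Error receiving data", e)
    } finally {
      server.close()
    }
  }
}

// usage: val lines = ssc.receiverStream(new LineReceiver(9999))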

From: 
"tarek.abouzei...@yahoo.com.INVALID"
Reply-To: "tarek.abouzei...@yahoo.com"
Date: Sunday, October 18, 2015 at 10:28 AM
To: Xiao Li, Akhil Das
Cc: "user@spark.apache.org"
Subject: Re: Spark handling parallel requests

Hi Akhil,

It's a must to push data to a socket, as I am using PHP as a web service to push
data to the socket; then Spark catches the data on that socket and processes it. Is
there a way to push data from PHP to Kafka directly?

--  Best Regards, -- Tarek Abouzeid



On Sunday, October 18, 2015 10:26 AM, 
"tarek.abouzei...@yahoo.com" 
> wrote:


Hi Xiao,
1- The requests are not similar at all, but they use Solr and do commits sometimes.
2- No caching is required.
3- The throughput must be very high, yes; the requests are tiny but the system
may receive 100 requests/sec.
Does Kafka support listening to a socket?

--  Best Regards, -- Tarek Abouzeid



On Monday, October 12, 2015 10:50 AM, Xiao Li 
> wrote:


Hi, Tarek,

It is hard to answer your question. Are these requests similar? Caching your 
results or intermediate results in your applications? Or does that mean your 
throughput requirement is very high? Throttling the number of concurrent 
requests? ...

As Akhil said, Kafka might help in your case. Otherwise, you need to read the 
designs or even source codes of Kafka and Spark Streaming.

 Best wishes,

Xiao Li


2015-10-11 23:19 GMT-07:00 Akhil Das 
>:
Instead of pushing your requests to the socket, why don't you push them to a 
Kafka or any other message queue and use spark streaming to process them?

Thanks
Best Regards

On Mon, Oct 5, 2015 at 6:46 PM, 
> 
wrote:
Hi,
I am using Scala, doing a socket program to catch multiple requests at the same
time and then call a function which uses Spark to handle each process. I have
a multi-threaded server to handle the multiple requests and pass each to Spark,
but there's a bottleneck, as Spark doesn't initialize a sub-task for the
new request. Is it even possible to do parallel processing using a single Spark
job?
Best Regards,

--  Best Regards, -- Tarek Abouzeid








Re: best way to generate per key auto increment numerals after sorting

2015-10-19 Thread fahad shah
Thanks Davies,

groupByKey was throwing up the error: unpack requires a string
argument of length 4

Interestingly, I replaced that with sortByKey (which I read also
shuffles so that data for the same key end up on the same partition) and it ran
fine - wondering if this is a bug in groupByKey for Spark 1.3?

best fahad

On Mon, Oct 19, 2015 at 10:45 AM, Davies Liu  wrote:
> What's the issue with groupByKey()?
>
> On Mon, Oct 19, 2015 at 1:11 AM, fahad shah  wrote:
>> Hi
>>
>> I wanted to ask whats the best way to achieve per key auto increment
>> numerals after sorting, for eg. :
>>
>> raw file:
>>
>> 1,a,b,c,1,1
>> 1,a,b,d,0,0
>> 1,a,b,e,1,0
>> 2,a,e,c,0,0
>> 2,a,f,d,1,0
>>
>> post-output (the last column is the position number after grouping on
>> first three fields and reverse sorting on last two values)
>>
>> 1,a,b,c,1,1,1
>> 1,a,b,d,0,0,3
>> 1,a,b,e,1,0,2
>> 2,a,e,c,0,0,2
>> 2,a,f,d,1,0,1
>>
>> I am using solution that uses groupbykey but that is running into some
>> issues (possibly bug with pyspark/spark?), wondering if there is a
>> better way to achieve this.
>>
>> My solution:
>>
>> A = A = sc.textFile("train.csv").filter(lambda x:not
>> isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is
>> None)
>>
>> B = A.map(lambda k:
>> ((k.first_field,k.second_field,k.first_field,k.third_field),
>> (k[0:5]))).groupByKey()
>>
>> B.map(sort_n_set_position).flatMap(lambda line: line)
>>
>> where sort and set position iterates over the iterator and performs
>> sorting and adding last column.
>>
>> best fahad
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark SQL Exception: Conf non-local session path expected to be non-null

2015-10-19 Thread YaoPau
I've connected Spark SQL to the Hive Metastore and currently I'm running SQL
code via pyspark.  Typically everything works fine, but sometimes after a
long-running Spark SQL job I get the error below, and from then on I can no
longer run Spark SQL commands.  I still do have both my sc and my sqlCtx.

Any idea what this could mean?

An error occurred while calling o36.sql.
: org.apache.spark.sql.AnalysisException: Conf non-local session path
expected to be non-null;
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:260)
at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
at
org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
at
org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:235)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:92)
at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 

Dynamic Allocation & Spark Streaming

2015-10-19 Thread robert towne
I have watched a few videos from Databricks/Andrew Or around the Spark 1.2
release and it seemed that dynamic allocation was not yet available for
Spark Streaming.

I now see SPARK-10955  which
is tied to 1.5.2 and allows disabling of Spark Streaming with dynamic
allocation.

I use Spark Streaming with a receiverless/direct Kafka connection.  When I
start up an app reading from the beginning of the topic I would like to
have more resources than once I have caught up.  Is it possible to use
dynamic allocation for this use case?

thanks,
Robert


Re: writing avro parquet

2015-10-19 Thread Alex Nastetsky
Figured it out ... needed to use saveAsNewAPIHadoopFile, but was trying to
use it on myDF.rdd instead of converting it to a PairRDD first.
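
For anyone hitting the same thing, a rough sketch of what that ends up looking
like (the Row-to-Avro converter is a placeholder, and the parquet package prefix
below assumes Parquet 1.7.0's org.apache.parquet coordinates):

import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.AvroWriteSupport
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.rdd.RDD

// carry the Avro write support and schema in the job configuration
val job = Job.getInstance(sc.hadoopConfiguration)
job.getConfiguration.set(ParquetOutputFormat.WRITE_SUPPORT_CLASS, classOf[AvroWriteSupport].getName)
AvroWriteSupport.setSchema(job.getConfiguration, MyClass.SCHEMA$)

// ParquetOutputFormat ignores the key, so use Void/null; toAvroRecord is a
// placeholder that maps a Row to the Avro-generated MyClass
val pairs: RDD[(Void, MyClass)] = myDF.rdd.map(row => (null, toAvroRecord(row)))

pairs.saveAsNewAPIHadoopFile(
  outputPath,
  classOf[Void],
  classOf[MyClass],
  classOf[ParquetOutputFormat[MyClass]],
  job.getConfiguration)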

On Mon, Oct 19, 2015 at 2:14 PM, Alex Nastetsky <
alex.nastet...@vervemobile.com> wrote:

> Using Spark 1.5.1, Parquet 1.7.0.
>
> I'm trying to write Avro/Parquet files. I have this code:
>
> sc.hadoopConfiguration.set(ParquetOutputFormat.WRITE_SUPPORT_CLASS,
> classOf[AvroWriteSupport].getName)
> AvroWriteSupport.setSchema(sc.hadoopConfiguration, MyClass.SCHEMA$)
> myDF.write.parquet(outputPath)
>
> The problem is that the write support class gets overwritten in
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation#prepareJobForWrite:
>
> val writeSupportClass =
> if
> (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
> classOf[MutableRowWriteSupport]
> } else {
> classOf[RowWriteSupport]
> }
> ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
>
> So it doesn't seem to actually write Avro data. When look at the metadata
> of the Parquet files it writes, it looks like this:
>
> extra: org.apache.spark.sql.parquet.row.metadata =
> {"type":"struct","fields":[{"name":"foo","type":"string","nullable":true,"metadata":{}},{"name":"bar","type":"long","nullable":true,"metadata":{}}]}
>
> I would expect to see something like "extra:  avro.schema" instead.
>


Spark ML/MLib newbie question

2015-10-19 Thread George Paulson
I have a dataset that's relatively big, but easily fits in memory. I want to
generate many different features for this dataset and then run L1
regularized Logistic Regression on the feature enhanced dataset.

The combined features will easily exhaust memory. I was hoping there was a
way that I could generate the features on the fly for stochastic gradient
descent. That is, every time the SGD routine samples from the original
dataset it will calculate the new features and use those as the input.

With Spark ML it seems like you can do transformations and add those to your
pipeline, which would work if it all fit into memory fairly easily. But, is
it possible to do something like I'm proposing ? A sort of lazy evaluation
within the current library? Or do I need to somehow change
GradientDescent.scala myself for this to work?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ML-MLib-newbie-question-tp25129.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark + Streaming + DataFrames

2015-10-19 Thread Tathagata Das
RDD and DataFrame are not compatible data types, so you cannot return a DataFrame when
you have to return an RDD. What you can do instead is return the underlying
RDD of the DataFrame via dataframe.rdd.


On Fri, Oct 16, 2015 at 12:07 PM, Jason White 
wrote:

> Hi Ken, thanks for replying.
>
> Unless I'm misunderstanding something, I don't believe that's correct.
> Dstream.transform() accepts a single argument, func. func should be a
> function that accepts a single RDD, and returns a single RDD. That's what
> transform_to_df does, except the RDD it returns is a DF.
>
> I've used Dstream.transform() successfully in the past when transforming
> RDDs, so I don't think my problem is there.
>
> I haven't tried this in Scala yet, and all of the examples I've seen on the
> website seem to use foreach instead of transform. Does this approach work
> in
> Scala?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Streaming-DataFrames-tp25095p25099.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Concurrency/Multiple Users

2015-10-19 Thread GooniesNeverSayDie
I am trying to connect to an Apache Spark 1.4 server with multiple users.
Here is the issue in short form:

Connection 1 specifies database test1 at connection time. Show tables shows
test1 database tables.

Connection 2 specifies database test2 at connection time. Show tables shows
test2 database tables.

Connection 3 specifies database test3 at connection time. Create database
test4, show tables has empty results as expected. 

Connection 1 and 2 show tables now show empty results.

USE  affects all current connections.

I have investigated this and found that there was a thrift server bug
(SPARK-2087) fixed with 1.4 but this issue still persists with us. Can you
help?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Concurrency-Multiple-Users-tp25130.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: new 1.5.1 behavior - exception on executor throws ClassNotFound on driver

2015-10-19 Thread Ted Yu
Interesting.
I wonder why existing tests didn't catch that:

class UnserializableException extends Exception {
./core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
class DAGSchedulerSuiteDummyException extends Exception
./core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
class TestException(msg: String) extends Exception(msg)
./streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala


On Mon, Oct 19, 2015 at 12:02 PM, Lij Tapel  wrote:

> I have verified that there is only 5.1.34 on the classpath.
>
> Funnily enough, I have a repro that doesn't even use mysql so this seems
> to be purely a classloader issue:
>
> source: http://pastebin.com/WMCMwM6T
> 1.4.1: http://pastebin.com/x38DQY2p
> 1.5.1: http://pastebin.com/DQd6k818
>
>
>
> On Mon, Oct 19, 2015 at 11:51 AM, Ted Yu  wrote:
>
>> Lij:
>>
>> jar tvf
>> /Users/tyu/.m2/repository//mysql/mysql-connector-java/5.1.31/mysql-connector-java-5.1.31.jar
>> | grep MySQLSyntaxErrorExceptio
>>914 Wed May 21 01:42:16 PDT 2014
>> com/mysql/jdbc/exceptions/MySQLSyntaxErrorException.class
>>842 Wed May 21 01:42:18 PDT 2014
>> com/mysql/jdbc/exceptions/jdbc4/MySQLSyntaxErrorException.class
>>
>> 5.1.34 has basically the same structure.
>>
>> Can you check if there is other version of mysql-connector-java on the
>> classpath ?
>>
>> Thanks
>>
>> On Mon, Oct 19, 2015 at 11:26 AM, Lij Tapel  wrote:
>>
>>> Sorry, here's the logs and source:
>>>
>>> The error I see in spark 1.5.1: http://pastebin.com/86K9WQ5f
>>> * full logs here: http://pastebin.com/dfysSh9E
>>>
>>> What I used to see in spark 1.4.1: http://pastebin.com/eK3AZQFx
>>> * full logs here: http://pastebin.com/iffSFFWW
>>>
>>> The source and build.sbt: http://pastebin.com/tUvcBerd
>>>
>>> On Mon, Oct 19, 2015 at 11:18 AM, Ted Yu  wrote:
>>>
 The attachments didn't go through.

 Consider pastbebin'ning.

 Thanks

 On Mon, Oct 19, 2015 at 11:15 AM, gbop  wrote:

> I've been struggling with a particularly puzzling issue after
> upgrading to
> Spark 1.5.1 from Spark 1.4.1.
>
> When I use the MySQL JDBC connector and an exception (e.g.
> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException) is thrown
> on the
> executor, I get a ClassNotFoundException on the driver, which results
> in
> this error (logs are abbreviated):
>
>
>
>  In Spark 1.4.1, I get the following (logs are abbreviated):
>
>
>
> I have seriously screwed up somewhere or this is a change in behavior
> that I
> have not been able to find in the documentation. For those that are
> interested, a full repro and logs follow.
>
>
> ---
>
> I am running this on Spark 1.5.1+Hadoop 2.6. I have tried this in
> various
> combinations of
>  * local/standalone mode
>  * putting mysql on the classpath with --jars/building a fat jar with
> mysql
> in it/manually running sc.addJar on the mysql jar
>  * --deploy-mode client/--deploy-mode cluster
> but nothing seems to change.
>
>
>
> Here is an example invocation, and the accompanying source code:
>
>
>
>
> The source code:
>
>
>
> And the build.sbt:
>
>
>
>
> And here are the results when run against Spark 1.4.1 (build.sbt has
> been
> updated accordingly)
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/new-1-5-1-behavior-exception-on-executor-throws-ClassNotFound-on-driver-tp25124.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

>>>
>>
>


Re: Concurrency/Multiple Users

2015-10-19 Thread Michael Armbrust
Unfortunately the implementation of SPARK-2087 didn't have enough tests and
got broken in 1.4.  In Spark 1.6 we will have a much more solid fix:
https://github.com/apache/spark/commit/3390b400d04e40f767d8a51f1078fcccb4e64abd

On Mon, Oct 19, 2015 at 2:13 PM, GooniesNeverSayDie 
wrote:

> I am trying to connect to an Apache Spark 1.4 server with multiple users.
> Here is the issue in short form:
>
> Connection 1 specifies database test1 at connection time. Show tables shows
> test1 database tables.
>
> Connection 2 specifies database test2 at connection time. Show tables shows
> test2 database tables.
>
> Connection 3 specifies database test3 at connection time. Create database
> test4, show tables has empty results as expected.
>
> Connection 1 and 2 show tables now show empty results.
>
> USE  affects all current connections.
>
> I have investigated this and found that there was a thrift server bug
> (SPARK-2087) fixed with 1.4 but this issue still persists with us. Can you
> help?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Concurrency-Multiple-Users-tp25130.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to put an object in cache for ever in Streaming

2015-10-19 Thread Tathagata Das
That should also get cleaned up through the GC, though you may have to
explicitly run GC periodically for faster clean-up.

RDDs are by definition distributed across executors in partitions. When cached,
the RDD partitions are cached in memory across the executors.
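
As a rough sketch of the pattern described in the earlier mail (cache a fresh
copy, then uncache the previous one); the loader and element type are
placeholders:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object ReferenceCache {
  // element type is just an example
  @volatile private var reference: RDD[(String, Long)] = null

  def get: RDD[(String, Long)] = reference

  def refresh(sc: SparkContext): Unit = {
    val fresh = loadReferenceData(sc).persist(StorageLevel.MEMORY_AND_DISK)
    fresh.count()                                      // materialize the cache on the executors
    val old = reference
    reference = fresh                                  // swap in the new copy
    if (old != null) old.unpersist(blocking = false)   // release the previous one
  }

  // placeholder: however the per-minute reference data is actually built
  private def loadReferenceData(sc: SparkContext): RDD[(String, Long)] =
    sc.parallelize(Seq.empty[(String, Long)])
}

Call ReferenceCache.refresh from a driver-side timer or at the top of a
foreachRDD roughly once per minute; the RDD handle is used on the driver (for
example to join against each batch), while the cached partitions themselves live
in memory across the executors.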

On Fri, Oct 16, 2015 at 6:15 PM, swetha kasireddy  wrote:

> What about cleaning up the tempData that gets generated by shuffles. We
> have a lot of temp data that gets generated by shuffles in /tmp folder.
> That's why we are using ttl. Also if I keep an RDD in cache is it available
> across all the executors or just the same executor?
>
> On Fri, Oct 16, 2015 at 5:49 PM, Tathagata Das 
> wrote:
>
>> Setting a ttl is not recommended any more as Spark works with Java GC to
>> clean up stuff (RDDs, shuffles, broadcasts,etc.) that are not in reference
>> any more.
>>
>> So you can keep an RDD cached in Spark, and every minute uncache the
>> previous one, and cache a new one.
>>
>> TD
>>
>> On Fri, Oct 16, 2015 at 12:02 PM, swetha 
>> wrote:
>>
>>> Hi,
>>>
>>> How to put a changing object in Cache for ever in Streaming. I know that
>>> we
>>> can do rdd.cache but I think .cache would be cleaned up if we set ttl in
>>> Streaming. Our requirement is to have an object in memory. The object
>>> would
>>> be updated every minute  based on the records that we get in our
>>> Streaming
>>> job.
>>>
>>>  Currently I am keeping that in updateStateByKey. But, my
>>> updateStateByKey
>>> is tracking the realtime Session information as well. So, my
>>> updateStateByKey has 4 arguments that tracks session information and
>>> this
>>> object  that tracks the performance info separately. I was thinking it
>>> may
>>> be too much to keep so much data in updateStateByKey.
>>>
>>> Is it recommended to hold a lot of data using updateStateByKey?
>>>
>>>
>>> Thanks,
>>> Swetha
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-put-an-object-in-cache-for-ever-in-Streaming-tp25098.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Issue in spark batches

2015-10-19 Thread Tathagata Das
If Cassandra is down, does saveToCassandra throw an exception? If it does,
you can catch that exception and write your own logic to retry and/or not
update the offsets. Once the foreachRDD function completes, that batch will be
internally marked as completed.

TD
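
A minimal sketch of that shape, reusing the snippet from the original message
(the catch clause and the choice to simply skip the offset commit are
placeholders for whatever retry policy fits):

// assumes the same imports as the original job (e.g. com.datastax.spark.connector._ for saveToCassandra)
data_rdd.foreachRDD { rdd =>
  val stream = rdd.map(x => JsonUtility.deserialize(x))
  try {
    stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, StreamModel.getColumns)
    // commit offsets only after the write has succeeded
    ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
  } catch {
    case e: java.io.IOException =>
      // Cassandra unreachable: offsets are NOT committed, so the data can be
      // re-read from Kafka on restart; add retry/backoff or alerting here
      println(s"Cassandra write failed, offsets not committed: $e")
  }
}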

On Mon, Oct 19, 2015 at 5:48 AM, varun sharma 
wrote:

> Hi,
> I am facing this issue consistently in spark-cassandra-kafka *streaming
> job.*
> *Spark 1.4.0*
> *cassandra connector 1.4.0-M3*
> *Issue is:*
>
> I am reading data from *Kafka* using DirectStream, writing to *Cassandra* 
> after
> parsing the json and the subsequently updating the offsets in *zookeeper*.
> If Cassandra cluster is down, it throws exception but the batch which
> arrives in that time window is not processed ever though the offsets are
> updated in zookeeper.
> It is resulting data loss.
> Once the Cassandra cluster is up, this job process the data normally.
> PFA the screenshots of hung batches and code.
>
> *Code:*
>
> data_rdd.foreachRDD(rdd=> {
>   val stream = rdd
> .map(x =>JsonUtility.deserialize(x))
>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, 
> StreamModel.getColumns)
>
>
>   //commit the offsets once everything is done
>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
> })
>
> *I have even tried this variant:*
>
> data_rdd.foreachRDD(rdd=> {
>   val stream = rdd
> .map(x =>JsonUtility.deserialize(x))
>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, 
> StreamModel.getColumns)
> })
>
> data_rdd.foreachRDD(rdd=> {
>
>   //commit the offsets once everything is done
>
>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>
> }
>
> Exception when cassandra cluster is down:
> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
> streaming job 144523914 ms.3
> java.io.IOException: Failed to open native connection to Cassandra at
> {..}
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Dynamic Allocation & Spark Streaming

2015-10-19 Thread Tathagata Das
Unfortunately the title on the JIRA is extremely confusing. I have fixed it.

The reason why dynamic allocation does not work well with streaming is that
the heuristic that is used to automatically scale up or down the number of
executors works for the pattern of task schedules in batch jobs, not for
streaming jobs. We will definitely solve this in the future, maybe in 1.7.0 or
later.
In the meantime, there are developer API functions that allow you to add and
remove executors explicitly. See sparkContext.requestExecutors() and
sparkContext.killExecutors(). With these you can write your own scaling
logic. In your case I would do the following.
1. Ask for a large number of executors / cores through spark-submit.
2. Use a StreamingListener to monitor whether it has caught up.
3. Then call killExecutors, to slowly kill a few of them, but make sure
using the listener that the scheduling delay does not go up.

Hope this helps. Let me know if this works for you.
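
A rough sketch of steps 2 and 3, with made-up thresholds and executor ids (how
you pick which executors to release is up to you):

import org.apache.spark.SparkContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class CatchUpListener(sc: SparkContext, executorsToRelease: Seq[String]) extends StreamingListener {
  private var calmBatches = 0

  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val delayMs = batch.batchInfo.schedulingDelay.getOrElse(Long.MaxValue)
    calmBatches = if (delayMs < 1000) calmBatches + 1 else 0   // "caught up" threshold is arbitrary
    if (calmBatches == 10 && executorsToRelease.nonEmpty) {
      // developer API: ask the cluster manager to release these specific executors
      sc.killExecutors(executorsToRelease)
    }
  }
}

// ssc.addStreamingListener(new CatchUpListener(ssc.sparkContext, Seq("1", "2")))
// sc.requestExecutors(n) is the counterpart for scaling back up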

On Mon, Oct 19, 2015 at 1:13 PM, robert towne 
wrote:

> I have watched a few videos from Databricks/Andrew Or around the Spark 1.2
> release and it seemed that dynamic allocation was not yet available for
> Spark Streaming.
>
> I now see SPARK-10955  
> which
> is tied to 1.5.2 and allows disabling of Spark Streaming with dynamic
> allocation.
>
> I use Spark Streaming with a receiverless/direct Kafka connection.  When I
> start up an app reading from the beginning of the topic I would like to
> have more resources than once I have caught up.  Is it possible to use
> dynamic allocation for this use case?
>
> thanks,
> Robert
>


Re: flattening a JSON data structure

2015-10-19 Thread Michael Armbrust
A quick fix is probably to use Seq[Row] instead of Array (the types that are
returned are documented here:
http://spark.apache.org/docs/latest/sql-programming-guide.html#data-types)

Really though you probably want to be using explode.  Perhaps something
like this would help?

import org.apache.spark.sql.functions._
dataFrame.select(explode($"provider.contract").as("contract"))
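
For the quick fix, the lookup in the original map would become something along
these lines (a sketch, using the field names from the original post):

import org.apache.spark.sql.Row

val contracts = dataFrame.map { row =>
  val provider = row.getAs[Row]("provider")
  provider.getAs[Seq[Row]]("contract")      // Seq[Row] rather than Array[Row]
}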

On Mon, Oct 19, 2015 at 8:08 AM, nunomrc 
wrote:

> Hi I am fairly new to Spark and I am trying to flatten the following
> structure:
>
>  |-- provider: struct (nullable = true)
>  ||-- accountId: string (nullable = true)
>  ||-- contract: array (nullable = true)
>
> And then provider is:
> root
>  |-- accountId: string (nullable = true)
>  |-- contract: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- details: struct (nullable = true)
>  ||||-- contractId: string (nullable = true)
>  ||||-- countryCode: string (nullable = true)
>  ||||-- endDate: string (nullable = true)
>  ||||-- noticePeriod: long (nullable = true)
>  ||||-- startDate: string (nullable = true)
>  |||-- endDate: string (nullable = true)
>  |||-- startDate: string (nullable = true)
>  |||-- other: struct (nullable = true)
>  ||||-- type: string (nullable = true)
>  ||||-- values: array (nullable = true)
>  |||||-- element: struct (containsNull = true)
>  ||||||-- key: string (nullable = true)
>  ||||||-- value: long (nullable = true)
>
>
> I am trying the following:
>
> dataFrame.map { case Row(., provider: Row, .) =>
>val list = provider.getAs[Array[Row]]("contract")
>
> At this point, I get the following exception:
> [info]   org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in
> stage 4.0 (TID 9, localhost): java.lang.ClassCastException:
> scala.collection.mutable.WrappedArray$ofRef cannot be cast to
> [Lorg.apache.spark.sql.Row;
> [info]  at com.mycode.Deal$$anonfun$flattenDeals$1.apply(Deal.scala:62)
>
> I tried many different variations of this and tried to get the actual data
> type of the elements of the array, without any success.
> This kind of method to flatten json data structures were working for me
> with
> previous versions of spark, but I am now trying to upgrade from 1.4.1 to
> 1.5.1 and started getting this error.
>
> What am I doing wrong?
> Any help would be appreciated.
>
> Thanks,
> Nuno
>
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/flattening-a-JSON-data-structure-tp25120.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: new 1.5.1 behavior - exception on executor throws ClassNotFound on driver

2015-10-19 Thread Lij Tapel
Wow thanks, that's a great place to start digging deeper. Would it be
appropriate to file this on JIRA? It makes spark 1.5.1 a bit of a deal
breaker for me but I wouldn't mind taking a shot at fixing it given some
guidance

On Mon, Oct 19, 2015 at 1:03 PM, Ted Yu  wrote:

> Interesting.
> I wonder why existing tests didn't catch that:
>
> class UnserializableException extends Exception {
> ./core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
> class DAGSchedulerSuiteDummyException extends Exception
> ./core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
> class TestException(msg: String) extends Exception(msg)
>
> ./streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
>
>
> On Mon, Oct 19, 2015 at 12:02 PM, Lij Tapel  wrote:
>
>> I have verified that there is only 5.1.34 on the classpath.
>>
>> Funnily enough, I have a repro that doesn't even use mysql so this seems
>> to be purely a classloader issue:
>>
>> source: http://pastebin.com/WMCMwM6T
>> 1.4.1: http://pastebin.com/x38DQY2p
>> 1.5.1: http://pastebin.com/DQd6k818
>>
>>
>>
>> On Mon, Oct 19, 2015 at 11:51 AM, Ted Yu  wrote:
>>
>>> Lij:
>>>
>>> jar tvf
>>> /Users/tyu/.m2/repository//mysql/mysql-connector-java/5.1.31/mysql-connector-java-5.1.31.jar
>>> | grep MySQLSyntaxErrorExceptio
>>>914 Wed May 21 01:42:16 PDT 2014
>>> com/mysql/jdbc/exceptions/MySQLSyntaxErrorException.class
>>>842 Wed May 21 01:42:18 PDT 2014
>>> com/mysql/jdbc/exceptions/jdbc4/MySQLSyntaxErrorException.class
>>>
>>> 5.1.34 has basically the same structure.
>>>
>>> Can you check if there is other version of mysql-connector-java on the
>>> classpath ?
>>>
>>> Thanks
>>>
>>> On Mon, Oct 19, 2015 at 11:26 AM, Lij Tapel  wrote:
>>>
 Sorry, here's the logs and source:

 The error I see in spark 1.5.1: http://pastebin.com/86K9WQ5f
 * full logs here: http://pastebin.com/dfysSh9E

 What I used to see in spark 1.4.1: http://pastebin.com/eK3AZQFx
 * full logs here: http://pastebin.com/iffSFFWW

 The source and build.sbt: http://pastebin.com/tUvcBerd

 On Mon, Oct 19, 2015 at 11:18 AM, Ted Yu  wrote:

> The attachments didn't go through.
>
> Consider pastbebin'ning.
>
> Thanks
>
> On Mon, Oct 19, 2015 at 11:15 AM, gbop  wrote:
>
>> I've been struggling with a particularly puzzling issue after
>> upgrading to
>> Spark 1.5.1 from Spark 1.4.1.
>>
>> When I use the MySQL JDBC connector and an exception (e.g.
>> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException) is thrown
>> on the
>> executor, I get a ClassNotFoundException on the driver, which results
>> in
>> this error (logs are abbreviated):
>>
>>
>>
>>  In Spark 1.4.1, I get the following (logs are abbreviated):
>>
>>
>>
>> I have seriously screwed up somewhere or this is a change in behavior
>> that I
>> have not been able to find in the documentation. For those that are
>> interested, a full repro and logs follow.
>>
>>
>> ---
>>
>> I am running this on Spark 1.5.1+Hadoop 2.6. I have tried this in
>> various
>> combinations of
>>  * local/standalone mode
>>  * putting mysql on the classpath with --jars/building a fat jar with
>> mysql
>> in it/manually running sc.addJar on the mysql jar
>>  * --deploy-mode client/--deploy-mode cluster
>> but nothing seems to change.
>>
>>
>>
>> Here is an example invocation, and the accompanying source code:
>>
>>
>>
>>
>> The source code:
>>
>>
>>
>> And the build.sbt:
>>
>>
>>
>>
>> And here are the results when run against Spark 1.4.1 (build.sbt has
>> been
>> updated accordingly)
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/new-1-5-1-behavior-exception-on-executor-throws-ClassNotFound-on-driver-tp25124.html
>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>

>>>
>>
>


Re: Is one batch created by Streaming Context always equal to one RDD?

2015-10-19 Thread Tathagata Das
Each DStream creates one RDD per batch.

On Mon, Oct 19, 2015 at 4:39 AM, vaibhavrtk  wrote:

>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-one-batch-created-by-Streaming-Context-always-equal-to-one-RDD-tp25117.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark 1.5 Streaming and Kinesis

2015-10-19 Thread Phil Kallos
I am currently trying a few code changes to see if I can squash this error.
I have created https://issues.apache.org/jira/browse/SPARK-11193 to track
progress, hope that is okay!

In the meantime, can anyone confirm their ability to run the Kinesis-ASL
example using Spark > 1.5.x ? Would be helpful to know if it works in some
cases but not others.
http://spark.apache.org/docs/1.5.1/streaming-kinesis-integration.html

Thanks
Phil

On Thu, Oct 15, 2015 at 10:35 PM, Jean-Baptiste Onofré 
wrote:

> Hi Phil,
>
> sorry I didn't have time to investigate yesterday (I was on a couple of
> other Apache projects ;)). I will try to do it today. I keep you posted.
>
> Regards
> JB
>
> On 10/16/2015 07:21 AM, Phil Kallos wrote:
>
>> JB,
>>
>> To clarify, you are able to run the Amazon Kinesis example provided in
>> the spark examples dir?
>>
>> bin/run-example streaming.KinesisWordCountASL [app name] [stream name]
>> [endpoint url] ?
>>
>> If it helps, below are the steps I used to build spark
>>
>> mvn -Pyarn -Pkinesis-asl -Phadoop-2.6 -DskipTests clean package
>>
>> And I did this with revision 4f894dd6906311cb57add6757690069a18078783
>> (v.1.5.1)
>>
>> Thanks,
>> Phil
>>
>>
>> On Thu, Oct 15, 2015 at 2:31 AM, Eugen Cepoi > > wrote:
>>
>> So running it using spark-submit doesn't change anything; it still
>> works.
>>
>> When reading the code
>>
>> https://github.com/apache/spark/blob/branch-1.5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L100
>> it looks like the receivers are definitely being ser/de. I think
>> this is the issue, need to find a way to confirm that now...
>>
>> 2015-10-15 16:12 GMT+07:00 Eugen Cepoi > >:
>>
>> Hey,
>>
>> A quick update on other things that have been tested.
>>
>> When looking at the compiled code of the
>> spark-streaming-kinesis-asl jar everything looks normal (there
>> is a class that implements SyncMap and it is used inside the
>> receiver).
>> Starting a spark shell and using introspection to instantiate a
>> receiver and check that blockIdToSeqNumRanges implements SyncMap
>> works too. So obviously it has the correct type according to that.
>>
>> Another thing to test could be to do the same introspection
>> stuff but inside a spark job to make sure it is not a problem in
>> the way the jobs are run.
>> The other idea would be that this is a problem related to
>> ser/de. For example if the receiver was being serialized and
>> then deserialized it could definitely happen depending on the
>> lib used and its configuration that it just doesn't preserve the
>> concrete type. So it would deserialize using the compile type
>> instead of the runtime type.
>>
>> Cheers,
>> Eugen
>>
>>
>> 2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré > >:
>>
>> Thanks for the update Phil.
>>
>> I'm preparing a environment to reproduce it.
>>
>> I keep you posted.
>>
>> Thanks again,
>> Regards
>> JB
>>
>> On 10/15/2015 08:36 AM, Phil Kallos wrote:
>>
>> Not a dumb question, but yes I updated all of the
>> library references to
>> 1.5, including  (even tried 1.5.1).
>>
>> // Versions.spark set elsewhere to "1.5.0"
>> "org.apache.spark" %% "spark-streaming-kinesis-asl" %
>> Versions.spark %
>> "provided"
>>
>> I am experiencing the issue in my own spark project, but
>> also when I try
>> to run the spark streaming kinesis example that comes in
>> spark/examples
>>
>> Tried running the streaming job locally, and also in EMR
>> with release
>> 4.1.0 that includes Spark 1.5
>>
>> Very strange!
>>
>>  -- Forwarded message --
>>
>>  From: "Jean-Baptiste Onofré" >  > >>
>>  To: user@spark.apache.org
>> 
>> >
>> >
>>
>>  Cc:
>>  Date: Thu, 15 Oct 2015 08:03:55 +0200
>>  Subject: Re: Spark 1.5 Streaming and Kinesis
>>  Hi Phil,
>>  KinesisReceiver is part of extra. Just a dumb
>> question: did you
>>  update all, 

Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4

2015-10-19 Thread fahad shah
Thanks Davies, sure, I can share the code/data in pm - best fahad

On Mon, Oct 19, 2015 at 10:52 AM, Davies Liu  wrote:
> Could you simplify the code a little bit so we can reproduce the failure?
> (may also have some sample dataset if it depends on them)
>
> On Sun, Oct 18, 2015 at 10:42 PM, fahad shah  wrote:
>>  Hi
>>
>> I am trying to do pair rdd's, group by the key assign id based on key.
>> I am using Pyspark with spark 1.3, for some reason, I am getting this
>> error that I am unable to figure out - any help much appreciated.
>>
>> Things I tried (but to no effect),
>>
>> 1. make sure I am not doing any conversions on the strings
>> 2. make sure that the fields used in the key are all there  and not
>> empty string (or else I toss the row out)
>>
>> My code is along following lines (split is using stringio to parse
>> csv, header removes the header row and parse_train is putting the 54
>> fields into named tuple after whitespace/quote removal):
>>
>> #Error for string argument is thrown on the BB.take(1) where the
>> groupbykey is evaluated
>>
>> A = sc.textFile("train.csv").filter(lambda x:not
>> isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is
>> None)
>>
>> A.count()
>>
>> B = A.map(lambda k:
>> ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count,
>>  k.srch_children_count,k.srch_room_count), 
>> (k[0:54])))
>> BB = B.groupByKey()
>> BB.take(1)
>>
>>
>> best fahad
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How does shuffle work in spark ?

2015-10-19 Thread Adrian Tanase
I don't know why it expands to 50 GB, but it's correct to see it both on the
first operation (shuffle write) and on the next one (shuffle read). It's the
barrier between the 2 stages.

-adrian

From: shahid ashraf
Date: Monday, October 19, 2015 at 9:53 PM
To: Kartik Mathur, Adrian Tanase
Cc: user
Subject: Re: How does shuffle work in spark ?

Hi, thanks.

I don't understand: if the original data on the partitions is 3.5 G, how does doing a
shuffle on that expand it to 50 GB, and why does it then read 50 GB for the
next operations? If I have an original data set of 100 GB then my data will explode
to 1,428.5714286 GBs,
and so shuffle reads will be 1,428.5714286 GBs; that would be insane.

On Mon, Oct 19, 2015 at 11:58 PM, Kartik Mathur 
> wrote:
That sounds like correct shuffle output , in spark map reduce phase is 
separated by shuffle , in map each executer writes on local disk and in reduce 
phase reducerS reads data from each executer over the network , so shuffle 
definitely hurts performance , for more details on spark shuffle phase please 
read this

http://0x0fff.com/spark-architecture-shuffle/

Thanks
Kartik


On Mon, Oct 19, 2015 at 6:54 AM, shahid  wrote:
@all i did partitionby using default hash partitioner on data
[(1,data)(2,(data),(n,data)]
the total data was approx 3.5 it showed shuffle write 50G and on next action
e.g count it is showing shuffle read of 50 G. i don't understand this
behaviour and i think the performance is getting slow with so much shuffle
read on next tranformation operations.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-shuffle-work-in-spark-tp584p25119.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





--
with Regards
Shahid Ashraf


Re: Spark Streaming scheduler delay VS driver.cores

2015-10-19 Thread Adrian Tanase
Bump on this question – does anyone know what is the effect of 
spark.driver.cores on the driver's ability to manage larger clusters?

Any tips on setting a correct value? I’m running Spark streaming on Yarn / 
Hadoop 2.6 / Spark 1.5.1.

Thanks,
-adrian

From: Adrian Tanase
Date: Saturday, October 17, 2015 at 10:58 PM
To: "user@spark.apache.org"
Subject: Spark Streaming scheduler delay VS driver.cores

Hi,

I've recently bumped up the resources for a Spark Streaming job, and the
performance started to degrade over time.
It was running fine on 7 nodes with 14 executor cores each (via Yarn) until I
bumped executor.cores to 22 cores/node (out of 32 on AWS c3.xlarge, 24 for Yarn).

The driver has 2 cores and 2 GB ram (usage is at zero).

For really low data volume it goes from 1-2 seconds per batch to 4-5 s/batch 
after about 6 hours, doing almost nothing. I’ve noticed that the scheduler 
delay is 3-4s, even 5-6 seconds for some tasks. Should be in the low tens of 
milliseconds. What’s weirder is that under moderate load (thousands of events 
per second) - the delay is not as obvious anymore.

After this I reduced the executor.cores to 20 and bumped driver.cores to 4 and 
it seems to be ok now.
However, this is totally empirical, I have not found any documentation, code 
samples or email discussion on how to properly set driver.cores.

Does anyone know:

  *   If I assign more cores to the driver/application manager, will it use 
them?
 *   I was looking at the process list with htop and only one of the jvm’s 
on the driver was really taking up CPU time
  *   What is a decent parallelism factor for a streaming app with 10-20 secs 
batch time? I found it odd that at  7 x 22 = 154 the driver is becoming a 
bottleneck
 *   I’ve seen people recommend 3-4 taks/core or ~1000 parallelism for 
clusters in the tens of nodes

Thanks in advance,
-adrian


Re: Streaming of COAP Resources

2015-10-19 Thread Adrian Tanase
I'm not familiar with your COAP library, but onStart is called only once. You're
only reading the value once, when the custom receiver is initialized.

You need to set up a callback or poll a buffer (again, this depends on your COAP
client). In short, configure your client to "start listening for changes".
Then you need to call .store() for every new value that you're notified of.

-adrian
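
A minimal sketch of the polling variant, reusing the CoapClient calls from the
receiver quoted below; the CoapClient import and the 1-second interval are
assumptions about the COAP library in use:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
// assumption: a Californium-style client, as in the code quoted below
import org.eclipse.californium.core.CoapClient

class CoapPollingReceiver(url: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    new Thread("coap-poller") {
      override def run(): Unit = {
        val client = new CoapClient(url)
        var last: String = null
        while (!isStopped()) {
          val text = client.get().getResponseText()
          if (text != null && text != last) {   // store only when the value changes
            store(text)
            last = text
          }
          Thread.sleep(1000)                    // poll interval: adjust to taste
        }
      }
    }.start()
  }

  override def onStop(): Unit = {
    // the polling loop exits once isStopped() returns true
  }
}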



On 10/16/15, 9:38 AM, "Sadaf"  wrote:

>I am currently working on IOT Coap protocol.I accessed server on local host
>through copper firefox plugin. Then i Added resouce having "GET"
>functionality in server. After that i made its client as a streaming source.
>Here is the code of client streaming
>
> class customReceiver(test:String) extends 
>Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging with
>Serializable { 
>   @volatile private var stopped = false
>   override def onStart() {
>
>  val client = new CoapClient("ip/resource")
>  var text = client.get().getResponseText();  
>  store(text)
>   }
>   override def onStop(): Unit = synchronized { 
>  try
>  {
> stopped = true
>  }
>  catch
>  {
> case e: Exception => println("exception caught: " + e);
>  }
>   }
> }
>But I am facing a problem. During streaming it just reads the resource once;
>after that it fetches all empty RDDs and completes its batches. Meanwhile, if the
>resource changes its value, it doesn't read that. Am I doing something
>wrong? Or does there exist any other functionality to read whenever the resource
>gets changed, that I can handle in my custom receiver? Or any idea about how
>to GET the value continuously during streaming?
>
>Any help is much awaited and appreciated. Thanks
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-of-COAP-Resources-tp25084.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: In-memory computing and cache() in Spark

2015-10-19 Thread Jia Zhan
Hi Igor,

It iteratively conducts reduce((a,b) => a+b), which is the action there. I can
clearly see 4 stages (one saveAsTextFile() and three reduce()) in the web
UI. I don't know what's going on there that causes the non-intuitive caching
behavior.

Thanks for help!

On Sun, Oct 18, 2015 at 11:32 PM, Igor Berman  wrote:

> Do your iterations really submit a job? I don't see any action there
> On Oct 17, 2015 00:03, "Jia Zhan"  wrote:
>
>> Hi all,
>>
>> I am running Spark locally in one node and trying to sweep the memory
>> size for performance tuning. The machine has 8 CPUs and 16G main memory,
>> the dataset in my local disk is about 10GB. I have several quick questions
>> and appreciate any comments.
>>
>> 1. Spark performs in-memory computing, but without using RDD.cache(),
>> will anything be cached in memory at all? My guess is that, without
>> RDD.cache(), only a small amount of data will be stored in OS buffer cache,
>> and every iteration of computation will still need to fetch most data from
>> disk every time, is that right?
>>
>> 2. To evaluate how caching helps with iterative computation, I wrote a
>> simple program as shown below, which basically consists of one saveAsText()
>> and three reduce() actions/stages. I specify "spark.driver.memory" to
>> "15g", others by default. Then I run three experiments.
>>
>> val conf = new SparkConf().setAppName("wordCount")
>> val sc = new SparkContext(conf)
>> val input = sc.textFile("/InputFiles")
>> val words = input.flatMap(line => line.split(" ")).map(word => (word, 1))
>>   .reduceByKey(_ + _).saveAsTextFile("/OutputFiles")
>> val ITERATIONS = 3
>> for (i <- 1 to ITERATIONS) {
>>   val totallength = input.filter(line => line.contains("the"))
>>     .map(s => s.length).reduce((a, b) => a + b)
>> }
>>
>> (I) The first run: no caching at all. The application finishes in ~12
>> minutes (2.6min+3.3min+3.2min+3.3min)
>>
>> (II) The second run, I modified the code so that the input will be
>> cached:
>>   val input = sc.textFile("/InputFiles").cache()
>> The application finishes in ~11 mins!! (5.4min+1.9min+1.9min+2.0min)!
>> The storage page in the Web UI shows 48% of the dataset is cached,
>> which makes sense due to large Java object overhead, and
>> spark.storage.memoryFraction is 0.6 by default.
>>
>> (III) However, the third run, same program as the second one, but I
>> changed "spark.driver.memory" to be "2g".
>> The application finishes in just 3.6 minutes (3.0min + 9s + 9s + 9s)!!
>> And the UI shows 6% of the data is cached.
>>
>> From the results we can see the reduce stages finish in seconds; how
>> could that happen with only 6% cached? Can anyone explain?
>>
>> I am new to Spark and would appreciate any help on this. Thanks!
>>
>> Jia
>>
>>
>>
>>


-- 
Jia Zhan


Re: best way to generate per key auto increment numerals after sorting

2015-10-19 Thread Davies Liu
What's the issue with groupByKey()?

On Mon, Oct 19, 2015 at 1:11 AM, fahad shah  wrote:
> Hi
>
> I wanted to ask whats the best way to achieve per key auto increment
> numerals after sorting, for eg. :
>
> raw file:
>
> 1,a,b,c,1,1
> 1,a,b,d,0,0
> 1,a,b,e,1,0
> 2,a,e,c,0,0
> 2,a,f,d,1,0
>
> post-output (the last column is the position number after grouping on
> first three fields and reverse sorting on last two values)
>
> 1,a,b,c,1,1,1
> 1,a,b,d,0,0,3
> 1,a,b,e,1,0,2
> 2,a,e,c,0,0,2
> 2,a,f,d,1,0,1
>
> I am using solution that uses groupbykey but that is running into some
> issues (possibly bug with pyspark/spark?), wondering if there is a
> better way to achieve this.
>
> My solution:
>
> A = A = sc.textFile("train.csv").filter(lambda x:not
> isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is
> None)
>
> B = A.map(lambda k:
> ((k.first_field,k.second_field,k.first_field,k.third_field),
> (k[0:5]))).groupByKey()
>
> B.map(sort_n_set_position).flatMap(lambda line: line)
>
> where sort and set position iterates over the iterator and performs
> sorting and adding last column.
>
> best fahad
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4

2015-10-19 Thread Davies Liu
Could you simplify the code a little bit so we can reproduce the failure?
(may also have some sample dataset if it depends on them)

On Sun, Oct 18, 2015 at 10:42 PM, fahad shah  wrote:
>  Hi
>
> I am trying to do pair rdd's, group by the key assign id based on key.
> I am using Pyspark with spark 1.3, for some reason, I am getting this
> error that I am unable to figure out - any help much appreciated.
>
> Things I tried (but to no effect),
>
> 1. make sure I am not doing any conversions on the strings
> 2. make sure that the fields used in the key are all there  and not
> empty string (or else I toss the row out)
>
> My code is along following lines (split is using stringio to parse
> csv, header removes the header row and parse_train is putting the 54
> fields into named tuple after whitespace/quote removal):
>
> #Error for string argument is thrown on the BB.take(1) where the
> groupbykey is evaluated
>
> A = sc.textFile("train.csv").filter(lambda x:not
> isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is
> None)
>
> A.count()
>
> B = A.map(lambda k:
> ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count,
>  k.srch_children_count,k.srch_room_count), (k[0:54])))
> BB = B.groupByKey()
> BB.take(1)
>
>
> best fahad
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Storing Compressed data in HDFS into Spark

2015-10-19 Thread ahaider3
Hi,
A lot of the data I have in HDFS is compressed. I noticed that when I load this
data into Spark and cache it, Spark unrolls the data like normal but stores
it uncompressed in memory. For example, suppose data is an RDD with
compressed partitions on HDFS. I then cache the data. When I call
data.count(), the data is rightly decompressed, since it needs to find the
value of count(). But the data that is cached is also decompressed. Can
a partition be compressed in Spark? I know Spark allows data to be
compressed after serialization. But what if I only want the partitions
compressed?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Storing-Compressed-data-in-HDFS-into-Spark-tp25123.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
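
One way to get something close to what the message asks for - cached partitions
that stay compressed - is to cache in serialized form and turn on RDD compression;
a hedged sketch (spark-shell sc assumed, path made up):

// in spark-defaults.conf or on the SparkConf, before the context is created:
//   spark.rdd.compress   true
import org.apache.spark.storage.StorageLevel

val data = sc.textFile("hdfs:///data")
  .persist(StorageLevel.MEMORY_ONLY_SER)   // serialized blocks; compressed when spark.rdd.compress is true

data.count()                               // materializes the (serialized, compressed) cache

The original HDFS codec is not preserved: the blocks are re-serialized and
re-compressed with spark.io.compression.codec, but the cached footprint ends up
much closer to the on-disk size than with the default deserialized caching.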



Re: new 1.5.1 behavior - exception on executor throws ClassNotFound on driver

2015-10-19 Thread Lij Tapel
I have verified that there is only 5.1.34 on the classpath.

Funnily enough, I have a repro that doesn't even use mysql so this seems to
be purely a classloader issue:

source: http://pastebin.com/WMCMwM6T
1.4.1: http://pastebin.com/x38DQY2p
1.5.1: http://pastebin.com/DQd6k818



On Mon, Oct 19, 2015 at 11:51 AM, Ted Yu  wrote:

> Lij:
>
> jar tvf
> /Users/tyu/.m2/repository//mysql/mysql-connector-java/5.1.31/mysql-connector-java-5.1.31.jar
> | grep MySQLSyntaxErrorExceptio
>914 Wed May 21 01:42:16 PDT 2014
> com/mysql/jdbc/exceptions/MySQLSyntaxErrorException.class
>842 Wed May 21 01:42:18 PDT 2014
> com/mysql/jdbc/exceptions/jdbc4/MySQLSyntaxErrorException.class
>
> 5.1.34 has basically the same structure.
>
> Can you check if there is other version of mysql-connector-java on the
> classpath ?
>
> Thanks
>
> On Mon, Oct 19, 2015 at 11:26 AM, Lij Tapel  wrote:
>
>> Sorry, here's the logs and source:
>>
>> The error I see in spark 1.5.1: http://pastebin.com/86K9WQ5f
>> * full logs here: http://pastebin.com/dfysSh9E
>>
>> What I used to see in spark 1.4.1: http://pastebin.com/eK3AZQFx
>> * full logs here: http://pastebin.com/iffSFFWW
>>
>> The source and build.sbt: http://pastebin.com/tUvcBerd
>>
>> On Mon, Oct 19, 2015 at 11:18 AM, Ted Yu  wrote:
>>
>>> The attachments didn't go through.
>>>
>>> Consider pastebin'ning.
>>>
>>> Thanks
>>>
>>> On Mon, Oct 19, 2015 at 11:15 AM, gbop  wrote:
>>>
 I've been struggling with a particularly puzzling issue after upgrading
 to
 Spark 1.5.1 from Spark 1.4.1.

 When I use the MySQL JDBC connector and an exception (e.g.
 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException) is thrown on
 the
 executor, I get a ClassNotFoundException on the driver, which results in
 this error (logs are abbreviated):



  In Spark 1.4.1, I get the following (logs are abbreviated):



 I have seriously screwed up somewhere or this is a change in behavior
 that I
 have not been able to find in the documentation. For those that are
 interested, a full repro and logs follow.


 ---

 I am running this on Spark 1.5.1+Hadoop 2.6. I have tried this in
 various
 combinations of
  * local/standalone mode
  * putting mysql on the classpath with --jars/building a fat jar with
 mysql
 in it/manually running sc.addJar on the mysql jar
  * --deploy-mode client/--deploy-mode cluster
 but nothing seems to change.



 Here is an example invocation, and the accompanying source code:




 The source code:



 And the build.sbt:




 And here are the results when run against Spark 1.4.1 (build.sbt has
 been
 updated accordingly)





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/new-1-5-1-behavior-exception-on-executor-throws-ClassNotFound-on-driver-tp25124.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>


Re: pyspark: results differ based on whether persist() has been called

2015-10-19 Thread Davies Liu
This should be fixed by
https://github.com/apache/spark/commit/a367840834b97cd6a9ecda568bb21ee6dc35fcde

Will be released as 1.5.2 soon.

On Mon, Oct 19, 2015 at 9:04 AM, peay2  wrote:
> Hi,
>
> I am getting some very strange results, where I get different results based
> on whether or not I call persist() on a data frame or not before
> materialising it.
>
> There's probably something obvious I am missing, as only very simple
> operations are involved here. Any help with this would be greatly
> appreciated. I have a simple data-frame with IDs and values:
>
> data_dict = {'id': {k: str(k) for k in range(99)}, 'value':
> dict(enumerate(['A'] * 4 + ['B'] * 46 + ['C'] * 49))}
> df_small = pd.DataFrame(data_dict)
> records = sqlContext.createDataFrame(df_small)
> records.printSchema()
>
> # root
> # |-- id: string (nullable = true)
> # |-- value: string (nullable = true)
>
> Now, I left outer join over the IDs -- here, using a dummy constant column
> on the right instead of a separate data-frame (enough to reproduce my
> issue):
>
> unique_ids = records.select("id").dropDuplicates()
> id_names = unique_ids.select(F.col("id").alias("id_join"),
> F.lit("xxx").alias("id_name"))
>
> df_joined = records.join(id_names, records['id'] == id_names['id_join'],
> "left_outer").drop("id_join")
>
> At this point, *doing a show on df_joined* indicates all is fine: all
> records are there as expected, for instance:
>
> df_joined[(df_joined['id'] > 60) & (df_joined['id'] < 70)].show()
> +---+-+---+
> | id|value|id_name|
> +---+-+---+
> | 61|C|xxx|
> | 62|C|xxx|
> | 63|C|xxx|
> | 64|C|xxx|
> ...
>
> However, if I filter for a given value and then group by ID, I do not get
> back all of the groups:
>
> def print_unique_ids(df):
>filtered = df[df["value"] == "C"]
>plan = filtered.groupBy("id").count().select("id")
>unique_ids = list(plan.toPandas()["id"])
>
>print "{0} IDs: {1}\n".format(len(unique_ids), sorted(unique_ids))
>print plan.rdd.toDebugString() + "\n"
>
> print_unique_ids(df_joined.unpersist())
> print_unique_ids(df_joined.persist())
>
> 49 IDs: [u'50', u'51', u'52', u'53', u'54', u'55', u'56', u'57', u'58',
> u'59', u'60', u'61', u'62', u'63', u'64', u'65', u'66', u'67', u'68', u'69',
> u'70', u'71', u'72', u'73', u'74', u'75', u'76', u'77', u'78', u'79', u'80',
> u'81', u'82', u'83', u'84', u'85', u'86', u'87', u'88', u'89', u'90', u'91',
> u'92', u'93', u'94', u'95', u'96', u'97', u'98']
>
> 46 IDs: [u'50', u'51', u'52', u'53', u'54', u'55', u'56', u'57', u'58',
> u'59', u'60', u'61', u'62', u'66', u'67', u'68', u'69', u'70', u'71', u'72',
> u'73', u'74', u'75', u'76', u'77', u'78', u'79', u'80', u'81', u'82', u'83',
> u'84', u'85', u'86', u'87', u'88', u'89', u'90', u'91', u'92', u'93', u'94',
> u'95', u'96', u'97', u'98']
>
> Note how IDs 63, 64, 65 are missing here when persist() has been called. The
> output is correct if the data-frame has not been marked for persistence, but
> incorrect after the call to persist.
>
> When persist() has been called, Tungsten seems to be involved, but not if
> the data-frame has not been persisted. I am including the full outputs of
> toDebugString below.
>
> Has anyone any idea what is going on here?
>
> In case this helps: I see no issue if I don't do the dummy join, or if I
> don't filter for value == "C". I have a default spark config, besides
> "spark.shuffle.consolidateFiles=true", and spark 1.5.1.
>
> Thanks a lot!
>
> - Without persist:
>
> (200) MapPartitionsRDD[26] at javaToPython at
> NativeMethodAccessorImpl.java:-2 []
>   |   MapPartitionsRDD[25] at javaToPython at
> NativeMethodAccessorImpl.java:-2 []
>   |   MapPartitionsWithPreparationRDD[22] at toPandas at
> :25 []
>   |   MapPartitionsWithPreparationRDD[21] at toPandas at
> :25 []
>   |   MapPartitionsRDD[20] at toPandas at :25 []
>   |   ZippedPartitionsRDD2[19] at toPandas at :25 []
>   |   MapPartitionsWithPreparationRDD[9] at toPandas at
> :25 []
>   |   ShuffledRowRDD[8] at toPandas at :25 []
>   +-(2) MapPartitionsRDD[7] at toPandas at :25 []
>  |  MapPartitionsRDD[6] at toPandas at :25 []
>  |  MapPartitionsRDD[5] at toPandas at :25 []
>  |  MapPartitionsRDD[4] at applySchemaToPythonRDD at
> NativeMethodAccessorImpl.java:-2 []
>  |  MapPartitionsRDD[3] at map at SerDeUtil.scala:100 []
>  |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:147 []
>  |  PythonRDD[1] at RDD at PythonRDD.scala:43 []
>  |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []
>   |   MapPartitionsWithPreparationRDD[18] at toPandas at
> :25 []
>   |   ShuffledRowRDD[17] at toPandas at :25 []
>   +-(200) MapPartitionsRDD[16] at toPandas at :25 []
>   |   MapPartitionsRDD[15] at toPandas at :25 []
>   |   MapPartitionsWithPreparationRDD[14] at toPandas at
> :25 []
>   |  

Re: new 1.5.1 behavior - exception on executor throws ClassNotFound on driver

2015-10-19 Thread Ted Yu
The attachments didn't go through.

Consider pastebin'ning.

Thanks

On Mon, Oct 19, 2015 at 11:15 AM, gbop  wrote:

> I've been struggling with a particularly puzzling issue after upgrading to
> Spark 1.5.1 from Spark 1.4.1.
>
> When I use the MySQL JDBC connector and an exception (e.g.
> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException) is thrown on the
> executor, I get a ClassNotFoundException on the driver, which results in
> this error (logs are abbreviated):
>
>
>
>  In Spark 1.4.1, I get the following (logs are abbreviated):
>
>
>
> I have seriously screwed up somewhere or this is a change in behavior that
> I
> have not been able to find in the documentation. For those that are
> interested, a full repro and logs follow.
>
>
> ---
>
> I am running this on Spark 1.5.1+Hadoop 2.6. I have tried this in various
> combinations of
>  * local/standalone mode
>  * putting mysql on the classpath with --jars/building a fat jar with mysql
> in it/manually running sc.addJar on the mysql jar
>  * --deploy-mode client/--deploy-mode cluster
> but nothing seems to change.
>
>
>
> Here is an example invocation, and the accompanying source code:
>
>
>
>
> The source code:
>
>
>
> And the build.sbt:
>
>
>
>
> And here are the results when run against Spark 1.4.1 (build.sbt has been
> updated accordingly)
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/new-1-5-1-behavior-exception-on-executor-throws-ClassNotFound-on-driver-tp25124.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: new 1.5.1 behavior - exception on executor throws ClassNotFound on driver

2015-10-19 Thread Ted Yu
Lij:

jar tvf
/Users/tyu/.m2/repository//mysql/mysql-connector-java/5.1.31/mysql-connector-java-5.1.31.jar
| grep MySQLSyntaxErrorExceptio
   914 Wed May 21 01:42:16 PDT 2014
com/mysql/jdbc/exceptions/MySQLSyntaxErrorException.class
   842 Wed May 21 01:42:18 PDT 2014
com/mysql/jdbc/exceptions/jdbc4/MySQLSyntaxErrorException.class

5.1.34 has basically the same structure.

Can you check if there is other version of mysql-connector-java on the
classpath ?

Thanks

On Mon, Oct 19, 2015 at 11:26 AM, Lij Tapel  wrote:

> Sorry, here's the logs and source:
>
> The error I see in spark 1.5.1: http://pastebin.com/86K9WQ5f
> * full logs here: http://pastebin.com/dfysSh9E
>
> What I used to see in spark 1.4.1: http://pastebin.com/eK3AZQFx
> * full logs here: http://pastebin.com/iffSFFWW
>
> The source and build.sbt: http://pastebin.com/tUvcBerd
>
> On Mon, Oct 19, 2015 at 11:18 AM, Ted Yu  wrote:
>
>> The attachments didn't go through.
>>
>> Consider pastebin'ning.
>>
>> Thanks
>>
>> On Mon, Oct 19, 2015 at 11:15 AM, gbop  wrote:
>>
>>> I've been struggling with a particularly puzzling issue after upgrading
>>> to
>>> Spark 1.5.1 from Spark 1.4.1.
>>>
>>> When I use the MySQL JDBC connector and an exception (e.g.
>>> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException) is thrown on
>>> the
>>> executor, I get a ClassNotFoundException on the driver, which results in
>>> this error (logs are abbreviated):
>>>
>>>
>>>
>>>  In Spark 1.4.1, I get the following (logs are abbreviated):
>>>
>>>
>>>
>>> I have seriously screwed up somewhere or this is a change in behavior
>>> that I
>>> have not been able to find in the documentation. For those that are
>>> interested, a full repro and logs follow.
>>>
>>>
>>> ---
>>>
>>> I am running this on Spark 1.5.1+Hadoop 2.6. I have tried this in various
>>> combinations of
>>>  * local/standalone mode
>>>  * putting mysql on the classpath with --jars/building a fat jar with
>>> mysql
>>> in it/manually running sc.addJar on the mysql jar
>>>  * --deploy-mode client/--deploy-mode cluster
>>> but nothing seems to change.
>>>
>>>
>>>
>>> Here is an example invocation, and the accompanying source code:
>>>
>>>
>>>
>>>
>>> The source code:
>>>
>>>
>>>
>>> And the build.sbt:
>>>
>>>
>>>
>>>
>>> And here are the results when run against Spark 1.4.1 (build.sbt has been
>>> updated accordingly)
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/new-1-5-1-behavior-exception-on-executor-throws-ClassNotFound-on-driver-tp25124.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Differentiate Spark streaming in event logs

2015-10-19 Thread Adrian Tanase
You could try to start the 2/N jobs with a slightly different log4j template, 
by prepending some job type to all the messages...




On 10/19/15, 9:47 PM, "franklyn"  wrote:

>Hi I'm running a job to collect some analytics on spark jobs by analyzing
>their event logs. We write the event logs to a single HDFS folder and then
>pick them up in another job. I'd like to differentiate between regular spark
>jobs and spark streaming jobs in the event logs, i was wondering if there is
>an event/property/key that is different between the two.
>
>thanks!,
>
>Franklyn
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Differentiate-Spark-streaming-in-event-logs-tp25126.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: How to calculate row by now and output retults in Spark

2015-10-19 Thread Ted Yu
Under core/src/test/scala/org/apache/spark , you will find a lot of
examples for map function.

FYI

On Mon, Oct 19, 2015 at 10:35 AM, Shepherd  wrote:

> Hi all, I am new in Spark and Scala. I have a question in doing
> calculation. I am using "groupBy" to generate key value pair, and the value
> points to a subset of original RDD. The RDD has four columns, and each
> subset RDD may have different number of rows. For example, the original
> code like this:" val b = a.gorupBy(_._2) val res = b.map{case (k, v) =>
> v.map(func)} " Here, I don't know how to write the func. I have to run each
> row in v, and calculate statistic result. How can I do that? And, how can I
> write function in Map? Thanks a lot.
> --
> View this message in context: How to calculate row by now and output
> retults in Spark
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Differentiate Spark streaming in event logs

2015-10-19 Thread franklyn
Hi, I'm running a job to collect some analytics on Spark jobs by analyzing
their event logs. We write the event logs to a single HDFS folder and then
pick them up in another job. I'd like to differentiate between regular Spark
jobs and Spark Streaming jobs in the event logs; I was wondering if there is
an event/property/key that is different between the two.

thanks!,

Franklyn



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Differentiate-Spark-streaming-in-event-logs-tp25126.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to calculate row by now and output retults in Spark

2015-10-19 Thread Shepherd
Hi all, I am new to Spark and Scala. I have a question about doing a calculation.
I am using "groupBy" to generate key-value pairs, where each value points to a
subset of the original RDD. The RDD has four columns, and each subset may have a
different number of rows. For example, the original code looks like this:

val b = a.groupBy(_._2)
val res = b.map { case (k, v) => v.map(func) }

Here, I don't know how to write func. I have to go over each row in v and compute
a statistic. How can I do that? And how can I write the function passed to map?
Thanks a lot.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-calculate-row-by-now-and-output-retults-in-Spark-tp25122.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
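
A hedged Scala sketch of one way to write the func in the message: treat the value
side as an Iterable of rows and compute the statistic inside the map. The 4-column
tuples, the grouping on the second column and the mean are made up for
illustration; a spark-shell sc is assumed:

val a = sc.parallelize(Seq(
  ("x", "g1", 2, 10.0),
  ("y", "g1", 4, 20.0),
  ("z", "g2", 6, 30.0)))

val b = a.groupBy(_._2)                       // key = second column
val res = b.map { case (k, rows) =>
  val values = rows.map(_._4)                 // walk every row of this group
  (k, values.sum / values.size)               // any per-group statistic goes here
}

res.collect().foreach(println)                // (g1,15.0), (g2,30.0)

For simple aggregates like sums or means, reduceByKey or aggregateByKey is usually
preferable, since it avoids shuffling whole groups to a single task.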

writing avro parquet

2015-10-19 Thread Alex Nastetsky
Using Spark 1.5.1, Parquet 1.7.0.

I'm trying to write Avro/Parquet files. I have this code:

sc.hadoopConfiguration.set(ParquetOutputFormat.WRITE_SUPPORT_CLASS,
classOf[AvroWriteSupport].getName)
AvroWriteSupport.setSchema(sc.hadoopConfiguration, MyClass.SCHEMA$)
myDF.write.parquet(outputPath)

The problem is that the write support class gets overwritten in
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation#prepareJobForWrite:

val writeSupportClass =
  if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
    classOf[MutableRowWriteSupport]
  } else {
    classOf[RowWriteSupport]
  }
ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)

So it doesn't seem to actually write Avro data. When I look at the metadata
of the Parquet files it writes, it looks like this:

extra: org.apache.spark.sql.parquet.row.metadata =
{"type":"struct","fields":[{"name":"foo","type":"string","nullable":true,"metadata":{}},{"name":"bar","type":"long","nullable":true,"metadata":{}}]}

I would expect to see something like "extra:  avro.schema" instead.
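
Since the DataFrame writer picks its own write support class, one possible
workaround (a hedged sketch, not the intended Spark API) is to bypass df.write and
go through the Parquet Hadoop output format directly with saveAsNewAPIHadoopFile.
MyClass.SCHEMA$, the foo/bar columns and outputPath are taken from the snippet
above; everything else is an assumption:

import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.AvroWriteSupport
import org.apache.parquet.hadoop.ParquetOutputFormat

val job = Job.getInstance(sc.hadoopConfiguration)                 // carries the Parquet/Avro config
ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
AvroWriteSupport.setSchema(job.getConfiguration, MyClass.SCHEMA$)

// ParquetOutputFormat ignores the key, so pair each record with a null Void
val records = myDF.rdd.map { row =>
  val rec: GenericRecord = new GenericData.Record(MyClass.SCHEMA$)
  rec.put("foo", row.getAs[String]("foo"))
  rec.put("bar", row.getAs[Long]("bar"))
  (null: Void, rec)
}

records.saveAsNewAPIHadoopFile(
  outputPath,
  classOf[Void],
  classOf[GenericRecord],
  classOf[ParquetOutputFormat[GenericRecord]],
  job.getConfiguration)

Files written this way should carry the avro.schema key in their footer metadata,
since the Avro write support is no longer overridden.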


new 1.5.1 behavior - exception on executor throws ClassNotFound on driver

2015-10-19 Thread gbop
I've been struggling with a particularly puzzling issue after upgrading to
Spark 1.5.1 from Spark 1.4.1.

When I use the MySQL JDBC connector and an exception (e.g.
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException) is thrown on the
executor, I get a ClassNotFoundException on the driver, which results in
this error (logs are abbreviated):



 In Spark 1.4.1, I get the following (logs are abbreviated):



Either I have seriously screwed up somewhere, or this is a change in behavior that I
have not been able to find in the documentation. For those that are
interested, a full repro and logs follow.


---

I am running this on Spark 1.5.1+Hadoop 2.6. I have tried this in various
combinations of 
 * local/standalone mode
 * putting mysql on the classpath with --jars/building a fat jar with mysql
in it/manually running sc.addJar on the mysql jar 
 * --deploy-mode client/--deploy-mode cluster
but nothing seems to change.



Here is an example invocation, and the accompanying source code:




The source code:



And the build.sbt:




And here are the results when run against Spark 1.4.1 (build.sbt has been
updated accordingly)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/new-1-5-1-behavior-exception-on-executor-throws-ClassNotFound-on-driver-tp25124.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: new 1.5.1 behavior - exception on executor throws ClassNotFound on driver

2015-10-19 Thread Lij Tapel
Sorry, here's the logs and source:

The error I see in spark 1.5.1: http://pastebin.com/86K9WQ5f
* full logs here: http://pastebin.com/dfysSh9E

What I used to see in spark 1.4.1: http://pastebin.com/eK3AZQFx
* full logs here: http://pastebin.com/iffSFFWW

The source and build.sbt: http://pastebin.com/tUvcBerd

On Mon, Oct 19, 2015 at 11:18 AM, Ted Yu  wrote:

> The attachments didn't go through.
>
> Consider pastebin'ning.
>
> Thanks
>
> On Mon, Oct 19, 2015 at 11:15 AM, gbop  wrote:
>
>> I've been struggling with a particularly puzzling issue after upgrading to
>> Spark 1.5.1 from Spark 1.4.1.
>>
>> When I use the MySQL JDBC connector and an exception (e.g.
>> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException) is thrown on
>> the
>> executor, I get a ClassNotFoundException on the driver, which results in
>> this error (logs are abbreviated):
>>
>>
>>
>>  In Spark 1.4.1, I get the following (logs are abbreviated):
>>
>>
>>
>> I have seriously screwed up somewhere or this is a change in behavior
>> that I
>> have not been able to find in the documentation. For those that are
>> interested, a full repro and logs follow.
>>
>>
>> ---
>>
>> I am running this on Spark 1.5.1+Hadoop 2.6. I have tried this in various
>> combinations of
>>  * local/standalone mode
>>  * putting mysql on the classpath with --jars/building a fat jar with
>> mysql
>> in it/manually running sc.addJar on the mysql jar
>>  * --deploy-mode client/--deploy-mode cluster
>> but nothing seems to change.
>>
>>
>>
>> Here is an example invocation, and the accompanying source code:
>>
>>
>>
>>
>> The source code:
>>
>>
>>
>> And the build.sbt:
>>
>>
>>
>>
>> And here are the results when run against Spark 1.4.1 (build.sbt has been
>> updated accordingly)
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/new-1-5-1-behavior-exception-on-executor-throws-ClassNotFound-on-driver-tp25124.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to calculate row by now and output retults in Spark

2015-10-19 Thread Adrian Tanase
Are you by any chance looking for reduceByKey? IF you’re trying to collapse all 
the values in V into an aggregate, that’s what you should be looking at.

-adrian

From: Ted Yu
Date: Monday, October 19, 2015 at 9:16 PM
To: Shepherd
Cc: user
Subject: Re: How to calculate row by now and output retults in Spark

Under core/src/test/scala/org/apache/spark , you will find a lot of examples 
for map function.

FYI

On Mon, Oct 19, 2015 at 10:35 AM, Shepherd 
> wrote:
Hi all, I am new in Spark and Scala. I have a question in doing calculation. I 
am using "groupBy" to generate key value pair, and the value points to a subset 
of original RDD. The RDD has four columns, and each subset RDD may have 
different number of rows. For example, the original code like this:" val b = 
a.gorupBy(_._2) val res = b.map{case (k, v) => v.map(func)} " Here, I don't 
know how to write the func. I have to run each row in v, and calculate 
statistic result. How can I do that? And, how can I write function in Map? 
Thanks a lot.

View this message in context: How to calculate row by now and output retults in 
Spark
Sent from the Apache Spark User List mailing list 
archive at Nabble.com.



Re: Spark handling parallel requests

2015-10-19 Thread tarek.abouzeid91
Thanks guys for your advice , i will have a look on the custom receivers , 
thanks again guys for your efforts   --  Best Regards, -- Tarek Abouzeid 


 On Monday, October 19, 2015 6:50 PM, Adrian Tanase  
wrote:
   

To answer your specific question, you can’t push data to Kafka through a socket –
you need a smart client library, as the cluster setup is pretty advanced (it also
requires ZooKeeper).

I bet there are PHP libraries for Kafka, although after a quick search it seems
they’re still pretty young. Also – Kafka shines at larger deployments and
throughput (tens of thousands to millions of events per second) and may be
overkill for 100 events/sec.

Here are some other ideas:
   - Use a lighter-weight message broker like RabbitMQ or MQTT – both have good
     integrations with Spark and should be simpler to integrate with PHP.
   - Instead of doing a socket call, log the event on disk – this opens up 2
     strategies:
      - If you have access to shared storage, Spark could read the files directly.
      - Otherwise, you could rely on something like Flume that can poll your logs
        and forward them to Spark (there is a default integration in the spark
        external package).
   - Lastly, why not try to build on one of the custom receivers? There are plenty
     of code samples in the docs and examples. This may not be a good choice if you
     can’t afford to lose any messages – in that case your life is harder, as
     you’ll also need a WAL-based implementation.

Hope this helps,
-adrian
From: "tarek.abouzei...@yahoo.com.INVALID"
Reply-To: "tarek.abouzei...@yahoo.com"
Date: Sunday, October 18, 2015 at 10:28 AM
To: Xiao Li, Akhil Das
Cc: "user@spark.apache.org"
Subject: Re: Spark handling parallel requests

hi Akhil,
It's a must to push data to a socket, as I am using PHP as a web service to push
data to the socket; then Spark catches the data on that socket and processes it. Is
there a way to push data from PHP to Kafka directly? --  Best Regards, --
Tarek Abouzeid


On Sunday, October 18, 2015 10:26 AM, "tarek.abouzei...@yahoo.com" 
 wrote:


hi Xiao,
1- The requests are not similar at all, but they use Solr and do commits sometimes.
2- No caching is required.
3- The throughput must be very high, yeah; the requests are tiny but the system may
receive 100 requests/sec. Does Kafka support listening to a socket?
--  Best Regards, -- Tarek Abouzeid


On Monday, October 12, 2015 10:50 AM, Xiao Li  wrote:


Hi, Tarek, 
It is hard to answer your question. Are these requests similar? Caching your 
results or intermediate results in your applications? Or does that mean your 
throughput requirement is very high? Throttling the number of concurrent 
requests? ...
As Akhil said, Kafka might help in your case. Otherwise, you need to read the 
designs or even source codes of Kafka and Spark Streaming. 
 Best wishes, 
Xiao Li

2015-10-11 23:19 GMT-07:00 Akhil Das :

Instead of pushing your requests to the socket, why don't you push them to a 
Kafka or any other message queue and use spark streaming to process them?
Thanks
Best Regards
On Mon, Oct 5, 2015 at 6:46 PM,  wrote:

Hi,
I am using Scala, doing a socket program to catch multiple requests at the same
time and then call a function which uses Spark to handle each process. I have
a multi-threaded server to handle the multiple requests and pass each to Spark,
but there's a bottleneck: Spark doesn't initialize a subtask for the
new request. Is it even possible to do parallel processing using a single Spark
job?
Best Regards, --  Best Regards, -- Tarek Abouzeid
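
To make the custom-receiver option from the thread concrete, here is a hedged
sketch of a receiver that listens on a port and accepts newline-delimited events
pushed by the PHP side. The class name and port are made up, and a production
version would need to handle reliability (e.g. a WAL-backed receiver), as Adrian
notes:

import java.io.{BufferedReader, InputStreamReader}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class PushReceiver(port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  @volatile private var server: ServerSocket = _

  def onStart(): Unit = {
    server = new ServerSocket(port)
    new Thread("push-receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    if (server != null) server.close()     // unblocks accept() so the thread exits
  }

  private def receive(): Unit = {
    try {
      while (!isStopped()) {
        val socket = server.accept()       // one connection per pushed request
        val reader = new BufferedReader(
          new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
        var line = reader.readLine()
        while (line != null) {
          store(line)                      // hand each event to Spark Streaming
          line = reader.readLine()
        }
        socket.close()
      }
    } catch {
      case e: Exception => if (!isStopped()) restart("Error receiving pushed data", e)
    }
  }
}

// usage: val events = ssc.receiverStream(new PushReceiver(9999))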









  

Re: PySpark + Streaming + DataFrames

2015-10-19 Thread Jason White
Ah, that makes sense then, thanks TD.

The conversion from RDD -> DF involves a `.take(10)` in PySpark, even if you 
provide the schema, so I was avoiding back-and-forth conversions. I’ll see if I 
can create a ‘trusted’ conversion that doesn’t involve the `take`.

-- 
Jason

On October 19, 2015 at 5:23:59 PM, Tathagata Das (t...@databricks.com) wrote:

RDD and DF are not compatible data types. So you cannot return a DF when you 
have to return an RDD. What rather you can do is return the underlying RDD of 
the dataframe by dataframe.rdd(). 


On Fri, Oct 16, 2015 at 12:07 PM, Jason White  wrote:
Hi Ken, thanks for replying.

Unless I'm misunderstanding something, I don't believe that's correct.
Dstream.transform() accepts a single argument, func. func should be a
function that accepts a single RDD, and returns a single RDD. That's what
transform_to_df does, except the RDD it returns is a DF.

I've used Dstream.transform() successfully in the past when transforming
RDDs, so I don't think my problem is there.

I haven't tried this in Scala yet, and all of the examples I've seen on the
website seem to use foreach instead of transform. Does this approach work in
Scala?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Streaming-DataFrames-tp25095p25099.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




java TwitterUtils.createStream() how create "user stream" ???

2015-10-19 Thread Andy Davidson
Hi

I wrote a little prototype that created a "public stream"; now I want to
convert it to read tweets for a large number of explicit users.

I want to create a "user stream" or a "site stream". According to the Twitter
developer doc I should be able to set the "follow" parameter to a list of
users I am interested in:

https://dev.twitter.com/streaming/overview/request-parameters#follow
follow
A comma-separated list of user IDs, indicating the users whose Tweets should
be delivered on the stream.


I am not sure how to do this. I found the doc for createStream; I am
guessing I need to set filters? Can anyone provide an example?

Kind regards

Andy

http://spark.apache.org/docs/latest/api/java/index.html

createStream

public static JavaReceiverInputDStream createStream(JavaStreamingContext jssc,
                                                     java.lang.String[] filters)

Create a input stream that returns tweets received from Twitter using
Twitter4J's default OAuth authentication; this requires the system
properties twitter4j.oauth.consumerKey, twitter4j.oauth.consumerSecret,
twitter4j.oauth.accessToken and twitter4j.oauth.accessTokenSecret. Storage
level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.

Parameters:
  jssc - JavaStreamingContext object
  filters - Set of filter strings, to get only those tweets that match them
Returns: (undocumented)
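
As far as I can tell, the built-in receiver behind TwitterUtils.createStream passes
the filters argument as track keywords only, so the follow request parameter is not
reachable through it. One hedged workaround is a small custom receiver that uses
twitter4j's FilterQuery.follow directly; the class name and user IDs below are made
up:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import twitter4j._

class FollowUsersReceiver(userIds: Array[Long])
  extends Receiver[Status](StorageLevel.MEMORY_AND_DISK_SER_2) {

  @volatile private var stream: TwitterStream = _

  def onStart(): Unit = {
    // picks up the twitter4j.oauth.* system properties mentioned in the javadoc above
    stream = new TwitterStreamFactory().getInstance()
    stream.addListener(new StatusAdapter {
      override def onStatus(status: Status): Unit = store(status)
      override def onException(ex: Exception): Unit = restart("Twitter stream error", ex)
    })
    stream.filter(new FilterQuery().follow(userIds: _*))   // tweets from these user IDs
  }

  def onStop(): Unit = {
    if (stream != null) stream.shutdown()
  }
}

// usage, e.g. from a StreamingContext ssc:
// val tweets = ssc.receiverStream(new FollowUsersReceiver(Array(123456L, 789012L)))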




Re: PySpark + Streaming + DataFrames

2015-10-19 Thread Tathagata Das
Yes, precisely! Also, for other folks who may read this, could reply back
with the trusted conversion that worked for you (for a clear solution)?

TD


On Mon, Oct 19, 2015 at 3:08 PM, Jason White 
wrote:

> Ah, that makes sense then, thanks TD.
>
> The conversion from RDD -> DF involves a `.take(10)` in PySpark, even if
> you provide the schema, so I was avoiding back-and-forth conversions. I’ll
> see if I can create a ‘trusted’ conversion that doesn’t involve the `take`.
>
> --
> Jason
>
> On October 19, 2015 at 5:23:59 PM, Tathagata Das (t...@databricks.com)
> wrote:
>
> RDD and DF are not compatible data types. So you cannot return a DF when
> you have to return an RDD. What rather you can do is return the underlying
> RDD of the dataframe by dataframe.rdd().
>
>
> On Fri, Oct 16, 2015 at 12:07 PM, Jason White 
> wrote:
>
>> Hi Ken, thanks for replying.
>>
>> Unless I'm misunderstanding something, I don't believe that's correct.
>> Dstream.transform() accepts a single argument, func. func should be a
>> function that accepts a single RDD, and returns a single RDD. That's what
>> transform_to_df does, except the RDD it returns is a DF.
>>
>> I've used Dstream.transform() successfully in the past when transforming
>> RDDs, so I don't think my problem is there.
>>
>> I haven't tried this in Scala yet, and all of the examples I've seen on
>> the
>> website seem to use foreach instead of transform. Does this approach work
>> in
>> Scala?
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Streaming-DataFrames-tp25095p25099.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Succinct experience

2015-10-19 Thread Younes Naguib
Hi all,

Anyone has any experience with SuccinctRDD?

Thanks,
Younes



serialization error

2015-10-19 Thread daze5112
Hi having some problems with the piece of code I inherited:




the error messages i get are:


the code runs if i exclude the following line:


any help appreciated.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/serialization-error-tp25131.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: serialization error

2015-10-19 Thread Ted Yu
Attachments didn't go through.

Mind using pastebin to show the code / error ?

Thanks

On Mon, Oct 19, 2015 at 3:01 PM, daze5112  wrote:

> Hi having some problems with the piece of code I inherited:
>
>
>
>
> the error messages i get are:
>
>
> the code runs if i exclude the following line:
>
>
> any help appreciated.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/serialization-error-tp25131.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark SQL Exception: Conf non-local session path expected to be non-null

2015-10-19 Thread Ted Yu
A brief search led me
to ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java :

  private static final String HDFS_SESSION_PATH_KEY =
"_hive.hdfs.session.path";
...
  public static Path getHDFSSessionPath(Configuration conf) {
SessionState ss = SessionState.get();
if (ss == null) {
  String sessionPathString = conf.get(HDFS_SESSION_PATH_KEY);
  Preconditions.checkNotNull(sessionPathString,
  "Conf non-local session path expected to be non-null");
  return new Path(sessionPathString);
}
Preconditions.checkNotNull(ss.hdfsSessionPath,
"Non-local session path expected to be non-null");
return ss.hdfsSessionPath;

FYI

On Mon, Oct 19, 2015 at 1:08 PM, YaoPau  wrote:

> I've connected Spark SQL to the Hive Metastore and currently I'm running
> SQL
> code via pyspark.  Typically everything works fine, but sometimes after a
> long-running Spark SQL job I get the error below, and from then on I can no
> longer run Spark SQL commands.  I still do have both my sc and my sqlCtx.
>
> Any idea what this could mean?
>
> An error occurred while calling o36.sql.
> : org.apache.spark.sql.AnalysisException: Conf non-local session path
> expected to be non-null;
> at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:260)
> at
>
> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
> at
>
> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
> at
> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
> at
> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
> at
>
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
>
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
>
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at
>
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
> at
>
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
>
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
>
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at
>
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
> at
>
> scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
> at
>
> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
> at
> org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
> at
> org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
> at
>
> org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
> at
>
> org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
> at
> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
> at
> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
> at
>
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
>
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
>
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at
>
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
> at
>
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
>
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
>
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at
>
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at 

Multiple Spark Streaming Jobs on Single Master

2015-10-19 Thread Augustus Hong
Hi All,

Would it be possible to run multiple spark streaming jobs on a single
master at the same time?

I currently have one master node and several worker nodes in the standalone
mode, and I used spark-submit to submit multiple spark streaming jobs.

>From what I observed, it seems like only the first submitted job would get
resources and run.  Jobs submitted afterwards will have the status
"Waiting", and will only run after the first one is finished or killed.

I tried limiting each executor to only 1 core(each worker machine has 8
cores), but the same things happens that only one job will be run, even
though there are a lot of idle cores.

Best,
Augustus



-- 
[image: Branch Metrics mobile deep linking] * Augustus
Hong*
 Data Analytics | Branch Metrics
 m 650-391-3369 | e augus...@branch.io


Filter RDD

2015-10-19 Thread Shepherd
Hi all, 
I have a very simple question.
I have a RDD, saying r1, which contains 5 columns, with both string and
Int.
How can I get a sub RDD, based on a rule, that the second column equals to a
string (s)?

Thanks a lot.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Filter-RDD-tp25133.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
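
Assuming r1 holds 5-element tuples (and a spark-shell sc), a hedged sketch; the
sample rows and column types are made up:

val r1 = sc.parallelize(Seq(
  ("a", "foo", 1, 2, "x"),
  ("b", "bar", 3, 4, "y")))

val s = "foo"
val sub = r1.filter { case (_, c2, _, _, _) => c2 == s }   // keep rows whose second column equals s

sub.collect().foreach(println)                             // (a,foo,1,2,x)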



Re: serialization error

2015-10-19 Thread Andy Huang
That particular line used an object which did not implement "Serializable"

On Tue, Oct 20, 2015 at 9:01 AM, daze5112  wrote:

> Hi having some problems with the piece of code I inherited:
>
>
>
>
> the error messages i get are:
>
>
> the code runs if i exclude the following line:
>
>
> any help appreciated.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/serialization-error-tp25131.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Andy Huang | Managing Consultant | Servian Pty Ltd | t: 02 9376 0700 |
f: 02 9376 0730| m: 0433221979
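
The usual shape of this problem, as a hedged sketch (the Parser class here is made
up): a closure captures an object whose class does not extend Serializable, so the
task cannot be shipped to the executors.

class Parser {                                    // not Serializable
  val pattern = "\\d+".r
  def parse(s: String): Option[String] = pattern.findFirstIn(s)
}

// val parser = new Parser
// rdd.map(line => parser.parse(line))            // fails: Task not serializable

// Fix 1: make the helper serializable
class SerializableParser extends Serializable {
  val pattern = "\\d+".r
  def parse(s: String): Option[String] = pattern.findFirstIn(s)
}

// Fix 2: create the helper inside the closure, once per partition
// rdd.mapPartitions { it => val p = new Parser; it.map(p.parse) }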


Re: Multiple Spark Streaming Jobs on Single Master

2015-10-19 Thread Tathagata Das
You can set the max cores for the first submitted job such that it does not
take all the resources from the master. See
http://spark.apache.org/docs/latest/submitting-applications.html

# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000


On Mon, Oct 19, 2015 at 4:26 PM, Augustus Hong 
wrote:

> Hi All,
>
> Would it be possible to run multiple spark streaming jobs on a single
> master at the same time?
>
> I currently have one master node and several worker nodes in the
> standalone mode, and I used spark-submit to submit multiple spark streaming
> jobs.
>
> From what I observed, it seems like only the first submitted job would get
> resources and run.  Jobs submitted afterwards will have the status
> "Waiting", and will only run after the first one is finished or killed.
>
> I tried limiting each executor to only 1 core(each worker machine has 8
> cores), but the same things happens that only one job will be run, even
> though there are a lot of idle cores.
>
> Best,
> Augustus
>
>
>
> --
> [image: Branch Metrics mobile deep linking] * Augustus
> Hong*
>  Data Analytics | Branch Metrics
>  m 650-391-3369 | e augus...@branch.io
>
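
When the jobs are launched programmatically rather than through spark-submit, the
same cap can go on the SparkConf (or into spark-defaults.conf as spark.cores.max);
a hedged sketch with made-up app name and numbers:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-job-1")
  .set("spark.cores.max", "8")            // standalone-mode equivalent of --total-executor-cores
  .set("spark.executor.memory", "2g")

val ssc = new StreamingContext(conf, Seconds(10))

Each streaming job then grabs at most 8 cores, leaving the remaining cores free so
the other submitted jobs can be scheduled at the same time.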


Re: Filter RDD

2015-10-19 Thread Ted Yu
See the filter() method:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L334

Cheers

On Mon, Oct 19, 2015 at 4:27 PM, Shepherd  wrote:

> Hi all, 
> I have a very simple question.
> I have a RDD, saying r1, which contains 5 columns, with both string and
> Int.
> How can I get a sub RDD, based on a rule, that the second column equals to
> a
> string (s)?
>
> Thanks a lot.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Filter-RDD-tp25133.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: In-memory computing and cache() in Spark

2015-10-19 Thread Jia Zhan
Hi Sonal,

I tried changing spark.executor.memory but nothing changes. It
seems that when I run locally on one machine, the RDD is cached in driver memory
instead of executor memory. Here is a related post online:
http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-in-Local-Mode-td22279.html

When I change spark.driver.memory, I can see the change in cached data in
the web UI. Like I mentioned, when I set driver memory to 2G, it says 6% of the RDD
is cached. When set to 15G, it says 48% of the RDD is cached, but with much slower
speed!

On Sun, Oct 18, 2015 at 10:32 PM, Sonal Goyal  wrote:

> Hi Jia,
>
> RDDs are cached on the executor, not on the driver. I am assuming you are
> running locally and haven't changed spark.executor.memory?
>
> Sonal
> On Oct 19, 2015 1:58 AM, "Jia Zhan"  wrote:
>
> Anyone has any clue what's going on.? Why would caching with 2g memory
> much faster than with 15g memory?
>
> Thanks very much!
>
> On Fri, Oct 16, 2015 at 2:02 PM, Jia Zhan  wrote:
>
>> Hi all,
>>
>> I am running Spark locally in one node and trying to sweep the memory
>> size for performance tuning. The machine has 8 CPUs and 16G main memory,
>> the dataset in my local disk is about 10GB. I have several quick questions
>> and appreciate any comments.
>>
>> 1. Spark performs in-memory computing, but without using RDD.cache(),
>> will anything be cached in memory at all? My guess is that, without
>> RDD.cache(), only a small amount of data will be stored in OS buffer cache,
>> and every iteration of computation will still need to fetch most data from
>> disk every time, is that right?
>>
>> 2. To evaluate how caching helps with iterative computation, I wrote a
>> simple program as shown below, which basically consists of one saveAsText()
>> and three reduce() actions/stages. I specify "spark.driver.memory" to
>> "15g", others by default. Then I run three experiments.
>>
>>   val conf = new SparkConf().setAppName("wordCount")
>>
>>   val sc = new SparkContext(conf)
>>
>>   val input = sc.textFile("/InputFiles")
>>
>>   val words = input.flatMap(line => line.split(" ")).map(word
>> => (word, 1)).reduceByKey(_+_).saveAsTextFile("/OutputFiles")
>>
>>   val ITERATIONS = 3
>>
>>   for (i <- 1 to ITERATIONS) {
>>
>>     val totallength = input.filter(line => line.contains(
>> "the")).map(s => s.length).reduce((a,b) => a+b)
>>
>>   }
>>
>> (I) The first run: no caching at all. The application finishes in ~12
>> minutes (2.6min+3.3min+3.2min+3.3min)
>>
>> (II) The second run, I modified the code so that the input will be
>> cached:
>>  val input = sc.textFile("/InputFiles").cache()
>>  The application finishes in ~11 mins!! (5.4min+1.9min+1.9min+2.0min)!
>>  The storage page in Web UI shows 48% of the dataset  is cached,
>> which makes sense due to large java object overhead, and
>> spark.storage.memoryFraction is 0.6 by default.
>>
>> (III) However, the third run, same program as the second one, but I
>> changed "spark.driver.memory" to be "2g".
>>The application finishes in just 3.6 minutes (3.0min + 9s + 9s + 9s)!!
>> And UI shows 6% of the data is cached.
>>
>> *From the results we can see the reduce stages finish in seconds, how
>> could that happen with only 6% cached? Can anyone explain?*
>>
>> I am new to Spark and would appreciate any help on this. Thanks!
>>
>> Jia
>>
>>
>>
>>
>
>
> --
> Jia Zhan
>
>


-- 
Jia Zhan
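
A hedged sketch (not from the thread) of one way to cache a larger fraction of the
data without growing the heap: cache in serialized form, then read back the same
numbers the storage page shows via getRDDStorageInfo. The path is made up and a
spark-shell sc is assumed:

import org.apache.spark.storage.StorageLevel

val input = sc.textFile("/InputFiles").setName("input")
  .persist(StorageLevel.MEMORY_AND_DISK_SER)   // serialized; spills to disk instead of dropping

input.count()                                  // materialize the cache

sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: ${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
    s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
}

Serialized blocks also tend to put much less pressure on the garbage collector than
millions of deserialized Java objects, which is one plausible reason the large-heap
run above spends so much more time per stage.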


why the Rating(user: Int, product: Int, rating: Double)(in MLlib's ALS), the 'user' and 'product' must be Int?

2015-10-19 Thread futureage
hi, 
I am learning MLlib's ALS. When I saw the case class Rating(user: Int,
product: Int, rating: Double), I wanted to know why 'user' and 'product'
are Int. They are just IDs; is Long or String not OK?

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-the-Rating-user-Int-product-Int-rating-Double-in-MLlib-s-ALS-the-user-and-product-must-be-Int-tp25135.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
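
As far as I understand, Rating uses Int IDs mainly to keep memory and shuffle sizes
down inside ALS. The usual workaround when the natural IDs are strings (or Longs
that do not fit in an Int) is to build a dense Int index first; a hedged sketch
with made-up data, assuming a spark-shell sc:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// (user, product, rating) with string IDs, as they often arrive from logs
val raw = sc.parallelize(Seq(
  ("alice", "sku-1", 5.0), ("bob", "sku-2", 3.0), ("alice", "sku-2", 1.0)))

// assign a dense Int index to every distinct user and product
val userIndex = raw.map(_._1).distinct().zipWithIndex().mapValues(_.toInt).collectAsMap()
val prodIndex = raw.map(_._2).distinct().zipWithIndex().mapValues(_.toInt).collectAsMap()
val bUsers = sc.broadcast(userIndex)
val bProds = sc.broadcast(prodIndex)

val ratings = raw.map { case (u, p, r) => Rating(bUsers.value(u), bProds.value(p), r) }
val model = ALS.train(ratings, 10, 10, 0.01)   // rank, iterations, lambda

// keep userIndex/prodIndex around to translate the model's recommendations back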