Re: Spark and RabbitMQ

2015-05-20 Thread Abel Rincón
Hi,


There is a RabbitMQ receiver for Spark Streaming:

http://search.maven.org/#artifactdetails|com.stratio.receiver|rabbitmq|0.1.0-RELEASE|jar

https://github.com/Stratio/RabbitMQ-Receiver
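
For anyone wanting to try it, a minimal sketch of pulling the receiver into an sbt build. The coordinates come from the Maven link above; the spark-streaming line and its version are assumptions about the rest of your build:

// build.sbt sketch -- receiver coordinates taken from the Maven link above;
// the spark-streaming dependency and its version are assumed.
libraryDependencies ++= Seq(
  "org.apache.spark"     %% "spark-streaming" % "1.3.1" % "provided",
  "com.stratio.receiver"  % "rabbitmq"        % "0.1.0-RELEASE"
)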


2015-05-12 14:49 GMT+02:00 Dmitry Goldenberg :

> Thanks, Akhil. It looks like in the second example, for Rabbit they're
> doing this: https://www.rabbitmq.com/mqtt.html.
>
> On Tue, May 12, 2015 at 7:37 AM, Akhil Das 
> wrote:
>
>> I found two examples: a Java version and a Scala version.
>>
>> Thanks
>> Best Regards
>>
>> On Tue, May 12, 2015 at 2:31 AM, dgoldenberg 
>> wrote:
>>
>>> Are there existing or under development versions/modules for streaming
>>> messages out of RabbitMQ with SparkStreaming, or perhaps a RabbitMQ RDD?
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-RabbitMQ-tp22852.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>
>>>
>>
>


Re: spark streaming doubt

2015-05-20 Thread Akhil Das
One receiver basically runs on one core, so if your single node has 4
cores, there are still 3 cores left for processing (for the executors). And
yes, the receiver stays on the same machine unless a failure happens.
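For illustration, a minimal sketch of that setup; the master string, port and batch interval are assumed example values, not anything from this thread:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// On a 4-core node: the single receiver pins one core,
// leaving the other 3 for batch processing.
val conf = new SparkConf().setMaster("local[4]").setAppName("ReceiverCoreSketch")
val ssc  = new StreamingContext(conf, Seconds(1))

// One receiver-based input stream (assumed socket source for the sketch).
val lines = ssc.socketTextStream("localhost", 9999)
lines.count().print()

ssc.start()
ssc.awaitTermination()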

Thanks
Best Regards

On Tue, May 19, 2015 at 10:57 PM, Shushant Arora 
wrote:

> Thanks Akhil andDibyendu.
>
> Does in high level receiver based streaming executors run on receivers
> itself to have data localisation ? Or its always data is transferred to
> executor nodes and executor nodes differ in each run of job but receiver
> node remains same(same machines) throughout life of streaming application
> unless node failure happens?
>
>
>
> On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Just to add, there is a Receiver based Kafka consumer which uses Kafka
>> Low Level Consumer API.
>>
>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>>
>>
>> Regards,
>> Dibyendu
>>
>> On Tue, May 19, 2015 at 9:00 PM, Akhil Das 
>> wrote:
>>
>>>
>>> On Tue, May 19, 2015 at 8:10 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 So for Kafka+spark streaming, Receiver based streaming used highlevel
 api and non receiver based streaming used low level api.

 1.In high level receiver based streaming does it registers consumers at
 each job start(whenever a new job is launched by streaming application say
 at each second)?

>>>
>>> ​-> Receiver based streaming will always have the receiver running
>>> parallel while your job is running, So by default for every 200ms
>>> (spark.streaming.blockInterval) the receiver will generate a block of data
>>> which is read from Kafka.
>>> ​
>>>
>>>
 2.No of executors in highlevel receiver based jobs will always equal to
 no of partitions in topic ?

>>>
>>> ​-> Not sure from where did you came up with this. For the non stream
>>> based one, i think the number of partitions in spark will be equal to the
>>> number of kafka partitions for the given topic.
>>> ​
>>>
>>>
 3.Will data from a single topic be consumed by executors in parllel or
 only one receiver consumes in multiple threads and assign to executors in
 high level receiver based approach ?

 ​-> They will consume the data parallel.​ For the receiver based
>>> approach, you can actually specify the number of receiver that you want to
>>> spawn for consuming the messages.
>>>



 On Tue, May 19, 2015 at 2:38 PM, Akhil Das 
 wrote:

> spark.streaming.concurrentJobs takes an integer value, not boolean.
> If you set it as 2 then 2 jobs will run parallel. Default value is 1 and
> the next job will start once it completes the current one.
>
>
>> Actually, in the current implementation of Spark Streaming and under
>> default configuration, only job is active (i.e. under execution) at any
>> point of time. So if one batch's processing takes longer than 10 seconds,
>> then then next batch's jobs will stay queued.
>> This can be changed with an experimental Spark property
>> "spark.streaming.concurrentJobs" which is by default set to 1. Its not
>> currently documented (maybe I should add it).
>> The reason it is set to 1 is that concurrent jobs can potentially
>> lead to weird sharing of resources and which can make it hard to debug 
>> the
>> whether there is sufficient resources in the system to process the 
>> ingested
>> data fast enough. With only 1 job running at a time, it is easy to see 
>> that
>> if batch processing time < batch interval, then the system will be 
>> stable.
>> Granted that this may not be the most efficient use of resources under
>> certain conditions. We definitely hope to improve this in the future.
>
>
> Copied from TD's answer written in SO
> 
> .
>
> Non-receiver based streaming for example you can say are the
> fileStream, directStream ones. You can read a bit of information from here
> https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html
>
> Thanks
> Best Regards
>
> On Tue, May 19, 2015 at 2:13 PM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Thanks Akhil.
>> When I don't  set spark.streaming.concurrentJobs to true. Will the
>> all pending jobs starts one by one after 1 jobs completes,or it does not
>> creates jobs which could not be started at its desired interval.
>>
>> And Whats the difference and usage of Receiver vs non-receiver based
>> streaming. Is there any documentation for that?
>>
>> On Tue, May 19, 2015 at 1:35 PM, Akhil Das <
>> ak...@sigmoidanalytics.com> wrote:
>>
>>> It will be a single job running at a time by default (you can also
>>> configure the spark.streaming.concurren

Re: Reading Binary files in Spark program

2015-05-20 Thread Akhil Das
If you can share the complete code and a sample file, maybe I can try to
reproduce it on my end.

Thanks
Best Regards

On Wed, May 20, 2015 at 7:00 AM, Tapan Sharma 
wrote:

> Problem is still there.
> Exception is not coming at the time of reading.
> Also the count of JavaPairRDD is as expected. It is when we are calling
> collect() or toArray() methods, the exception is coming.
> Something to do with Text class even though I haven't used it in the
> program.
>
> Regards
> Tapan
>
> On Tue, May 19, 2015 at 6:26 PM, Akhil Das 
> wrote:
>
>> Try something like:
>>
>> JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(inputDir,
>> org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
>> IntWritable.class, Text.class, new Job().getConfiguration());
>>
>> With the type of input format that you require.
>>
>> Thanks
>> Best Regards
>>
>> On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma 
>> wrote:
>>
>>> Hi Team,
>>>
>>> I am new to Spark and learning.
>>> I am trying to read image files into spark job. This is how I am doing:
>>> Step 1. Created sequence files with FileName as Key and Binary image as
>>> value. i.e.  Text and BytesWritable.
>>> I am able to read these sequence files into Map Reduce programs.
>>>
>>> Step 2.
>>> I understand that Text and BytesWritable are Non Serializable therefore,
>>> I
>>> read the sequence file in Spark as following:
>>>
>>> SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile");
>>> JavaSparkContext ctx = new JavaSparkContext(sparkConf);
>>> JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0],
>>> String.class, Byte.class);
>>> final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect();
>>>
>>>
>>>
>>>
>>> The moment I try to call collect() method to get the keys of sequence
>>> file,
>>> following exception has been thrown
>>>
>>> Can any one help me understanding why collect() method is failing? If I
>>> use
>>> toArray() on seqFiles object then also I am getting same call stack.
>>>
>>> Regards
>>> Tapan
>>>
>>>
>>>
>>> java.io.NotSerializableException: org.apache.hadoop.io.Text
>>> at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>> at
>>>
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>> at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>> at
>>>
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>> at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>> at
>>> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>>> at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>>> at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>> at
>>>
>>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>> at
>>>
>>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
>>> at
>>>
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>> 2015-05-19 15:15:03,705 ERROR [task-result-getter-0]
>>> scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage
>>> 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not
>>> retrying
>>> 2015-05-19 15:15:03,731 INFO  [task-result-getter-0]
>>> scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet
>>> 0.0, whose tasks have all completed, from pool
>>> 2015-05-19 15:15:03,739 INFO
>>> [sparkDriver-akka.actor.default-dispatcher-2]
>>> scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling
>>> stage 0
>>> 2015-05-19 15:15:03,747 INFO  [main] scheduler.DAGScheduler
>>> (Logging.scala:logInfo(59)) - Job 0 failed: collect at
>>> JavaSequenceFile.java:44, took 4.421397 s
>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>>> due
>>> to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable
>>> result: org.apache.hadoop.io.Text
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.org
>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>>> at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>>> at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>>> at
>>>
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)

Re: Hive on Spark VS Spark SQL

2015-05-20 Thread Sean Owen
I don't think that's quite the difference. Any SQL engine has a query
planner and an execution engine. Both of them use Spark for execution. HoS
uses Hive for query planning. Although it's not optimized for execution on
Spark per se, it's got a lot of language support and is stable/mature.
Spark SQL's query planner is less developed at this point but purpose-built
for Spark as an execution engine. Spark SQL is also how you put SQL-like
operations in a Spark program -- programmatic SQL, if you will -- which
isn't what Hive (or therefore HoS) does. HoS is good if you're already using
Hive, need its language features, need it as it works today, and want a
faster batch execution version of it.
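
As a small illustration of that last point (programmatic SQL inside a Spark program), a sketch against the 1.3-era API; the input path, table name and columns are made up:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.desc

val sc         = new SparkContext(new SparkConf().setAppName("SparkSqlSketch"))
val sqlContext = new SQLContext(sc)

// Assumed JSON input; registerTempTable makes it queryable by name.
val events = sqlContext.jsonFile("hdfs:///path/to/events.json")
events.registerTempTable("events")

// SQL text mixed with programmatic DataFrame operations in the same program.
val topUsers = sqlContext
  .sql("SELECT user, COUNT(*) AS n FROM events GROUP BY user")
  .orderBy(desc("n"))
  .limit(10)

topUsers.show()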

On Wed, May 20, 2015 at 7:18 AM, Debasish Das 
wrote:

> SparkSQL was built to improve upon Hive on Spark runtime further...
>
> On Tue, May 19, 2015 at 10:37 PM, guoqing0...@yahoo.com.hk <
> guoqing0...@yahoo.com.hk> wrote:
>
>> Hive on Spark and SparkSQL which should be better , and what are the key
>> characteristics and the advantages and the disadvantages between ?
>>
>> --
>> guoqing0...@yahoo.com.hk
>>
>
>


Re: Mesos Spark Tasks - Lost

2015-05-20 Thread Tim Chen
Can you share your exact spark-submit command line?

Also, cluster mode is not released yet (it's coming in 1.4) and doesn't support
spark-shell, so I think you're just using client mode unless you're on the
latest master.

Tim

On Tue, May 19, 2015 at 8:57 AM, Panagiotis Garefalakis 
wrote:

> Hello all,
>
> I am facing a weird issue for the last couple of days running Spark on top
> of Mesos and I need your help. I am running Mesos in a private cluster and
> managed to successfully deploy HDFS, Cassandra, Marathon and Play, but
> Spark is not working for some reason. I have tried so far:
> different java versions (1.6 and 1.7 oracle and openjdk), different
> spark-env configuration, different Spark versions (from 0.8.8 to 1.3.1),
> different HDFS versions (hadoop 5.1 and 4.6), and updating pom dependencies.
>
> More specifically while local tasks complete fine, in cluster mode all the
> tasks get lost.
> (both using spark-shell and spark-submit)
> From the worker log I see something like this:
>
> ---
> I0519 02:36:30.475064 12863 fetcher.cpp:214] Fetching URI
> 'hdfs:/:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz'
> I0519 02:36:30.747372 12863 fetcher.cpp:99] Fetching URI
> 'hdfs://X:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' using Hadoop
> Client
> I0519 02:36:30.747546 12863 fetcher.cpp:109] Downloading resource from
> 'hdfs://:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' to
> '/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz'
> I0519 02:36:34.205878 12863 fetcher.cpp:78] Extracted resource
> '/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz'
> into
> '/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3'
> *Error: Could not find or load main class two*
>
> ---
>
> And from the Spark Terminal:
>
> ---
> 15/05/19 02:36:39 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
> 15/05/19 02:36:39 INFO scheduler.TaskSchedulerImpl: Stage 0 was cancelled
> 15/05/19 02:36:39 INFO scheduler.DAGScheduler: Failed to run reduce at
> SparkPi.scala:35
> 15/05/19 02:36:39 INFO scheduler.DAGScheduler: Failed to run reduce at
> SparkPi.scala:35
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent
> failure: Lost task 7.3 in stage 0.0 (TID 26, ): ExecutorLostFailure
> (executor lost)
> Driver stacktrace: at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)atorg.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> ..
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> ---
>
> Any help will be greatly appreciated!
>
> Regards,
> Panagiotis
>


Re: Multi user setup and saving a DataFrame / RDD to a network exported file system

2015-05-20 Thread Tomasz Fruboes

Hi,

 thanks for the answer. The permissions are:

drwxr-xr-x 3 tfruboes all 5632 05-19 15:40 test19EE/

 I have tried setting the rights to 777 for this directory prior to 
execution. This does not get propagated down the chain, ie the directory 
created as a result of the "save" call (namesAndAges.parquet2 in the 
path in the dump [1] below) is created with the drwxr-xr-x rights (owned 
by the user submitting the job, ie tfruboes). The temp directories 
created inside


namesAndAges.parquet2/_temporary/0/

(e.g. task_201505200920_0009_r_01) are owned by root, again with 
drwxr-xr-x access rights


 Cheers,
  Tomasz

W dniu 19.05.2015 o 23:56, Davies Liu pisze:

It surprises me, could you list the owner information of
/mnt/lustre/bigdata/med_home/tmp/test19EE/ ?

On Tue, May 19, 2015 at 8:15 AM, Tomasz Fruboes
 wrote:

Dear Experts,

  we have a spark cluster (standalone mode) in which master and workers are
started from root account. Everything runs correctly to the point when we
try doing operations such as

 dataFrame.select("name", "age").save(ofile, "parquet")

or

 rdd.saveAsPickleFile(ofile)

, where ofile is path on a network exported filesystem (visible on all
nodes, in our case this is lustre, I guess on nfs effect would be similar).

  Unsurprisingly temp files created on workers are owned by root, which then
leads to a crash (see [1] below). Is there a solution/workaround for this
(e.g. controlling file creation mode of the temporary files)?

Cheers,
  Tomasz


ps I've tried to google this problem, couple of similar reports, but no
clear answer/solution found

ps2 For completeness - running master/workers as a regular user solves the
problem only for the given user. For other users submitting to this master
the result is given in [2] below


[0] Cluster details:
Master/workers: centos 6.5
Spark 1.3.1 prebuilt for hadoop 2.4 (same behaviour for the 2.6 build)


[1]
##
File
"/mnt/home/tfruboes/2015.05.SparkLocal/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o27.save.
: java.io.IOException: Failed to rename
DeprecatedRawLocalFileStatus{path=file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/_temporary/0/task_201505191540_0009_r_01/part-r-2.parquet;
isDirectory=false; length=534; replication=1; blocksize=33554432;
modification_time=1432042832000; access_time=0; owner=; group=;
permission=rw-rw-rw-; isSymlink=false} to
file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/part-r-2.parquet
 at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346)
 at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
 at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
 at
parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:43)
 at
org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:690)
 at
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:129)
 at
org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
 at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)
 at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1181)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
 at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
 at py4j.Gateway.invoke(Gateway.java:259)
 at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Thread.java:745)
##



[2]
##
15/05/19 14:45:19 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3,
wn23023.cis.gov.pl): java.io.IOException: Mkdirs failed to create
file:/mnt/lustre/bigdata/med_home/tmp/test18/namesAndAges.parquet2/_temporary/0/_temporary/attempt_201505191445_0009_r_00_0
 at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
 at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
 at org.apache.hadoop.fs.FileSystem.crea

Is this a good use case for Spark?

2015-05-20 Thread jakeheller
Hi all, I'm new to Spark -- so new that we're deciding whether to use it in
the first place, and I was hoping someone here could help me figure that
out. 

We're doing a lot of processing of legal documents -- in particular, the
entire corpus of American law. It's about 10m documents, many of which are
quite large as far as text goes (100s of pages). 

We'd like to 
(a) transform these documents from the various (often borked) formats they
come to us in into a standard XML format, 
(b) when it is in a standard format, extract information from them (e.g.,
which judicial cases cite each other?) and annotate the documents with the
information extracted, and then 
(c) deliver the end result to a repository (like s3) where it can be
accessed by the user-facing application.

Of course, we'd also like to do all of this quickly -- optimally, running
the entire database through the whole pipeline in a few hours.

We currently use a mix of Python and Java scripts (including XSLT, and
NLP/unstructured data tools like UIMA and Stanford's CoreNLP) in various
places along the pipeline we built for ourselves to handle these tasks. The
current pipeline infrastructure was built a while back -- it's basically a
number of HTTP servers that each have a single task and pass the document
along from server to server as it goes through the processing pipeline. It's
great although it's having trouble scaling, and there are some reliability
issues. It's also a headache to handle all the infrastructure. For what it's
worth, metadata about the documents resides in SQL, and the actual text of
the documents lives in s3. 

It seems like Spark would be ideal for this, but after some searching I
wasn't able to find too many examples of people using it for
document-processing tasks (like transforming documents from one XML format
into another) and I'm not clear if I can chain those sorts of tasks and NLP
tasks, especially if some happen in Python and others in Java. Finally, I
don't know if the size of the data (i.e., we'll likely want to run
operations on whole documents, rather than just lines) imposes
issues/constraints. 

Thanks all!
Jake



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-this-a-good-use-case-for-Spark-tp22954.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: spark 1.3.1 jars in repo1.maven.org

2015-05-20 Thread Sean Owen
Yes, the published artifacts can only refer to one version of anything
(OK, modulo publishing a large number of variants under classifiers).

You aren't intended to rely on Spark's transitive dependencies for
anything. Compiling against the Spark API has no relation to which
version of Hadoop it binds against at runtime, because that's not part of
any API. You can even mark the Spark dependency as "provided" in your build
and get all the Spark/Hadoop bindings at runtime from the cluster.
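
For example (sbt syntax; the version is just the one discussed in this thread):

// build.sbt sketch of the "provided" approach: compile against the Spark API,
// but take the actual Spark/Hadoop jars from the cluster at runtime.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"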

What problem are you experiencing?

On Wed, May 20, 2015 at 2:17 AM, Edward Sargisson  wrote:
> Hi,
> I'd like to confirm an observation I've just made. Specifically that spark
> is only available in repo1.maven.org for one Hadoop variant.
>
> The Spark source can be compiled against a number of different Hadoops using
> profiles. Yay.
> However, the spark jars in repo1.maven.org appear to be compiled against one
> specific Hadoop and no other differentiation is made. (I can see a
> difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in
> the version I compiled locally).
>
> The implication here is that if you have a pom file asking for
> spark-core_2.10 version 1.3.1 then Maven will only give you an Hadoop 2
> version. Maven assumes that non-snapshot artifacts never change so trying to
> load an Hadoop 1 version will end in tears.
>
> This then means that if you compile code against spark-core then there will
> probably be classpath NoClassDefFound issues unless the Hadoop 2 version is
> exactly the one you want.
>
> Have I gotten this correct?
>
> It happens that our little app is using a Spark context directly from a
> Jetty webapp and the classpath differences were/are causing some confusion.
> We are currently installing a Hadoop 1 spark master and worker.
>
> Thanks a lot!
> Edward




Re: spark streaming doubt

2015-05-20 Thread Shushant Arora
So I can explicitly specify the number of receivers and executors in
receiver-based streaming? Can you share a sample program, if any?

Also, in low-level (non-receiver-based) streaming, will data be fetched and
processed by the same worker executor node? And if I have concurrent jobs set
to 1, fetching and processing will be delayed until the next job starts. Say
I have a 1-second stream interval but job1 takes 5 seconds to complete, so
job2 starts at the end of second 5: will it then process all data from second
1 to second 5, or only the data for the interval from second 1 to second 2?

And if it processes data for the complete duration (seconds 1-5), is there
any option to suppress the start of the other queued jobs (for intervals
2-3, 3-4 and 4-5), since their work is already done by job2?


On Wed, May 20, 2015 at 12:36 PM, Akhil Das 
wrote:

> One receiver basically runs on 1 core, so if your single node is having 4
> cores, there are still 3 cores left for the processing (for executors). And
> yes receiver remains on the same machine unless some failure happens.
>
> Thanks
> Best Regards
>
> On Tue, May 19, 2015 at 10:57 PM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Thanks Akhil andDibyendu.
>>
>> Does in high level receiver based streaming executors run on receivers
>> itself to have data localisation ? Or its always data is transferred to
>> executor nodes and executor nodes differ in each run of job but receiver
>> node remains same(same machines) throughout life of streaming application
>> unless node failure happens?
>>
>>
>>
>> On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Just to add, there is a Receiver based Kafka consumer which uses Kafka
>>> Low Level Consumer API.
>>>
>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>>>
>>>
>>> Regards,
>>> Dibyendu
>>>
>>> On Tue, May 19, 2015 at 9:00 PM, Akhil Das 
>>> wrote:
>>>

 On Tue, May 19, 2015 at 8:10 PM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> So for Kafka+spark streaming, Receiver based streaming used highlevel
> api and non receiver based streaming used low level api.
>
> 1.In high level receiver based streaming does it registers consumers
> at each job start(whenever a new job is launched by streaming application
> say at each second)?
>

 ​-> Receiver based streaming will always have the receiver running
 parallel while your job is running, So by default for every 200ms
 (spark.streaming.blockInterval) the receiver will generate a block of data
 which is read from Kafka.
 ​


> 2.No of executors in highlevel receiver based jobs will always equal
> to no of partitions in topic ?
>

 ​-> Not sure from where did you came up with this. For the non stream
 based one, i think the number of partitions in spark will be equal to the
 number of kafka partitions for the given topic.
 ​


> 3.Will data from a single topic be consumed by executors in parllel or
> only one receiver consumes in multiple threads and assign to executors in
> high level receiver based approach ?
>
> ​-> They will consume the data parallel.​ For the receiver based
 approach, you can actually specify the number of receiver that you want to
 spawn for consuming the messages.

>
>
>
> On Tue, May 19, 2015 at 2:38 PM, Akhil Das  > wrote:
>
>> spark.streaming.concurrentJobs takes an integer value, not boolean.
>> If you set it as 2 then 2 jobs will run parallel. Default value is 1 and
>> the next job will start once it completes the current one.
>>
>>
>>> Actually, in the current implementation of Spark Streaming and under
>>> default configuration, only job is active (i.e. under execution) at any
>>> point of time. So if one batch's processing takes longer than 10 
>>> seconds,
>>> then then next batch's jobs will stay queued.
>>> This can be changed with an experimental Spark property
>>> "spark.streaming.concurrentJobs" which is by default set to 1. Its not
>>> currently documented (maybe I should add it).
>>> The reason it is set to 1 is that concurrent jobs can potentially
>>> lead to weird sharing of resources and which can make it hard to debug 
>>> the
>>> whether there is sufficient resources in the system to process the 
>>> ingested
>>> data fast enough. With only 1 job running at a time, it is easy to see 
>>> that
>>> if batch processing time < batch interval, then the system will be 
>>> stable.
>>> Granted that this may not be the most efficient use of resources under
>>> certain conditions. We definitely hope to improve this in the future.
>>
>>
>> Copied from TD's answer written in SO
>> 

Re: spark streaming doubt

2015-05-20 Thread Akhil Das
On Wed, May 20, 2015 at 1:12 PM, Shushant Arora 
wrote:

> So I can explicitly specify no of receivers and executors in receiver
> based streaming? Can you share a sample program if any?
>

​
​
- You can look at the low-level consumer repo shared by Dibyendu for
sample code.

> ​
> ​
>
Also in Low level non receiver based , will data be fetched by same worker
> executor node and processed ? Also if I have concurrent jobs set to 1- so
> in low level
> fetching and processing will be delayed till next job starts ,say a
> situation where I have 1 sec of stream interval but my job1 takes 5 sec to
> complete , hence job2 starts at end of 5 sec, so now will it process all
> data from sec1 to sec 5 in low level non receiver streaming or only for
> interval sec1-sec2 ?​
>

> And if it processes data for complete duration sec1-sec5.Is there any
> option to suppress start of other queued jobs(for interval sec2-3,
> sec3-4,sec4-5) since there work is already done by job2 ?
>

​
​
​- I believe all your data from sec2-sec5 will be available in Kafka and
when the second batch starts at 5 sec, it will consume it (you can also
limit the rate with spark.streaming.kafka.maxRatePerPartition).

Read more here:
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
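
For reference, a hedged sketch of the direct (non-receiver) Kafka stream with the rate limit mentioned above; the broker address, topic and batch interval are assumed placeholder values:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("DirectKafkaSketch")
  // cap how much each Kafka partition is read per second per batch
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")

val ssc = new StreamingContext(conf, Seconds(1))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // assumed broker
val topics      = Set("mytopic")                                // assumed topic

// One RDD partition per Kafka partition; no long-running receiver is used.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.count().print()
ssc.start()
ssc.awaitTermination()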


>
>
> On Wed, May 20, 2015 at 12:36 PM, Akhil Das 
> wrote:
>
>> One receiver basically runs on 1 core, so if your single node is having 4
>> cores, there are still 3 cores left for the processing (for executors). And
>> yes receiver remains on the same machine unless some failure happens.
>>
>> Thanks
>> Best Regards
>>
>> On Tue, May 19, 2015 at 10:57 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Thanks Akhil andDibyendu.
>>>
>>> Does in high level receiver based streaming executors run on receivers
>>> itself to have data localisation ? Or its always data is transferred to
>>> executor nodes and executor nodes differ in each run of job but receiver
>>> node remains same(same machines) throughout life of streaming application
>>> unless node failure happens?
>>>
>>>
>>>
>>> On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
 Just to add, there is a Receiver based Kafka consumer which uses Kafka
 Low Level Consumer API.

 http://spark-packages.org/package/dibbhatt/kafka-spark-consumer


 Regards,
 Dibyendu

 On Tue, May 19, 2015 at 9:00 PM, Akhil Das 
 wrote:

>
> On Tue, May 19, 2015 at 8:10 PM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> So for Kafka+spark streaming, Receiver based streaming used highlevel
>> api and non receiver based streaming used low level api.
>>
>> 1.In high level receiver based streaming does it registers consumers
>> at each job start(whenever a new job is launched by streaming application
>> say at each second)?
>>
>
> ​-> Receiver based streaming will always have the receiver running
> parallel while your job is running, So by default for every 200ms
> (spark.streaming.blockInterval) the receiver will generate a block of data
> which is read from Kafka.
> ​
>
>
>> 2.No of executors in highlevel receiver based jobs will always equal
>> to no of partitions in topic ?
>>
>
> ​-> Not sure from where did you came up with this. For the non stream
> based one, i think the number of partitions in spark will be equal to the
> number of kafka partitions for the given topic.
> ​
>
>
>> 3.Will data from a single topic be consumed by executors in parllel
>> or only one receiver consumes in multiple threads and assign to executors
>> in high level receiver based approach ?
>>
>> ​-> They will consume the data parallel.​ For the receiver based
> approach, you can actually specify the number of receiver that you want to
> spawn for consuming the messages.
>
>>
>>
>>
>> On Tue, May 19, 2015 at 2:38 PM, Akhil Das <
>> ak...@sigmoidanalytics.com> wrote:
>>
>>> spark.streaming.concurrentJobs takes an integer value, not boolean.
>>> If you set it as 2 then 2 jobs will run parallel. Default value is 1 and
>>> the next job will start once it completes the current one.
>>>
>>>
 Actually, in the current implementation of Spark Streaming and
 under default configuration, only job is active (i.e. under execution) 
 at
 any point of time. So if one batch's processing takes longer than 10
 seconds, then then next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 "spark.streaming.concurrentJobs" which is by default set to 1. Its not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially
 lead to weir

Re: Hive on Spark VS Spark SQL

2015-05-20 Thread ayan guha
And if I am not wrong, the Spark SQL API is intended to move closer to SQL
standards. I feel it's a clever decision on Spark's part to keep both APIs
operational. These short-term confusions are worth the long-term benefits.
On 20 May 2015 17:19, "Sean Owen"  wrote:

> I don't think that's quite the difference. Any SQL  engine has a query
> planner and an execution engine. Both of these Spark for execution. HoS
> uses Hive for query planning. Although it's not optimized for execution on
> Spark per se, it's got a lot of language support and is stable/mature.
> Spark SQL's query planner is less developed at this point but purpose-built
> for Spark as an execution engine. Spark SQL is also how you put SQL-like
> operations in a Spark program -- programmatic SQL if you will -- which
> isn't what Hive or therefore HoS does. HoS is good if you're already using
> Hive and need its language features and need it as it works today, and want
> a faster batch execution version of it.
>
> On Wed, May 20, 2015 at 7:18 AM, Debasish Das 
> wrote:
>
>> SparkSQL was built to improve upon Hive on Spark runtime further...
>>
>> On Tue, May 19, 2015 at 10:37 PM, guoqing0...@yahoo.com.hk <
>> guoqing0...@yahoo.com.hk> wrote:
>>
>>> Hive on Spark and SparkSQL which should be better , and what are the key
>>> characteristics and the advantages and the disadvantages between ?
>>>
>>> --
>>> guoqing0...@yahoo.com.hk
>>>
>>
>>
>


Re: Multi user setup and saving a DataFrame / RDD to a network exported file system

2015-05-20 Thread Iulian Dragoș
You could try setting `SPARK_USER` to the user under which your workers are
running. I couldn't find many references to this variable, but at least
Yarn and Mesos take it into account when spawning executors. Chances are
that standalone mode also does it.

iulian

On Wed, May 20, 2015 at 9:29 AM, Tomasz Fruboes 
wrote:

> Hi,
>
>  thanks for answer. The rights are
>
> drwxr-xr-x 3 tfruboes all 5632 05-19 15:40 test19EE/
>
>  I have tried setting the rights to 777 for this directory prior to
> execution. This does not get propagated down the chain, ie the directory
> created as a result of the "save" call (namesAndAges.parquet2 in the path
> in the dump [1] below) is created with the drwxr-xr-x rights (owned by the
> user submitting the job, ie tfruboes). The temp directories created inside
>
> namesAndAges.parquet2/_temporary/0/
>
> (e.g. task_201505200920_0009_r_01) are owned by root, again with
> drwxr-xr-x access rights
>
>  Cheers,
>   Tomasz
>
> W dniu 19.05.2015 o 23:56, Davies Liu pisze:
>
>  It surprises me, could you list the owner information of
>> /mnt/lustre/bigdata/med_home/tmp/test19EE/ ?
>>
>> On Tue, May 19, 2015 at 8:15 AM, Tomasz Fruboes
>>  wrote:
>>
>>> Dear Experts,
>>>
>>>   we have a spark cluster (standalone mode) in which master and workers
>>> are
>>> started from root account. Everything runs correctly to the point when we
>>> try doing operations such as
>>>
>>>  dataFrame.select("name", "age").save(ofile, "parquet")
>>>
>>> or
>>>
>>>  rdd.saveAsPickleFile(ofile)
>>>
>>> , where ofile is path on a network exported filesystem (visible on all
>>> nodes, in our case this is lustre, I guess on nfs effect would be
>>> similar).
>>>
>>>   Unsurprisingly temp files created on workers are owned by root, which
>>> then
>>> leads to a crash (see [1] below). Is there a solution/workaround for this
>>> (e.g. controlling file creation mode of the temporary files)?
>>>
>>> Cheers,
>>>   Tomasz
>>>
>>>
>>> ps I've tried to google this problem, couple of similar reports, but no
>>> clear answer/solution found
>>>
>>> ps2 For completeness - running master/workers as a regular user solves
>>> the
>>> problem only for the given user. For other users submitting to this
>>> master
>>> the result is given in [2] below
>>>
>>>
>>> [0] Cluster details:
>>> Master/workers: centos 6.5
>>> Spark 1.3.1 prebuilt for hadoop 2.4 (same behaviour for the 2.6 build)
>>>
>>>
>>> [1]
>>> ##
>>> File
>>>
>>> "/mnt/home/tfruboes/2015.05.SparkLocal/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>>> line 300, in get_return_value
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o27.save.
>>> : java.io.IOException: Failed to rename
>>>
>>> DeprecatedRawLocalFileStatus{path=file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/_temporary/0/task_201505191540_0009_r_01/part-r-2.parquet;
>>> isDirectory=false; length=534; replication=1; blocksize=33554432;
>>> modification_time=1432042832000; access_time=0; owner=; group=;
>>> permission=rw-rw-rw-; isSymlink=false} to
>>>
>>> file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/part-r-2.parquet
>>>  at
>>>
>>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346)
>>>  at
>>>
>>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
>>>  at
>>>
>>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
>>>  at
>>>
>>> parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:43)
>>>  at
>>>
>>> org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:690)
>>>  at
>>>
>>> org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:129)
>>>  at
>>> org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
>>>  at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)
>>>  at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1181)
>>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>  at
>>>
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>  at
>>>
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>  at java.lang.reflect.Method.invoke(Method.java:606)
>>>  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>>>  at
>>> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>>>  at py4j.Gateway.invoke(Gateway.java:259)
>>>  at
>>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>>  at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>  at py4j.GatewayConnection.run(GatewayConnection.java:207)
>>>  at java.lang.Th

Intermittent difficulties for Worker to contact Master on same machine in standalone

2015-05-20 Thread Stephen Boesch
What conditions would cause the following delays / failures for a standalone
machine/cluster when the Worker tries to contact the Master?

15/05/20 02:02:53 INFO WorkerWebUI: Started WorkerWebUI at
http://10.0.0.3:8081
15/05/20 02:02:53 INFO Worker: Connecting to master
akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...
15/05/20 02:02:53 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated
for 5000 ms, all messages to this address will be delivered to dead
letters. Reason: Connection refused: mellyrn.local/10.0.0.3:7077
15/05/20 02:03:04 INFO Worker: Retrying connection to master (attempt # 1)
..
..
15/05/20 02:03:26 INFO Worker: Retrying connection to master (attempt # 3)
15/05/20 02:03:26 INFO Worker: Connecting to master
akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...
15/05/20 02:03:26 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated
for 5000 ms, all messages to this address will be delivered to dead
letters. Reason: Connection refused: mellyrn.local/10.0.0.3:7077


saveasorcfile on partitioned orc

2015-05-20 Thread patcharee

Hi,

I followed the information on
https://www.mail-archive.com/reviews@spark.apache.org/msg141113.html to
save an ORC file with Spark 1.2.1.

I can save data to a new ORC file. I wonder how to save data to an
existing, partitioned ORC file. Any suggestions?


BR,
Patcharee




RE: Intermittent difficulties for Worker to contact Master on same machine in standalone

2015-05-20 Thread Evo Eftimov
Check whether the name can be resolved in the /etc/hosts file (or DNS) of the
worker.

(The same btw applies to the node where you run the driver app – all other
nodes must be able to resolve its name.)

 

From: Stephen Boesch [mailto:java...@gmail.com] 
Sent: Wednesday, May 20, 2015 10:07 AM
To: user
Subject: Intermittent difficulties for Worker to contact Master on same machine 
in standalone

 

 

What conditions would cause the following delays / failure for a standalone 
machine/cluster to have the Worker contact the Master?

 

15/05/20 02:02:53 INFO WorkerWebUI: Started WorkerWebUI at http://10.0.0.3:8081

15/05/20 02:02:53 INFO Worker: Connecting to master 
akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...

15/05/20 02:02:53 WARN Remoting: Tried to associate with unreachable remote 
address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated for 
5000 ms, all messages to this address will be delivered to dead letters. 
Reason: Connection refused: mellyrn.local/10.0.0.3:7077

15/05/20 02:03:04 INFO Worker: Retrying connection to master (attempt # 1)

..

..

15/05/20 02:03:26 INFO Worker: Retrying connection to master (attempt # 3)

15/05/20 02:03:26 INFO Worker: Connecting to master 
akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...

15/05/20 02:03:26 WARN Remoting: Tried to associate with unreachable remote 
address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated for 
5000 ms, all messages to this address will be delivered to dead letters. 
Reason: Connection refused: mellyrn.local/10.0.0.3:7077



Re: Code error

2015-05-20 Thread Romain Sagean
Hi Ricardo,
instead of filtering out the header at runtime you could simply remove the
header line from your file. Either way, in your code you create a filter for
the header but you don't use it to compute parsedData. It should be:

val parsedData = filter_data.map(s => Vectors.dense(s.split(',').map(_.toDouble))).cache()
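
For reference, the original snippet with that fix applied (spark-shell style, so `sc` is assumed to exist; everything else is taken from the code quoted below):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("/user/p_loadbd/fraude5.csv")
  .map(_.toLowerCase.split(','))
  .map(x => x(0) + "," + x(1))

val header      = data.first()
val filter_data = data.filter(_ != header)        // actually drop the header line

// parse the filtered RDD, not the original one that still contains the header
val parsedData = filter_data
  .map(s => Vectors.dense(s.split(',').map(_.toDouble)))
  .cache()

val clusters = KMeans.train(parsedData, 2, 20)    // numClusters = 2, numIterations = 20
val WSSSE    = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)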

2015-05-19 21:23 GMT+02:00 Stephen Boesch :

> Hi Ricardo,
>  providing the error output would help . But in any case you need to do a
> collect() on the rdd returned from computeCost.
>
> 2015-05-19 11:59 GMT-07:00 Ricardo Goncalves da Silva <
> ricardog.si...@telefonica.com>:
>
>  Hi,
>>
>>
>>
>> Can anybody see what’s wrong in this piece of code:
>>
>>
>>
>>
>>
>> ./bin/spark-shell --num-executors 2 --executor-memory 512m --master
>> yarn-client
>>
>> import org.apache.spark.mllib.clustering.KMeans
>>
>> import org.apache.spark.mllib.linalg.Vectors
>>
>>
>>
>>
>>
>> val data = sc.textFile("/user/p_loadbd/fraude5.csv").map(x =>
>> x.toLowerCase.split(',')).map(x => x(0)+","+x(1))
>>
>> val header = data.first()
>>
>> val filter_data = data.filter(x => x != header)
>>
>> val parsedData = data.map(s =>
>> Vectors.dense(s.split(',').map(_.toDouble))).cache()
>>
>>
>>
>> val numClusters = 2
>>
>> val numIterations = 20
>>
>> val clusters = KMeans.train(parsedData, numClusters, numIterations)
>>
>>
>>
>> val WSSSE = clusters.computeCost(parsedData)
>>
>> println("Within Set Sum of Squared Errors = " + WSSSE)
>>
>>
>>
>> Thanks.
>>
>>
>>
>>
>>
>>
>> *Ricardo Goncalves da Silva*
>> Lead Data Scientist *|* Seção de Desenvolvimento de Sistemas de
>>
>> Business Intelligence – Projetos de Inovação *| *IDPB02
>>
>> Av. Eng. Luis Carlos Berrini, 1.376 – 7º – 04571-000 - SP
>>
>> ricardog.si...@telefonica.com *|* www.telefonica.com.br
>>
>> Tel +55 11 3430 4955 *| *Cel +55 11 94292 9526
>>
>>
>>
>>
>>
>>
>
>


-- 
Romain Sagean


How to set HBaseConfiguration in Spark

2015-05-20 Thread donhoff_h
Hi, all

I wrote a program to get an HBaseConfiguration object in Spark. But after I 
printed the contents of this hbase-conf object, I found they were wrong. For 
example, the property "hbase.zookeeper.quorum" should be 
"bgdt01.dev.hrb,bgdt02.dev.hrb,bgdt03.hrb", but the printed value is 
"localhost".

Could anybody tell me how to set up the HBase configuration in Spark, whether 
it should be set in a configuration file or by a Spark API? Many thanks!

The code of my program is listed below:
object TestHBaseConf {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    val hbConf = HBaseConfiguration.create()
    hbConf.addResource("""file:///etc/hbase/conf/hbase-site.xml""")
    val it = hbConf.iterator()
    while (it.hasNext) {
      val e = it.next()
      println("Key=" + e.getKey + " Value=" + e.getValue)
    }

    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9))
    val result = rdd.sum()
    println("result=" + result)
    sc.stop()
  }
}
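
One hedged observation, not something stated in this thread: Hadoop's Configuration.addResource(String) treats its argument as a classpath resource name, so a "file:///..." string that is not on the classpath is silently ignored. Loading the file as a Path, or setting the property directly, avoids that; a sketch using the quorum value from the post above:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration

val hbConf = HBaseConfiguration.create()

// Load the site file from the local filesystem rather than the classpath...
hbConf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
// ...or set the property explicitly (value taken from the post above).
hbConf.set("hbase.zookeeper.quorum", "bgdt01.dev.hrb,bgdt02.dev.hrb,bgdt03.hrb")

println("hbase.zookeeper.quorum = " + hbConf.get("hbase.zookeeper.quorum"))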

Re: Multi user setup and saving a DataFrame / RDD to a network exported file system

2015-05-20 Thread Tomasz Fruboes
Thanks for the suggestion. I have tried playing with it; sc.sparkUser() 
gives me the expected user name, but it doesn't solve the problem. From a 
quick search through the Spark code it seems to me that this setting is 
effective only for YARN and Mesos.

 I think the workaround for the problem could be using "--deploy-mode 
cluster" (not 100% convenient, since it disallows any interactive work), 
but this is not supported for Python-based programs.


Cheers,
  Tomasz



W dniu 20.05.2015 o 10:57, Iulian Dragoș pisze:

You could try setting `SPARK_USER` to the user under which your workers
are running. I couldn't find many references to this variable, but at
least Yarn and Mesos take it into account when spawning executors.
Chances are that standalone mode also does it.

iulian

On Wed, May 20, 2015 at 9:29 AM, Tomasz Fruboes
mailto:tomasz.frub...@fuw.edu.pl>> wrote:

Hi,

  thanks for answer. The rights are

drwxr-xr-x 3 tfruboes all 5632 05-19 15 :40
test19EE/

  I have tried setting the rights to 777 for this directory prior to
execution. This does not get propagated down the chain, ie the
directory created as a result of the "save" call
(namesAndAges.parquet2 in the path in the dump [1] below) is created
with the drwxr-xr-x rights (owned by the user submitting the job, ie
tfruboes). The temp directories created inside

namesAndAges.parquet2/_temporary/0/

(e.g. task_201505200920_0009_r_01) are owned by root, again with
drwxr-xr-x access rights

  Cheers,
   Tomasz

W dniu 19.05.2015 o 23:56, Davies Liu pisze:

It surprises me, could you list the owner information of
/mnt/lustre/bigdata/med_home/tmp/test19EE/ ?

On Tue, May 19, 2015 at 8:15 AM, Tomasz Fruboes
mailto:tomasz.frub...@fuw.edu.pl>>
wrote:

Dear Experts,

   we have a spark cluster (standalone mode) in which master
and workers are
started from root account. Everything runs correctly to the
point when we
try doing operations such as

  dataFrame.select("name", "age").save(ofile, "parquet")

or

  rdd.saveAsPickleFile(ofile)

, where ofile is path on a network exported filesystem
(visible on all
nodes, in our case this is lustre, I guess on nfs effect
would be similar).

   Unsurprisingly temp files created on workers are owned by
root, which then
leads to a crash (see [1] below). Is there a
solution/workaround for this
(e.g. controlling file creation mode of the temporary files)?

Cheers,
   Tomasz


ps I've tried to google this problem, couple of similar
reports, but no
clear answer/solution found

ps2 For completeness - running master/workers as a regular
user solves the
problem only for the given user. For other users submitting
to this master
the result is given in [2] below


[0] Cluster details:
Master/workers: centos 6.5
Spark 1.3.1 prebuilt for hadoop 2.4 (same behaviour for the
2.6 build)


[1]
##
 File

"/mnt/home/tfruboes/2015.05.SparkLocal/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
o27.save.
: java.io.IOException: Failed to rename

DeprecatedRawLocalFileStatus{path=file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/_temporary/0/task_201505191540_0009_r_01/part-r-2.parquet;
isDirectory=false; length=534; replication=1;
blocksize=33554432;
modification_time=1432042832000; access_time=0; owner=; group=;
permission=rw-rw-rw-; isSymlink=false} to

file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/part-r-2.parquet
  at

org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346)
  at

org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
  at

org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
  at

parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:43)
  at

org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:690)
  at

org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.s

LATERAL VIEW explode issue

2015-05-20 Thread kiran mavatoor
Hi,
When I use "LATERAL VIEW explode" on the registered temp table in spark shell, 
it works.  But when I use the same in spark-submit (as jar file) it is not 
working. its giving error -  "failure: ``union'' expected but identifier VIEW 
found"
sql statement i am using is
SELECT id,mapKey FROM locations LATERAL VIEW 
explode(map_keys(jsonStringToMapUdf(countries))) countries AS mapKey
I registered "jsonStringToMapUdf" as my sql function.
ThanksKiran9008099770  
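
One hedged guess, not confirmed in this thread: that particular parse error comes from the plain SQLContext parser, which does not understand HiveQL constructs such as LATERAL VIEW, whereas spark-shell (when built with Hive support) gives you a HiveContext. If that is the difference, constructing a HiveContext in the submitted jar might help; a sketch with a placeholder UDF body:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc          = new SparkContext(new SparkConf().setAppName("LateralViewSketch"))
val hiveContext = new HiveContext(sc)

// Placeholder UDF body -- only the name comes from the post above.
hiveContext.udf.register("jsonStringToMapUdf", (s: String) => Map("key" -> s))

// "locations" must already be registered as a temp table, as in the post.
val result = hiveContext.sql(
  """SELECT id, mapKey
    |FROM locations
    |LATERAL VIEW explode(map_keys(jsonStringToMapUdf(countries))) countries AS mapKey
  """.stripMargin)
result.show()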

Spark Streaming - Design considerations/Knobs

2015-05-20 Thread Hemant Bhanawat
Hi,

I have compiled a list (from online sources) of knobs/design considerations
that need to be taken care of by applications running on spark streaming.
Is my understanding correct?  Any other important design consideration that
I should take care of?


   - A DStream is associated with a single receiver. For attaining read
   parallelism multiple receivers i.e. multiple DStreams need to be created.
   - A receiver is run within an executor. It occupies one core. Ensure
   that there are enough cores for processing after receiver slots are booked
   i.e. spark.cores.max should take the receiver slots into account.
   - The receivers are allocated to executors in a round robin fashion.
   - When data is received from a stream source, receiver creates blocks of
   data.  A new block of data is generated every blockInterval milliseconds. N
   blocks of data are created during the batchInterval where N =
   batchInterval/blockInterval.
   - These blocks are distributed by the BlockManager of the current
   executor to the block managers of other executors. After that, the Network
   Input Tracker running on the driver is informed about the block locations
   for further processing.
   - A RDD is created on the driver for the blocks created during the
   batchInterval. The blocks generated during the batchInterval are partitions
   of the RDD. Each partition is a task in spark. blockInterval==
   batchinterval would mean that a single partition is created and probably it
   is processed locally.
   - Having bigger blockinterval means bigger blocks. A high value of
   spark.locality.wait increases the chance of processing a block on the local
   node. A balance needs to be found out between these two parameters to
   ensure that the bigger blocks are processed locally.
   - Instead of relying on batchInterval and blockInterval, you can define
   the number of partitions by calling dstream.repartition(n). This reshuffles
   the data in RDD randomly to create n number of partitions.
   - An RDD's processing is scheduled by driver's jobscheduler as a job. At
   a given point of time only one job is active. So, if one job is executing
   the other jobs are queued.
   - If you have two DStreams there will be two RDDs formed and there will
   be two jobs created, which will be scheduled one after the other.
   - To avoid this, you can union two dstreams. This will ensure that a
   single unionRDD is formed for the two RDDs of the dstreams. This unionRDD
   is then considered as a single job. However the partitioning of the RDDs is
   not impacted.
   - If the batch processing time is more than the batchInterval then obviously
   the receiver's memory will start filling up and will end up throwing
   exceptions (most probably BlockNotFoundException). Currently there is no
   way to pause the receiver.
   - For being fully fault tolerant, spark streaming needs to enable
   checkpointing. Checkpointing increases the batch processing time.
   - The frequency of metadata checkpoint cleaning can be controlled using
   spark.cleaner.ttl. But, data checkpoint cleaning happens automatically when
   the RDDs in the checkpoint are no more required.



Thanks,
Hemant
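
To make a few of those points concrete, a minimal sketch; all hosts, ports, core counts and intervals are assumed example values. It shows multiple receivers for read parallelism, a union so that each batch produces a single job, and an explicit repartition instead of tuning blockInterval:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("StreamingKnobsSketch")
  .set("spark.cores.max", "8")            // leave cores free: each receiver pins one

val ssc = new StreamingContext(conf, Seconds(2))   // batchInterval = 2s

// Two receivers => two input DStreams for read parallelism.
val streams = (1 to 2).map(i => ssc.socketTextStream("localhost", 9998 + i))

// Union them so each batch interval yields a single job instead of two.
val unioned = ssc.union(streams)

// Control the number of partitions explicitly rather than via blockInterval.
val repartitioned = unioned.repartition(8)

repartitioned.count().print()

ssc.checkpoint("/tmp/streaming-checkpoint")        // needed for full fault tolerance
ssc.start()
ssc.awaitTermination()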


PySpark Logs location

2015-05-20 Thread Oleg Ruchovets
Hi,

I am executing a PySpark job on YARN (Hortonworks distribution).

Could someone point me to where the logs are located?

Thanks
Oleg.


Re: Reading Binary files in Spark program

2015-05-20 Thread Tapan Sharma
I am not doing anything special.


*Here is the code :*


SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0], String.class, Byte.class);

// The following statements are giving the exception

final List<Tuple2<String, Byte>> tuple2s = seqFiles.toArray();

// Or

final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect();


*And this is how I have created a sequence file:*

http://stuartsierra.com/2008/04/24/a-million-little-files


Regards

Tapan



On Wed, May 20, 2015 at 12:42 PM, Akhil Das 
wrote:

> If you can share the complete code and a sample file, may be i can try to
> reproduce it on my end.
>
> Thanks
> Best Regards
>
> On Wed, May 20, 2015 at 7:00 AM, Tapan Sharma 
> wrote:
>
>> Problem is still there.
>> Exception is not coming at the time of reading.
>> Also the count of JavaPairRDD is as expected. It is when we are calling
>> collect() or toArray() methods, the exception is coming.
>> Something to do with Text class even though I haven't used it in the
>> program.
>>
>> Regards
>> Tapan
>>
>> On Tue, May 19, 2015 at 6:26 PM, Akhil Das 
>> wrote:
>>
>>> Try something like:
>>>
>>> JavaPairRDD output = sc.newAPIHadoopFile(inputDir,
>>>
>>> org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
>>> IntWritable.class,
>>>   Text.class, new Job().getConfiguration());
>>>
>>> With the type of input format that you require.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma 
>>> wrote:
>>>
 Hi Team,

 I am new to Spark and learning.
 I am trying to read image files into spark job. This is how I am doing:
 Step 1. Created sequence files with FileName as Key and Binary image as
 value. i.e.  Text and BytesWritable.
 I am able to read these sequence files into Map Reduce programs.

 Step 2.
 I understand that Text and BytesWritable are Non Serializable
 therefore, I
 read the sequence file in Spark as following:

 SparkConf sparkConf = new
 SparkConf().setAppName("JavaSequenceFile");
 JavaSparkContext ctx = new JavaSparkContext(sparkConf);
 JavaPairRDD seqFiles = ctx.sequenceFile(args[0],
 String.class, Byte.class) ;
 final List> tuple2s = seqFiles.collect();




 The moment I try to call collect() method to get the keys of sequence
 file,
 following exception has been thrown

 Can any one help me understanding why collect() method is failing? If I
 use
 toArray() on seqFiles object then also I am getting same call stack.

 Regards
 Tapan



 java.io.NotSerializableException: org.apache.hadoop.io.Text
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
 at
 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at

 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
 at

 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 2015-05-19 15:15:03,705 ERROR [task-result-getter-0]
 scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in
 stage
 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text;
 not
 retrying
 2015-05-19 15:15:03,731 INFO  [task-result-getter-0]
 scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed
 TaskSet
 0.0, whose tasks have all completed, from pool
 2015-05-19 15:15:03,739 INFO
 [sparkDriver-akka.actor.default-dispatcher-2]
 scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling
 stage 0
 2015-05-19 15:15:03,747 INFO  [main] scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Job 0 failed: collect at
 JavaSequenceFile.java:44, took 4.421397 s
 Exception in thread "main" org.apache.spark.SparkException: Job aborted
 due
 to stage failure: Task 0.0 in stage 0.0 (TID 0) had a

Re: How to run multiple jobs in one sparkcontext from separate threads in pyspark?

2015-05-20 Thread MEETHU MATHEW
Hi Davies,
Thank you for pointing to spark streaming. I am confused about how to
return the result after running a function via a thread. I tried using Queue to
add the results to it and print it at the end. But here, I can see the results
after all threads are finished. How to get the result of the function once a
thread is finished, rather than waiting for all other threads to finish?

Thanks & Regards,
Meethu M


 On Tuesday, 19 May 2015 2:43 AM, Davies Liu  wrote:
   

 SparkContext can be used in multiple threads (Spark streaming works
with multiple threads), for example:

import threading
import time

def show(x):
    time.sleep(1)
    print x

def job():
    sc.parallelize(range(100)).foreach(show)

threading.Thread(target=job).start()


On Mon, May 18, 2015 at 12:34 AM, ayan guha  wrote:
> Hi
>
> So to be clear, do you want to run one operation in multiple threads within
> a function or you want run multiple jobs using multiple threads? I am
> wondering why python thread module can't be used? Or you have already gave
> it a try?
>
> On 18 May 2015 16:39, "MEETHU MATHEW"  wrote:
>>
>> Hi Akhil,
>>
>> The python wrapper for Spark Job Server did not help me. I actually need
>> the pyspark code sample  which shows how  I can call a function from 2
>> threads and execute it simultaneously.
>>
>> Thanks & Regards,
>> Meethu M
>>
>>
>>
>> On Thursday, 14 May 2015 12:38 PM, Akhil Das 
>> wrote:
>>
>>
>> Did you happened to have a look at the spark job server? Someone wrote a
>> python wrapper around it, give it a try.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, May 14, 2015 at 11:10 AM, MEETHU MATHEW 
>> wrote:
>>
>> Hi all,
>>
>>  Quote
>>  "Inside a given Spark application (SparkContext instance), multiple
>> parallel jobs can run simultaneously if they were submitted from separate
>> threads. "
>>
>> How to run multiple jobs in one SPARKCONTEXT using separate threads in
>> pyspark? I found some examples in scala and java, but couldn't find python
>> code. Can anyone help me with a pyspark example?
>>
>> Thanks & Regards,
>> Meethu M
>>
>>
>>
>>
>




  

Initial job has not accepted any resources

2015-05-20 Thread podioss
Hi,
I am running Spark jobs with the standalone resource manager and I am gathering
several performance metrics from my cluster nodes. I am also gathering disk
I/O metrics from my nodes, and because many of my jobs use the same
dataset I am trying to prevent the operating system from caching the dataset
in memory on every node, so as to gather the correct metrics for every job.
Therefore, before I submit my jobs to Spark I clear my caches with the
commands:
sync ; echo 3 >/proc/sys/vm/drop_caches

The problem is that when I do so I see this error at the beginning of the
job:

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check
your cluster UI to ensure that workers are registered and have sufficient
memory

Ultimately the job runs successfully in most cases, but I feel like this
error has a significant effect on the overall execution time of the job,
which I try to avoid.
I am also pretty confident that there is nothing wrong in my configuration,
because when I run jobs without clearing my nodes' caches the above error
doesn't come up.
I would really appreciate it if anyone could help me with this error.

Thanks.   



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Initial-job-has-not-accepted-any-resources-tp22955.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: LATERAL VIEW explode issue

2015-05-20 Thread yana
Just a guess, but are you using HiveContext in one case vs SqlContext in another?
You don't show a stacktrace, but this looks like a parser error... which would make
me guess a different context or a different Spark version on the cluster you are
submitting to...


Sent on the new Sprint Network from my Samsung Galaxy S®4.

-------- Original message --------
From: kiran mavatoor
Date: 05/20/2015 5:57 AM (GMT-05:00)
To: User
Subject: LATERAL VIEW explode issue
Hi,

When I use "LATERAL VIEW explode" on the registered temp table in spark shell,
it works. But when I use the same in spark-submit (as a jar file) it is not
working. It's giving the error: "failure: ``union'' expected but identifier VIEW
found"

sql statement i am using is

SELECT id,mapKey FROM locations LATERAL VIEW 
explode(map_keys(jsonStringToMapUdf(countries))) countries AS mapKey

I registered "jsonStringToMapUdf" as my sql function.

Thanks
Kiran
9008099770 
 

java program got Stuck at broadcasting

2015-05-20 Thread allanjie
The variable I need to broadcast is just 468 MB.

When broadcasting, it just "stops" here:


15/05/20 11:36:14 INFO Configuration.deprecation: mapred.tip.id is
deprecated. Instead, use mapreduce.task.id 
15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.id is
deprecated. Instead, use mapreduce.task.attempt.id 
15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.is.map is
deprecated. Instead, use mapreduce.task.ismap 
15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.partition is
deprecated. Instead, use mapreduce.task.partition 
15/05/20 11:36:14 INFO Configuration.deprecation: mapred.job.id is
deprecated. Instead, use mapreduce.job.id 
15/05/20 11:36:14 INFO mapred.FileInputFormat: Total input paths to process
: 1 
15/05/20 11:36:14 INFO spark.SparkContext: Starting job: saveAsTextFile at
Test1.java:90 
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at
Test1.java:90) with 4 output partitions (allowLocal=false) 
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Final stage: Stage
0(saveAsTextFile at Test1.java:90) 
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Parents of final stage:
List() 
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Missing parents: List() 
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Submitting Stage 0
(MapPartitionsRDD[3] at saveAsTextFile at Test1.java:90), which has no
missing parents 
15/05/20 11:36:15 INFO storage.MemoryStore: ensureFreeSpace(129264) called
with curMem=988453294, maxMem=2061647216 
15/05/20 11:36:15 INFO storage.MemoryStore: Block broadcast_2 stored as
values in memory (estimated size 126.2 KB, free 1023.4 MB) 
15/05/20 11:36:15 INFO storage.MemoryStore: ensureFreeSpace(78190) called
with curMem=988582558, maxMem=2061647216 
15/05/20 11:36:15 INFO storage.MemoryStore: Block broadcast_2_piece0 stored
as bytes in memory (estimated size 76.4 KB, free 1023.3 MB) 
15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Master:44855 (size: 76.4 KB, free: 1492.4 MB) 
15/05/20 11:36:15 INFO storage.BlockManagerMaster: Updated info of block
broadcast_2_piece0 
15/05/20 11:36:15 INFO spark.SparkContext: Created broadcast 2 from
broadcast at DAGScheduler.scala:839 
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Submitting 4 missing tasks
from Stage 0 (MapPartitionsRDD[3] at saveAsTextFile at Test1.java:90) 
15/05/20 11:36:15 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with
4 tasks 
15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
0.0 (TID 0, HadoopV26Slave5, NODE_LOCAL, 1387 bytes) 
15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
0.0 (TID 1, HadoopV26Slave3, NODE_LOCAL, 1387 bytes) 
15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
0.0 (TID 2, HadoopV26Slave4, NODE_LOCAL, 1387 bytes) 
15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 3.0 in stage
0.0 (TID 3, HadoopV26Slave1, NODE_LOCAL, 1387 bytes) 
15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Slave5:45357 (size: 76.4 KB, free: 2.1 GB) 
15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Slave3:57821 (size: 76.4 KB, free: 2.1 GB)   
……. 
15/05/20 11:36:28 INFO storage.BlockManagerInfo: Added broadcast_1_piece1 in
memory on HadoopV26Slave5:45357 (size: 4.0 MB, free: 1646.3 MB) 

And it didn't go forward while I kept waiting; it didn't exactly stop, it was more like
stuck.

I have 6 workers/VMs: each of them has 8GB memory and 12GB disk storage.
After a few minutes passed, the program stopped and showed something like this:


15/05/20 11:42:45 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0
(TID 1, HadoopV26Slave3):
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
/user/output/_temporary/0/_temporary/attempt_201505201136__m_01_1/part-1
could only be replicated to 0 nodes instead of minReplication (=1).  There
are 6 datanode(s) running and no node(s) are excluded in this operation. 
at
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1549)
 
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3200)
 
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641)
 
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482)
 
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
 
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
 
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962) 
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039) 
at org.apache.hadoop.ip

Re: save column values of DataFrame to text file

2015-05-20 Thread allanjie
Sorry, but how does that work?
Can you give more detail?

On 20 May 2015 at 21:32, oubrik [via Apache Spark User List] <
ml-node+s1001560n2295...@n3.nabble.com> wrote:

> hi,
> try like this
>
> DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
> df.select("year", "model").save("newcars.csv",
> "com.databricks.spark.csv");
>
> for more information: https://github.com/databricks/spark-csv
>
> Regards
>
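
(A minimal Scala sketch of what the quoted suggestion boils down to on Spark 1.3,
assuming the spark-csv package is on the classpath; file and column names are
placeholders:)

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val options = Map("path" -> "cars.csv", "header" -> "true")

val df = sqlContext.load("com.databricks.spark.csv", options)
df.select("year", "model").save("newcars.csv", "com.databricks.spark.csv")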



-- 
PhD student,
Social Media Laboratory,
Department of Electronic & Computer Engineering,
The Hong Kong University of Science and Technology.
Website: http://www.allanjie.net




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/save-column-values-of-DataFrame-to-text-file-tp22718p22958.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Reading Binary files in Spark program

2015-05-20 Thread Akhil Das
Hi

Basically, you need to convert it to a serializable format before doing the
collect.

You can fire up a spark shell and paste this:

val sFile = sc.sequenceFile[LongWritable, Text]("/home/akhld/sequence/sigmoid")
  .map(_._2.toString)
sFile.take(5).foreach(println)


Use the attached sequence file generator and the generated sequence file that I
used for testing.

Also note: If you don't do the .map to convert to String, it will end
up with the serialization exception that you are hitting.
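
For the Text/BytesWritable layout described in the quoted mail below, a slightly
fuller Scala sketch of the same idea (the input path is a placeholder):

import java.util.Arrays
import org.apache.hadoop.io.{BytesWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}

object ReadImageSeqFile {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ReadImageSeqFile"))

    // Keys are file names (Text), values are raw image bytes (BytesWritable).
    // Convert both to serializable types *before* collect()/take().
    val images = sc.sequenceFile(args(0), classOf[Text], classOf[BytesWritable])
      .map { case (name, bytes) =>
        (name.toString, Arrays.copyOf(bytes.getBytes, bytes.getLength))
      }

    images.take(5).foreach { case (name, data) =>
      println(name + " -> " + data.length + " bytes")
    }
    sc.stop()
  }
}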


Thanks
Best Regards

On Wed, May 20, 2015 at 5:48 PM, Tapan Sharma 
wrote:

> I am not doing anything special.
>
>
> *Here is the code :*
>
>
> SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile");
> JavaSparkContext ctx = new JavaSparkContext(sparkConf);
> JavaPairRDD seqFiles = ctx.sequenceFile(args[0], String.class, 
> Byte.class) ;
>
> // Following statements is giving exception
>
> final List> tuple2s = seqFiles.toArray();
>
> // Or
>
> final List> tuple2s = seqFiles.collect();
>
>
> *And this is how I have created a sequence file:*
>
> http://stuartsierra.com/2008/04/24/a-million-little-files
>
>
> Regards
>
> Tapan
>
>
>
> On Wed, May 20, 2015 at 12:42 PM, Akhil Das 
> wrote:
>
>> If you can share the complete code and a sample file, may be i can try to
>> reproduce it on my end.
>>
>> Thanks
>> Best Regards
>>
>> On Wed, May 20, 2015 at 7:00 AM, Tapan Sharma 
>> wrote:
>>
>>> Problem is still there.
>>> Exception is not coming at the time of reading.
>>> Also the count of JavaPairRDD is as expected. It is when we are calling
>>> collect() or toArray() methods, the exception is coming.
>>> Something to do with Text class even though I haven't used it in the
>>> program.
>>>
>>> Regards
>>> Tapan
>>>
>>> On Tue, May 19, 2015 at 6:26 PM, Akhil Das 
>>> wrote:
>>>
 Try something like:

 JavaPairRDD output = sc.newAPIHadoopFile(inputDir,

 org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
 IntWritable.class,
   Text.class, new Job().getConfiguration());

 With the type of input format that you require.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma 
 wrote:

> Hi Team,
>
> I am new to Spark and learning.
> I am trying to read image files into spark job. This is how I am doing:
> Step 1. Created sequence files with FileName as Key and Binary image as
> value. i.e.  Text and BytesWritable.
> I am able to read these sequence files into Map Reduce programs.
>
> Step 2.
> I understand that Text and BytesWritable are Non Serializable
> therefore, I
> read the sequence file in Spark as following:
>
> SparkConf sparkConf = new
> SparkConf().setAppName("JavaSequenceFile");
> JavaSparkContext ctx = new JavaSparkContext(sparkConf);
> JavaPairRDD seqFiles = ctx.sequenceFile(args[0],
> String.class, Byte.class) ;
> final List> tuple2s = seqFiles.collect();
>
>
>
>
> The moment I try to call collect() method to get the keys of sequence
> file,
> following exception has been thrown
>
> Can any one help me understanding why collect() method is failing? If
> I use
> toArray() on seqFiles object then also I am getting same call stack.
>
> Regards
> Tapan
>
>
>
> java.io.NotSerializableException: org.apache.hadoop.io.Text
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> at
>
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> at
>
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at
> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> at
>
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
> at
>
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 2015-05-19 15:15:03,705 ERROR [task-result-getter-0]
> scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in
> stag

Re: How to use spark to access HBase with Security enabled

2015-05-20 Thread Bill Q
I have a similar problem: I cannot pass the HBase configuration file as
an extra classpath entry to Spark any more using
spark.executor.extraClassPath=MY_HBASE_CONF_DIR in Spark 1.3. We used
to run this in 1.2 without any problem.

On Tuesday, May 19, 2015, donhoff_h <165612...@qq.com> wrote:

>
> Sorry, this ref does not help me.  I have set up the configuration in
> hbase-site.xml. But it seems there are still some extra configurations to
> be set or APIs to be called to make my spark program be able to pass the
> authentication with the HBase.
>
> Does anybody know how to set authentication to a secured HBase in a spark
> program which use the API "newAPIHadoopRDD" to get information from HBase?
>
> Many Thanks!
>
> ------ Original Message ------
> *From:* "yuzhihong"; >;
> *Sent:* Tuesday, May 19, 2015, 9:54 PM
> *To:* "donhoff_h"<165612...@qq.com
> >;
> *Cc:* "user" >;
> *Subject:* Re: How to use spark to access HBase with Security enabled
>
> Please take a look at:
>
> http://hbase.apache.org/book.html#_client_side_configuration_for_secure_operation
>
> Cheers
>
> On Tue, May 19, 2015 at 5:23 AM, donhoff_h <165612...@qq.com
> > wrote:
>
>>
>> The principal is sp...@bgdt.dev.hrb. It is the user that I used to run
>> my spark programs. I am sure I have run the kinit command to make it take
>> effect. And I also used the HBase Shell to verify that this user has the
>> right to scan and put the tables in HBase.
>>
>> Now I still have no idea how to solve this problem. Can anybody help me
>> to figure it out? Many Thanks!
>>
>> ------ Original Message ------
>> *From:* "yuzhihong";> >;
>> *Sent:* Tuesday, May 19, 2015, 7:55 PM
>> *To:* "donhoff_h"<165612...@qq.com
>> >;
>> *Cc:* "user"> >;
>> *Subject:* Re: How to use spark to access HBase with Security enabled
>>
>> Which user did you run your program as ?
>>
>> Have you granted proper permission on hbase side ?
>>
>> You should also check master log to see if there was some clue.
>>
>> Cheers
>>
>>
>>
>> On May 19, 2015, at 2:41 AM, donhoff_h <165612...@qq.com
>> > wrote:
>>
>> Hi, experts.
>>
>> I ran the "HBaseTest" program which is an example from the Apache Spark
>> source code to learn how to use spark to access HBase. But I met the
>> following exception:
>> Exception in thread "main"
>> org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after
>> attempts=36, exceptions:
>> Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException:
>> callTimeout=6, callDuration=68648: row 'spark_t01,,00' on
>> table 'hbase:meta' at region=hbase:meta,,1.1588230740,
>> hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0
>>
>> I also checked the RegionServer Log of the host "bgdt01.dev.hrb" listed
>> in the above exception. I found a few entries like the following one:
>> 2015-05-19 16:59:11,143 DEBUG
>> [RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer:
>> RpcServer.listener,port=16020: Caught exception while
>> reading:Authentication is required
>>
>> The above entry did not point to my program clearly. But the time is very
>> near. Since my hbase version is HBase1.0.0 and I set security enabled, I
>> doubt the exception was caused by the Kerberos authentication.  But I am
>> not sure.
>>
>> Do anybody know if my guess is right? And if I am right, could anybody
>> tell me how to set Kerberos Authentication in a spark program? I don't know
>> how to do it. I already checked the API doc , but did not found any API
>> useful. Many Thanks!
>>
>> By the way, my spark version is 1.3.0. I also paste the code of
>> "HBaseTest" in the following:
>> ***Source Code**
>> object HBaseTest {
>>   def main(args: Array[String]) {
>> val sparkConf = new SparkConf().setAppName("HBaseTest")
>> val sc = new SparkContext(sparkConf)
>> val conf = HBaseConfiguration.create()
>> conf.set(TableInputFormat.INPUT_TABLE, args(0))
>>
>> // Initialize hBase table if necessary
>> val admin = new HBaseAdmin(conf)
>> if (!admin.isTableAvailable(args(0))) {
>>   val tableDesc = new HTableDescriptor(args(0))
>>   admin.createTable(tableDesc)
>> }
>>
>> val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
>>   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>>   classOf[org.apache.hadoop.hbase.client.Result])
>>
>> hBaseRDD.count()
>>
>> sc.stop()
>>   }
>> }
>>
>>
>

-- 
Many thanks.


Bill
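
(For the Kerberos question quoted above, a hedged driver-side sketch in Scala; the
principal and keytab path are placeholders, and shipping HBase delegation tokens to
the executors is not covered here:)

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.security.UserGroupInformation

// hbase-site.xml (with hbase.security.authentication=kerberos) must be on the classpath.
val hbConf = HBaseConfiguration.create()
UserGroupInformation.setConfiguration(hbConf)

// Placeholder principal/keytab; log in before creating the HBase-backed RDD.
UserGroupInformation.loginUserFromKeytab("user@YOUR.REALM", "/path/to/user.keytab")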


Re: Intermittent difficulties for Worker to contact Master on same machine in standalone

2015-05-20 Thread Yana Kadiyska
But if I'm reading his email correctly he's saying that:

1. The master and slave are on the same box (so network hiccups are
unlikely culprit)
2. The failures are intermittent -- i.e program works for a while then
worker gets disassociated...

Is it possible that the master restarted? We used to have problems like
this where we'd restart the master process, it won't be listening on 7077
for some time, but the worker process is trying to connect and by the time
the master is up the worker has given up...


On Wed, May 20, 2015 at 5:16 AM, Evo Eftimov  wrote:

> Check whether the name can be resolved in the /etc/hosts file (or DNS) of
> the worker
>
>
>
> (the same btw applies for the Node where you run the driver app – all
> other nodes must be able to resolve its name)
>
>
>
> *From:* Stephen Boesch [mailto:java...@gmail.com]
> *Sent:* Wednesday, May 20, 2015 10:07 AM
> *To:* user
> *Subject:* Intermittent difficulties for Worker to contact Master on same
> machine in standalone
>
>
>
>
>
> What conditions would cause the following delays / failure for a
> standalone machine/cluster to have the Worker contact the Master?
>
>
>
> 15/05/20 02:02:53 INFO WorkerWebUI: Started WorkerWebUI at
> http://10.0.0.3:8081
>
> 15/05/20 02:02:53 INFO Worker: Connecting to master
> akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...
>
> 15/05/20 02:02:53 WARN Remoting: Tried to associate with unreachable
> remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is
> now gated for 5000 ms, all messages to this address will be delivered to
> dead letters. Reason: Connection refused: mellyrn.local/10.0.0.3:7077
>
> 15/05/20 02:03:04 INFO Worker: Retrying connection to master (attempt # 1)
>
> ..
>
> ..
>
> 15/05/20 02:03:26 INFO Worker: Retrying connection to master (attempt # 3)
>
> 15/05/20 02:03:26 INFO Worker: Connecting to master
> akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...
>
> 15/05/20 02:03:26 WARN Remoting: Tried to associate with unreachable
> remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is
> now gated for 5000 ms, all messages to this address will be delivered to
> dead letters. Reason: Connection refused: mellyrn.local/10.0.0.3:7077
>


Re: Mesos Spark Tasks - Lost

2015-05-20 Thread Panagiotis Garefalakis
Tim thanks for your reply,

I am following this quite clear mesos-spark tutorial:
https://docs.mesosphere.com/tutorials/run-spark-on-mesos/
So mainly I tried running spark-shell which locally works fine but when the
jobs are submitted through mesos something goes wrong!

My question is: is there some extra configuration needed for the workers
(that is not mentioned in the tutorial)?

The Executor Lost message I get is really generic, so I don't know what's
going on.
Please check the attached mesos execution event log.

Thanks again,
Panagiotis


On Wed, May 20, 2015 at 8:21 AM, Tim Chen  wrote:

> Can you share your exact spark-submit command line?
>
> And also cluster mode is not yet released yet (1.4) and doesn't support
> spark-shell, so I think you're just using client mode unless you're using
> latest master.
>
> Tim
>
> On Tue, May 19, 2015 at 8:57 AM, Panagiotis Garefalakis <
> panga...@gmail.com> wrote:
>
>> Hello all,
>>
>> I am facing a weird issue for the last couple of days running Spark on
>> top of Mesos and I need your help. I am running Mesos in a private cluster
>> and managed to deploy successfully  hdfs, cassandra, marathon and play but
>> Spark is not working for a reason. I have tried so far:
>> different java versions (1.6 and 1.7 oracle and openjdk), different
>> spark-env configuration, different Spark versions (from 0.8.8 to 1.3.1),
>> different HDFS versions (hadoop 5.1 and 4.6), and updating pom dependencies.
>>
>> More specifically while local tasks complete fine, in cluster mode all
>> the tasks get lost.
>> (both using spark-shell and spark-submit)
>> From the worker log I see something like this:
>>
>> ---
>> I0519 02:36:30.475064 12863 fetcher.cpp:214] Fetching URI
>> 'hdfs:/:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz'
>> I0519 02:36:30.747372 12863 fetcher.cpp:99] Fetching URI
>> 'hdfs://X:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' using Hadoop
>> Client
>> I0519 02:36:30.747546 12863 fetcher.cpp:109] Downloading resource from
>> 'hdfs://:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' to
>> '/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz'
>> I0519 02:36:34.205878 12863 fetcher.cpp:78] Extracted resource
>> '/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz'
>> into
>> '/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3'
>> *Error: Could not find or load main class two*
>>
>> ---
>>
>> And from the Spark Terminal:
>>
>> ---
>> 15/05/19 02:36:39 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
>> 15/05/19 02:36:39 INFO scheduler.TaskSchedulerImpl: Stage 0 was cancelled
>> 15/05/19 02:36:39 INFO scheduler.DAGScheduler: Failed to run reduce at
>> SparkPi.scala:35
>> 15/05/19 02:36:39 INFO scheduler.DAGScheduler: Failed to run reduce at
>> SparkPi.scala:35
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent
>> failure: Lost task 7.3 in stage 0.0 (TID 26, ): ExecutorLostFailure
>> (executor lost)
>> Driver stacktrace: at
>> org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)atorg.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> ..
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> ---
>>
>> Any help will be greatly appreciated!
>>
>> Regards,
>> Panagiotis
>>
>
>


-sparklogs-spark-shell-1431993674182-EVENT_LOG_1
Description: Binary data


Re: Wish for 1.4: upper bound on # tasks in Mesos

2015-05-20 Thread Nicholas Chammas
To put this on the devs' radar, I suggest creating a JIRA for it (and
checking first if one already exists).

issues.apache.org/jira/

Nick

On Tue, May 19, 2015 at 1:34 PM Matei Zaharia 
wrote:

> Yeah, this definitely seems useful there. There might also be some ways to
> cap the application in Mesos, but I'm not sure.
>
> Matei
>
> On May 19, 2015, at 1:11 PM, Thomas Dudziak  wrote:
>
> I'm using fine-grained for a multi-tenant environment which is why I would
> welcome the limit of tasks per job :)
>
> cheers,
> Tom
>
> On Tue, May 19, 2015 at 10:05 AM, Matei Zaharia 
> wrote:
>
>> Hey Tom,
>>
>> Are you using the fine-grained or coarse-grained scheduler? For the
>> coarse-grained scheduler, there is a spark.cores.max config setting that
>> will limit the total # of cores it grabs. This was there in earlier
>> versions too.
>>
>> Matei
>>
>> > On May 19, 2015, at 12:39 PM, Thomas Dudziak  wrote:
>> >
>> > I read the other day that there will be a fair number of improvements
>> in 1.4 for Mesos. Could I ask for one more (if it isn't already in there):
>> a configurable limit for the number of tasks for jobs run on Mesos ? This
>> would be a very simple yet effective way to prevent a job dominating the
>> cluster.
>> >
>> > cheers,
>> > Tom
>> >
>>
>>
>
>


Re: Incrementally add/remove vertices in GraphX

2015-05-20 Thread vzaychik
Any updates on GraphX Streaming? There was mention of this about a year ago,
but nothing much since.
Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Incrementally-add-remove-vertices-in-GraphX-tp2227p22963.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to set HBaseConfiguration in Spark

2015-05-20 Thread Naveen Madhire
Cloudera blog has some details.

Please check if this is helpful to you.

http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/

Thanks.

On Wed, May 20, 2015 at 4:21 AM, donhoff_h <165612...@qq.com> wrote:

> Hi, all
>
> I wrote a program to get HBaseConfiguration object in Spark. But after I
> printed the content of this hbase-conf object, I found they were wrong. For
> example, the property "hbase.zookeeper.quorum" should be
> "bgdt01.dev.hrb,bgdt02.dev.hrb,bgdt03.hrb". But the printed value is
> "localhost".
>
> Could anybody tell me how to set up the HBase Configuration in Spark? No
> matter it should be set in a configuration file or be set by a Spark API.
> Many Thanks!
>
> The code of my program is listed below:
> object TestHBaseConf {
>  def main(args: Array[String]) {
>val conf = new SparkConf()
>val sc = new SparkContext(conf)
>val hbConf = HBaseConfiguration.create()
>hbConf.addResource("""file:///etc/hbase/conf/hbase-site.xml""")
>val it = hbConf.iterator()
>while(it.hasNext) {
>  val e = it.next()
>  println("Key="+ e.getKey +" Value="+e.getValue)
>}
>
>val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9))
>val result = rdd.sum()
>println("result="+result)
>sc.stop()
>  }
> }
>
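
(For the configuration question quoted above, one hedged alternative sketch: set the
key properties explicitly instead of relying on addResource; the quorum hosts and
port below are placeholders:)

import org.apache.hadoop.hbase.HBaseConfiguration

val hbConf = HBaseConfiguration.create()
hbConf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com")
hbConf.set("hbase.zookeeper.property.clientPort", "2181")

// Alternatively, keep addResource but make sure hbase-site.xml is also visible to the
// executors, e.g. by adding the HBase conf directory to spark.executor.extraClassPath.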


Re: How to run multiple jobs in one sparkcontext from separate threads in pyspark?

2015-05-20 Thread Davies Liu
I think this is a general multi-threading question; Queue is the
right direction to go.

Have you tried something like this?

import threading
import Queue

results = Queue.Queue()

def run_job(f, args):
    r = f(*args)
    results.put(r)

# start multiple threads to run jobs
threading.Thread(target=run_job, args=(f, args,)).start()

# print each result as soon as the thread that produced it finishes
while True:
    r = results.get()
    print r


On Wed, May 20, 2015 at 5:56 AM, MEETHU MATHEW  wrote:
> Hi Davies,
> Thank you for pointing to spark streaming.
> I am confused about how to return the result after running a function via  a
> thread.
> I tried using Queue to add the results to it and print it at the end.But
> here, I can see the results after all threads are finished.
> How to get the result of the function once a thread is finished, rather than
> waiting for all other threads to finish?
>
> Thanks & Regards,
> Meethu M
>
>
>
> On Tuesday, 19 May 2015 2:43 AM, Davies Liu  wrote:
>
>
> SparkContext can be used in multiple threads (Spark streaming works
> with multiple threads), for example:
>
> import threading
> import time
>
> def show(x):
> time.sleep(1)
> print x
>
> def job():
> sc.parallelize(range(100)).foreach(show)
>
> threading.Thread(target=job).start()
>
>
> On Mon, May 18, 2015 at 12:34 AM, ayan guha  wrote:
>> Hi
>>
>> So to be clear, do you want to run one operation in multiple threads
>> within
>> a function or you want run multiple jobs using multiple threads? I am
>> wondering why python thread module can't be used? Or you have already gave
>> it a try?
>>
>> On 18 May 2015 16:39, "MEETHU MATHEW"  wrote:
>>>
>>> Hi Akhil,
>>>
>>> The python wrapper for Spark Job Server did not help me. I actually need
>>> the pyspark code sample  which shows how  I can call a function from 2
>>> threads and execute it simultaneously.
>>>
>>> Thanks & Regards,
>>> Meethu M
>>>
>>>
>>>
>>> On Thursday, 14 May 2015 12:38 PM, Akhil Das 
>>> wrote:
>>>
>>>
>>> Did you happened to have a look at the spark job server? Someone wrote a
>>> python wrapper around it, give it a try.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Thu, May 14, 2015 at 11:10 AM, MEETHU MATHEW 
>>> wrote:
>>>
>>> Hi all,
>>>
>>>  Quote
>>>  "Inside a given Spark application (SparkContext instance), multiple
>>> parallel jobs can run simultaneously if they were submitted from separate
>>> threads. "
>>>
>>> How to run multiple jobs in one SPARKCONTEXT using separate threads in
>>> pyspark? I found some examples in scala and java, but couldn't find
>>> python
>>> code. Can anyone help me with a pyspark example?
>>>
>>> Thanks & Regards,
>>> Meethu M
>
>>>
>>>
>>>
>>>
>>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>




Re: LATERAL VIEW explode issue

2015-05-20 Thread kiran mavatoor
Hi Yana,
I was using sqlContext in the program by creating new SQLContext(sc). This
created the problem when I submit the job using spark-submit. Whereas, when I
run the same program in spark-shell, the default context is a HiveContext (it
seems) and everything seems to be fine. This created the confusion.
As a solution, I called new HiveContext(sc) instead of SQLContext.
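
(Roughly what this boils down to, as a minimal sketch; the UDF body and the
registration of the "locations" table are placeholders:)

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Placeholder implementation; the real jsonStringToMapUdf parses a JSON string.
hiveContext.udf.register("jsonStringToMapUdf",
  (json: String) => Map("placeholder" -> json))

// The "locations" DataFrame must be registered against this same context, e.g.
// locationsDF.registerTempTable("locations")

val keys = hiveContext.sql(
  "SELECT id, mapKey FROM locations " +
  "LATERAL VIEW explode(map_keys(jsonStringToMapUdf(countries))) countries AS mapKey")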
cheers,
kiran.


 On Wednesday, May 20, 2015 6:38 PM, yana  wrote:
   

 Just a guess but are you using HiveContext in one case vs SqlContext 
inanother? You dont show a stacktrace but this looks like parser error...Which 
would make me guess different  context or different spark versio on the cluster 
you are submitting to...

Sent on the new Sprint Network from my Samsung Galaxy S®4.

 Original message From: kiran mavatoor Date:05/20/2015 5:57 AM 
(GMT-05:00) To: User Subject: LATERAL VIEW explode issue 
Hi,
When I use "LATERAL VIEW explode" on the registered temp table in spark shell, 
it works.  But when I use the same in spark-submit (as jar file) it is not 
working. its giving error -  "failure: ``union'' expected but identifier VIEW 
found"
sql statement i am using is
SELECT id,mapKey FROM locations LATERAL VIEW 
explode(map_keys(jsonStringToMapUdf(countries))) countries AS mapKey
I registered "jsonStringToMapUdf" as my sql function.
ThanksKiran9008099770  

  

Re: java program Get Stuck at broadcasting

2015-05-20 Thread Akhil Das
This is more like an issue with your HDFS setup, can you check in the
datanode logs? Also try putting a new file in HDFS and see if that works.

Thanks
Best Regards

On Wed, May 20, 2015 at 11:47 AM, allanjie  wrote:

> ​Hi All,
> The variable I need to broadcast is just 468 MB.
>
>
> When broadcasting, it just “stop” at here:
>
> *
> 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.tip.id is
> deprecated. Instead, use mapreduce.task.id
> 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.id is
> deprecated. Instead, use mapreduce.task.attempt.id
> 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.is.map is
> deprecated. Instead, use mapreduce.task.ismap
> 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.partition is
> deprecated. Instead, use mapreduce.task.partition
> 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.job.id is
> deprecated. Instead, use mapreduce.job.id
> 15/05/20 11:36:14 INFO mapred.FileInputFormat: Total input paths to process
> : 1
> 15/05/20 11:36:14 INFO spark.SparkContext: Starting job: saveAsTextFile at
> Test1.java:90
> 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at
> Test1.java:90) with 4 output partitions (allowLocal=false)
> 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Final stage: Stage
> 0(saveAsTextFile at Test1.java:90)
> 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Parents of final stage:
> List()
> 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Missing parents: List()
> 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Submitting Stage 0
> (MapPartitionsRDD[3] at saveAsTextFile at Test1.java:90), which has no
> missing parents
> 15/05/20 11:36:15 INFO storage.MemoryStore: ensureFreeSpace(129264) called
> with curMem=988453294, maxMem=2061647216
> 15/05/20 11:36:15 INFO storage.MemoryStore: Block broadcast_2 stored as
> values in memory (estimated size 126.2 KB, free 1023.4 MB)
> 15/05/20 11:36:15 INFO storage.MemoryStore: ensureFreeSpace(78190) called
> with curMem=988582558, maxMem=2061647216
> 15/05/20 11:36:15 INFO storage.MemoryStore: Block broadcast_2_piece0 stored
> as bytes in memory (estimated size 76.4 KB, free 1023.3 MB)
> 15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0
> in
> memory on HadoopV26Master:44855 (size: 76.4 KB, free: 1492.4 MB)
> 15/05/20 11:36:15 INFO storage.BlockManagerMaster: Updated info of block
> broadcast_2_piece0
> 15/05/20 11:36:15 INFO spark.SparkContext: Created broadcast 2 from
> broadcast at DAGScheduler.scala:839
> 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Submitting 4 missing tasks
> from Stage 0 (MapPartitionsRDD[3] at saveAsTextFile at Test1.java:90)
> 15/05/20 11:36:15 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0
> with
> 4 tasks
> 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
> 0.0 (TID 0, HadoopV26Slave5, NODE_LOCAL, 1387 bytes)
> 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
> 0.0 (TID 1, HadoopV26Slave3, NODE_LOCAL, 1387 bytes)
> 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
> 0.0 (TID 2, HadoopV26Slave4, NODE_LOCAL, 1387 bytes)
> 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 3.0 in stage
> 0.0 (TID 3, HadoopV26Slave1, NODE_LOCAL, 1387 bytes)
> 15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0
> in
> memory on HadoopV26Slave5:45357 (size: 76.4 KB, free: 2.1 GB)
> 15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0
> in
> memory on HadoopV26Slave3:57821 (size: 76.4 KB, free: 2.1 GB)
> …….
> 15/05/20 11:36:28 INFO storage.BlockManagerInfo: Added broadcast_1_piece1
> in
> memory on HadoopV26Slave5:45357 (size: 4.0 MB, free: 1646.3 MB)
> *
>
> And didn’t go forward as I still waiting, basically not stop, but more like
> stuck.
>
> I have 6 workers/VMs: each of them has 8GB memory and 12GB disk storage.
> After a few mins pass, the program stopped and showed something like this:
>
>
> 15/05/20 11:42:45 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0
> (TID 1, HadoopV26Slave3):
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
>
> /user/output/_temporary/0/_temporary/attempt_201505201136__m_01_1/part-1
> could only be replicated to 0 nodes instead of minReplication (=1).  There
> are 6 datanode(s) running and no node(s) are excluded in this operation.
> at
>
> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1549)
> at
>
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3200)
> at
>
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641)
> at
>
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482)
> at
>
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolP

Re: Is this a good use case for Spark?

2015-05-20 Thread Davies Liu
Spark is a great framework for doing things in parallel across multiple machines,
and it will be really helpful for your case.

Once you can wrap your entire pipeline into a single Python function:

def process_document(path, text):
 # you can call other tools or services here
 return xxx

then you can process all the documents in parallel as easy as:

sc.wholeTextFiles("path/to/documents").map(lambda (k, v):
process_document(k, v)).saveAsXXX("path/in/s3")

On Wed, May 20, 2015 at 12:38 AM, jakeheller  wrote:
> Hi all, I'm new to Spark -- so new that we're deciding whether to use it in
> the first place, and I was hoping someone here could help me figure that
> out.
>
> We're doing a lot of processing of legal documents -- in particular, the
> entire corpus of American law. It's about 10m documents, many of which are
> quite large as far as text goes (100s of pages).
>
> We'd like to
> (a) transform these documents from the various (often borked) formats they
> come to us in into a standard XML format,
> (b) when it is in a standard format, extract information from them (e.g.,
> which judicial cases cite each other?) and annotate the documents with the
> information extracted, and then
> (c) deliver the end result to a repository (like s3) where it can be
> accessed by the user-facing application.
>
> Of course, we'd also like to do all of this quickly -- optimally, running
> the entire database through the whole pipeline in a few hours.
>
> We currently use a mix of Python and Java scripts (including XSLT, and
> NLP/unstructured data tools like UIMA and Stanford's CoreNLP) in various
> places along the pipeline we built for ourselves to handle these tasks. The
> current pipeline infrastructure was built a while back -- it's basically a
> number of HTTP servers that each have a single task and pass the document
> along from server to server as it goes through the processing pipeline. It's
> great although it's having trouble scaling, and there are some reliability
> issues. It's also a headache to handle all the infrastructure. For what it's
> worth, metadata about the documents resides in SQL, and the actual text of
> the documents lives in s3.
>
> It seems like Spark would be ideal for this, but after some searching I
> wasn't able to find too many examples of people using it for
> document-processing tasks (like transforming documents from one XML format
> into another) and I'm not clear if I can chain those sorts of tasks and NLP
> tasks, especially if some happen in Python and others in Java. Finally, I
> don't know if the size of the data (i.e., we'll likely want to run
> operations on whole documents, rather than just lines) imposes
> issues/constraints.
>
> Thanks all!
> Jake
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-this-a-good-use-case-for-Spark-tp22954.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>




Re: Spark users

2015-05-20 Thread Akhil Das
Yes, this is the user group. Feel free to ask your questions in this list.

Thanks
Best Regards

On Wed, May 20, 2015 at 5:58 AM, Ricardo Goncalves da Silva <
ricardog.si...@telefonica.com> wrote:

>  Hi
> I'm learning spark focused on data and machine learning. Migrating from
> SAS.
>
> Is there a group for it? My questions are basic for now and I am getting very
> few answers.
>
> Tal
>
> Rick.
>
>
>
> Enviado do meu smartphone Samsung Galaxy.
>
> --
>
>
> The information contained in this transmission is privileged and
> confidential information intended only for the use of the individual or
> entity named above. If the reader of this message is not the intended
> recipient, you are hereby notified that any dissemination, distribution or
> copying of this communication is strictly prohibited. If you have received
> this transmission in error, do not read it. Please immediately reply to the
> sender that you have received this communication in error and then delete
> it.
>
>


IPv6 support

2015-05-20 Thread Kevin Liu
Hello, I have to work with IPv6-only servers and when I installed the
1.3.1 hadoop 2.6 build, I couldn't get the example to run due to IPv6
issues (errors below). I tried to add the
-Djava.net.preferIPv6Addresses=true setting but it still doesn't work. A
search on Spark's support for IPv6 is inconclusive. Can someone help
clarify the current status for IPv6?

Thanks
Kevin


-- errors --

5/05/20 10:17:30 INFO Executor: Fetching
http://2401:db00:2030:709b:face:0:9:0:51453/jars/spark-examples-1.3.1-hadoo
p2.6.0.jar with timestamp 1432142250197
15/05/20 10:17:30 INFO Executor: Fetching
http://2401:db00:2030:709b:face:0:9:0:51453/jars/spark-examples-1.3.1-hadoo
p2.6.0.jar with timestamp 1432142250197
15/05/20 10:17:30 ERROR Executor: Exception in task 5.0 in stage 0.0 (TID
5)
java.net.MalformedURLException: For input string:
"db00:2030:709b:face:0:9:0:51453"
at java.net.URL.(URL.java:620)
at java.net.URL.(URL.java:483)
at java.net.URL.(URL.java:432)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:603)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:431)
at 
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Execu
tor$$updateDependencies$5.apply(Executor.scala:374)
at 
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Execu
tor$$updateDependencies$5.apply(Executor.scala:366)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(Traver
sableLike.scala:772)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:7
71)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$upda
teDependencies(Executor.scala:366)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1
142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:
617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NumberFormatException: For input string:
"db00:2030:709b:face:0:9:0:51453"
at 
java.lang.NumberFormatException.forInputString(NumberFormatException.java:6
5)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at java.net.URLStreamHandler.parseURL(URLStreamHandler.java:216)
at java.net.URL.(URL.java:615)
... 18 more









Fwd: Re: spark 1.3.1 jars in repo1.maven.org

2015-05-20 Thread Edward Sargisson
Hi Sean and Ted,
Thanks for your replies.

I don't have our current problems nicely written up as good questions yet.
I'm still sorting out classpath issues, etc.
In case it is of help, I'm seeing:
* "Exception in thread "Spark Context Cleaner"
java.lang.NoClassDefFoundError: 0
at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149)"
* We've been having clashing dependencies between a colleague and me because
of the aforementioned classpath issue
* The clashing dependencies are also causing issues with what jetty
libraries are available in the classloader from Spark and don't clash with
existing libraries we have.

More anon,

Cheers,
Edward



-------- Original Message --------
Subject: Re: spark 1.3.1 jars in repo1.maven.org
Date: 2015-05-20 00:38
From: Sean Owen
To: Edward Sargisson
Cc: user


Yes, the published artifacts can only refer to one version of anything
(OK, modulo publishing a large number of variants under classifiers).

You aren't intended to rely on Spark's transitive dependencies for
anything. Compiling against the Spark API has no relation to what
version of Hadoop it binds against because it's not part of any API.
You mark the Spark dependency even as "provided" in your build and get
all the Spark/Hadoop bindings at runtime from our cluster.

What problem are you experiencing?
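
(For illustration, this is the kind of build declaration meant by "provided" above;
the sbt syntax and version are an assumption, not taken from the original thread:)

// build.sbt -- mark Spark as "provided" so the cluster supplies it at runtime
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"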


On Wed, May 20, 2015 at 2:17 AM, Edward Sargisson  wrote:

Hi,
I'd like to confirm an observation I've just made. Specifically that spark
is only available in repo1.maven.org for one Hadoop variant.

The Spark source can be compiled against a number of different Hadoops using
profiles. Yay.
However, the spark jars in repo1.maven.org appear to be compiled against one
specific Hadoop and no other differentiation is made. (I can see a
difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in
the version I compiled locally).

The implication here is that if you have a pom file asking for
spark-core_2.10 version 1.3.1 then Maven will only give you an Hadoop 2
version. Maven assumes that non-snapshot artifacts never change so trying to
load an Hadoop 1 version will end in tears.

This then means that if you compile code against spark-core then there will
probably be classpath NoClassDefFound issues unless the Hadoop 2 version is
exactly the one you want.

Have I gotten this correct?

It happens that our little app is using a Spark context directly from a
Jetty webapp and the classpath differences were/are causing some confusion.
We are currently installing a Hadoop 1 spark master and worker.

Thanks a lot!
Edward


Re: Re: spark 1.3.1 jars in repo1.maven.org

2015-05-20 Thread Sean Owen
I don't think any of those problems are related to Hadoop. Have you looked
at userClassPathFirst settings?

On Wed, May 20, 2015 at 6:46 PM, Edward Sargisson  wrote:

> Hi Sean and Ted,
> Thanks for your replies.
>
> I don't have our current problems nicely written up as good questions yet.
> I'm still sorting out classpath issues, etc.
> In case it is of help, I'm seeing:
> * "Exception in thread "Spark Context Cleaner"
> java.lang.NoClassDefFoundError: 0
> at
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149)"
> * We've been having clashing dependencies between a colleague and I
> because of the aforementioned classpath issue
> * The clashing dependencies are also causing issues with what jetty
> libraries are available in the classloader from Spark and don't clash with
> existing libraries we have.
>
> More anon,
>
> Cheers,
> Edward
>
>
>
>  Original Message 
>  Subject: Re: spark 1.3.1 jars in repo1.maven.org Date: 2015-05-20 00:38
> From: Sean Owen  To: Edward Sargisson <
> esa...@pobox.com> Cc: user 
>
>
> Yes, the published artifacts can only refer to one version of anything
> (OK, modulo publishing a large number of variants under classifiers).
>
> You aren't intended to rely on Spark's transitive dependencies for
> anything. Compiling against the Spark API has no relation to what
> version of Hadoop it binds against because it's not part of any API.
> You mark the Spark dependency even as "provided" in your build and get
> all the Spark/Hadoop bindings at runtime from our cluster.
>
> What problem are you experiencing?
>
>
> On Wed, May 20, 2015 at 2:17 AM, Edward Sargisson 
> wrote:
>
> Hi,
> I'd like to confirm an observation I've just made. Specifically that spark
> is only available in repo1.maven.org for one Hadoop variant.
>
> The Spark source can be compiled against a number of different Hadoops
> using
> profiles. Yay.
> However, the spark jars in repo1.maven.org appear to be compiled against
> one
> specific Hadoop and no other differentiation is made. (I can see a
> difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in
> the version I compiled locally).
>
> The implication here is that if you have a pom file asking for
> spark-core_2.10 version 1.3.1 then Maven will only give you an Hadoop 2
> version. Maven assumes that non-snapshot artifacts never change so trying
> to
> load an Hadoop 1 version will end in tears.
>
> This then means that if you compile code against spark-core then there will
> probably be classpath NoClassDefFound issues unless the Hadoop 2 version is
> exactly the one you want.
>
> Have I gotten this correct?
>
> It happens that our little app is using a Spark context directly from a
> Jetty webapp and the classpath differences were/are causing some confusion.
> We are currently installing a Hadoop 1 spark master and worker.
>
> Thanks a lot!
> Edward
>
>
>
>


Re: User Defined Type (UDT)

2015-05-20 Thread Justin Uang
Xiangrui, is there a timeline for when UDTs will become a public API? I'm
currently using them to support java 8's ZonedDateTime.

On Tue, May 19, 2015 at 3:14 PM Xiangrui Meng  wrote:

> (Note that UDT is not a public API yet.)
>
> On Thu, May 7, 2015 at 7:11 AM, wjur  wrote:
> > Hi all!
> >
> > I'm using Spark 1.3.0 and I'm struggling with a definition of a new type for
> > a project I'm working on. I've created a case class Person(name: String) and
> > now I'm trying to make Spark able to serialize and deserialize the
> > defined type. I made a couple of attempts but none of them worked 100%
> > (there were issues either in serialization or deserialization).
> >
> > This is my class and the corresponding UDT.
> >
> > @SQLUserDefinedType(udt = classOf[PersonUDT])
> > case class Person(name: String)
> >
> > class PersonUDT extends UserDefinedType[Person] {
> >   override def sqlType: DataType = StructType(Seq(StructField("name",
> > StringType)))
> >
> >   override def serialize(obj: Any): Seq[Any] = {
>
> This should return a Row instance instead of Seq[Any], because the
> sqlType is a struct type.
>
> > obj match {
> >   case c: Person =>
> > Seq(c.name)
> > }
> >   }
> >
> >   override def userClass: Class[Person] = classOf[Person]
> >
> >   override def deserialize(datum: Any): Person = {
> > datum match {
> >   case values: Seq[_] =>
> > assert(values.length == 1)
> > Person(values.head.asInstanceOf[String])
> >   case values: util.ArrayList[_] =>
> > Person(values.get(0).asInstanceOf[String])
> > }
> >   }
> >
> >   // In some other attempt I was creating RDD of Seq with manually
> > serialized data and
> >   // I had to override equals because two DFs with the same type weren't
> > actually equal
> >   // StructField(person,...types.PersonUDT@a096ac3)
> >   // StructField(person,...types.PersonUDT@613fd937)
> >   def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT]
> >
> >   override def equals(other: Any): Boolean = other match {
> > case that: PersonUDT => true
> > case _ => false
> >   }
> >
> >   override def hashCode(): Int = 1
> > }
> >
> > This is how I create RDD of Person and then try to create a DataFrame
> > val rdd = sparkContext.parallelize((1 to 100).map(i =>
> Person(i.toString)))
> > val sparkDataFrame = sqlContext.createDataFrame(rdd)
> >
> > The second line throws an exception:
> > java.lang.ClassCastException: types.PersonUDT cannot be cast to
> > org.apache.spark.sql.types.StructType
> > at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)
> >
> > I looked into the code in SQLContext.scala and it seems that the code
> > requires UDT to be extending StructType but in fact it extends
> > UserDefinedType which extends directly DataType.
> > I'm not sure whether it is a bug or I just don't know how to use UDTs.
> >
> > Do you have any suggestions how to solve this? I based my UDT on
> > ExamplePointUDT but it seems to be incorrect. Is there a working example
> for
> > UDT?
> >
> >
> > Thank you for the reply in advance!
> > wjur
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/User-Defined-Type-UDT-tp22796.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: --jars works in "yarn-client" but not "yarn-cluster" mode, why?

2015-05-20 Thread Marcelo Vanzin
Hello,

Sorry for the delay. The issue you're running into is because most HBase
classes are in the system class path, while jars added with "--jars" are
only visible to the application class loader created by Spark. So classes
in the system class path cannot see them.

You can work around this by setting "--driver-classpath
/opt/.../htrace-core-3.1.0-incubating.jar" and "--conf
spark.executor.extraClassPath=
/opt/.../htrace-core-3.1.0-incubating.jar" in your spark-submit command
line. (You can also add those configs to your spark-defaults.conf to avoid
having to type them all the time; and don't forget to include any other
jars that might be needed.)


On Mon, May 18, 2015 at 11:14 PM, Fengyun RAO  wrote:

> Thanks, Marcelo!
>
>
> Below is the full log,
>
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/avro-tools-1.7.6-cdh5.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 15/05/19 14:08:58 INFO yarn.ApplicationMaster: Registered signal handlers for 
> [TERM, HUP, INT]
> 15/05/19 14:08:59 INFO yarn.ApplicationMaster: ApplicationAttemptId: 
> appattempt_1432015548391_0003_01
> 15/05/19 14:09:00 INFO spark.SecurityManager: Changing view acls to: 
> nobody,raofengyun
> 15/05/19 14:09:00 INFO spark.SecurityManager: Changing modify acls to: 
> nobody,raofengyun
> 15/05/19 14:09:00 INFO spark.SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users with view permissions: Set(nobody, 
> raofengyun); users with modify permissions: Set(nobody, raofengyun)
> 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Starting the user application 
> in a separate Thread
> 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Waiting for spark context 
> initialization
> 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Waiting for spark context 
> initialization ...
> 15/05/19 14:09:00 INFO spark.SparkContext: Running Spark version 1.3.0
> 15/05/19 14:09:00 INFO spark.SecurityManager: Changing view acls to: 
> nobody,raofengyun
> 15/05/19 14:09:00 INFO spark.SecurityManager: Changing modify acls to: 
> nobody,raofengyun
> 15/05/19 14:09:00 INFO spark.SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users with view permissions: Set(nobody, 
> raofengyun); users with modify permissions: Set(nobody, raofengyun)
> 15/05/19 14:09:01 INFO slf4j.Slf4jLogger: Slf4jLogger started
> 15/05/19 14:09:01 INFO Remoting: Starting remoting
> 15/05/19 14:09:01 INFO Remoting: Remoting started; listening on addresses 
> :[akka.tcp://sparkDriver@gs-server-v-127:7191]
> 15/05/19 14:09:01 INFO Remoting: Remoting now listens on addresses: 
> [akka.tcp://sparkDriver@gs-server-v-127:7191]
> 15/05/19 14:09:01 INFO util.Utils: Successfully started service 'sparkDriver' 
> on port 7191.
> 15/05/19 14:09:01 INFO spark.SparkEnv: Registering MapOutputTracker
> 15/05/19 14:09:01 INFO spark.SparkEnv: Registering BlockManagerMaster
> 15/05/19 14:09:01 INFO storage.DiskBlockManager: Created local directory at 
> /data1/cdh/yarn/nm/usercache/raofengyun/appcache/application_1432015548391_0003/blockmgr-3250910b-693e-46ff-b057-26d552fd8abd
> 15/05/19 14:09:01 INFO storage.MemoryStore: MemoryStore started with capacity 
> 259.7 MB
> 15/05/19 14:09:01 INFO spark.HttpFileServer: HTTP File server directory is 
> /data1/cdh/yarn/nm/usercache/raofengyun/appcache/application_1432015548391_0003/httpd-5bc614bc-d8b1-473d-a807-4d9252eb679d
> 15/05/19 14:09:01 INFO spark.HttpServer: Starting HTTP Server
> 15/05/19 14:09:01 INFO server.Server: jetty-8.y.z-SNAPSHOT
> 15/05/19 14:09:01 INFO server.AbstractConnector: Started 
> SocketConnector@0.0.0.0:9349
> 15/05/19 14:09:01 INFO util.Utils: Successfully started service 'HTTP file 
> server' on port 9349.
> 15/05/19 14:09:01 INFO spark.SparkEnv: Registering OutputCommitCoordinator
> 15/05/19 14:09:01 INFO ui.JettyUtils: Adding filter: 
> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
> 15/05/19 14:09:01 INFO server.Server: jetty-8.y.z-SNAPSHOT
> 15/05/19 14:09:01 INFO server.AbstractConnector: Started 
> SelectChannelConnector@0.0.0.0:63023
> 15/05/19 14:09:01 INFO util.Utils: Successfully started service 'SparkUI' on 
> port 63023.
> 15/05/19 14:09:01 INFO ui.SparkUI: Started SparkUI at 
> http://gs-server-v-127:63023
> 15/05/19 14:09:02 INFO cluster.YarnClusterScheduler: Created 
> YarnClusterScheduler
> 15/05/19 14:09:02 INFO netty.NettyBlockTransferService: Server created on 
> 33526
> 15/05/19 14:09:02 INFO storage.BlockManagerMaster: Trying to register 
> BlockManager
> 15/05/19 14:09:02 INFO storage.BlockManagerMasterActor: Registering block 
> mana

FP Growth saveAsTextFile

2015-05-20 Thread Eric Tanner
I am having trouble with saving an FP-Growth model as a text file.  I can
print out the results, but when I try to save the model I get a
NullPointerException.

model.freqItemsets.saveAsTextFile("c://fpGrowth/model")

Thanks,

Eric


Re: PySpark Logs location

2015-05-20 Thread Ruslan Dautkhanov
You could use

yarn logs -applicationId application_1383601692319_0008



-- 
Ruslan Dautkhanov

On Wed, May 20, 2015 at 5:37 AM, Oleg Ruchovets 
wrote:

> Hi ,
>
>   I am executing PySpark job on yarn ( hortonworks distribution).
>
> Could someone pointing me where is the log locations?
>
> Thanks
> Oleg.
>


Re: PySpark Logs location

2015-05-20 Thread Oleg Ruchovets
Hi Ruslan.
  Could you add more details, please?
Where do I get the applicationId? In case I have a lot of log files, would it
make sense to view them from a single point?
How can I actually configure / manage the log location of PySpark?

Thanks
Oleg.

On Wed, May 20, 2015 at 10:24 PM, Ruslan Dautkhanov 
wrote:

> You could use
>
> yarn logs -applicationId application_1383601692319_0008
>
>
>
> --
> Ruslan Dautkhanov
>
> On Wed, May 20, 2015 at 5:37 AM, Oleg Ruchovets 
> wrote:
>
>> Hi ,
>>
>>   I am executing PySpark job on yarn ( hortonworks distribution).
>>
>> Could someone pointing me where is the log locations?
>>
>> Thanks
>> Oleg.
>>
>
>


Read multiple files from S3

2015-05-20 Thread lovelylavs
Hi,

I am trying to get a collection of files according to LastModifiedDate from
S3

List<String> FileNames = new ArrayList<String>();

ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
.withBucketName(s3_bucket)
.withPrefix(logs_dir);

ObjectListing objectListing;


do {
objectListing = s3Client.listObjects(listObjectsRequest);
for (S3ObjectSummary objectSummary :
objectListing.getObjectSummaries()) {

if
((objectSummary.getLastModified().compareTo(dayBefore) > 0)  &&
(objectSummary.getLastModified().compareTo(dayAfter) <1) &&
objectSummary.getKey().contains(".log"))
FileNames.add(objectSummary.getKey());
}
listObjectsRequest.setMarker(objectListing.getNextMarker());
} while (objectListing.isTruncated());

I would like to process these files using Spark

I understand that textFile reads a single text file. Is there any way to
read all these files that are part of the List?

Thanks for your help.
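(A minimal sketch of one option, in Scala: sc.textFile accepts a comma-separated
list of paths, so the collected keys can be joined into a single argument. The
names fileNames and s3Bucket below stand in for the values built above, and the
"s3n://" scheme is an assumption -- use whichever S3 filesystem scheme your
cluster is configured for.)

// Join the selected S3 keys into one comma-separated path and read them as a single RDD.
val paths = fileNames.map(key => s"s3n://$s3Bucket/$key").mkString(",")
val logLines = sc.textFile(paths)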





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Read-multiple-files-from-S3-tp22965.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark Logs location

2015-05-20 Thread Ruslan Dautkhanov
Oleg,

You can see applicationId in your Spark History Server.
Go to http://historyserver:18088/

Also check
https://spark.apache.org/docs/1.1.0/running-on-yarn.html#debugging-your-application

It should be no different with PySpark.


-- 
Ruslan Dautkhanov

On Wed, May 20, 2015 at 2:12 PM, Oleg Ruchovets 
wrote:

> Hi Ruslan.
>   Could you add more details please.
> Where do I get applicationId? In case I have a lot of log files would it
> make sense to view it from single point.
> How actually I can configure / manage log location of PySpark?
>
> Thanks
> Oleg.
>
> On Wed, May 20, 2015 at 10:24 PM, Ruslan Dautkhanov 
> wrote:
>
>> You could use
>>
>> yarn logs -applicationId application_1383601692319_0008
>>
>>
>>
>> --
>> Ruslan Dautkhanov
>>
>> On Wed, May 20, 2015 at 5:37 AM, Oleg Ruchovets 
>> wrote:
>>
>>> Hi ,
>>>
>>>   I am executing PySpark job on yarn ( hortonworks distribution).
>>>
>>> Could someone pointing me where is the log locations?
>>>
>>> Thanks
>>> Oleg.
>>>
>>
>>
>


Re: User Defined Type (UDT)

2015-05-20 Thread Xiangrui Meng
Probably in 1.5. I made a JIRA for it:
https://issues.apache.org/jira/browse/SPARK-7768. You can watch that
JIRA (and vote). -Xiangrui
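(On the serialize question quoted below: a minimal sketch of what returning a Row
instead of Seq[Any] could look like inside PersonUDT. It is written against the
non-public 1.3 UDT API, so treat it as illustrative only.)

import org.apache.spark.sql.Row

// Inside class PersonUDT: serialize to a Row matching the struct sqlType,
// and deserialize back from a Row.
override def serialize(obj: Any): Any = obj match {
  case p: Person => Row(p.name)
}
override def deserialize(datum: Any): Person = datum match {
  case row: Row => Person(row.getString(0))
}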

On Wed, May 20, 2015 at 11:03 AM, Justin Uang  wrote:
> Xiangrui, is there a timeline for when UDTs will become a public API? I'm
> currently using them to support java 8's ZonedDateTime.
>
> On Tue, May 19, 2015 at 3:14 PM Xiangrui Meng  wrote:
>>
>> (Note that UDT is not a public API yet.)
>>
>> On Thu, May 7, 2015 at 7:11 AM, wjur  wrote:
>> > Hi all!
>> >
>> > I'm using Spark 1.3.0 and I'm struggling with a definition of a new type
>> > for
>> > a project I'm working on. I've created a case class Person(name: String)
>> > and
>> > now I'm trying to make Spark to be able serialize and deserialize the
>> > defined type. I made a couple of attempts but none of them did not work
>> > in
>> > 100% (there were issues either in serialization or deserialization).
>> >
>> > This is my class and the corresponding UDT.
>> >
>> > @SQLUserDefinedType(udt = classOf[PersonUDT])
>> > case class Person(name: String)
>> >
>> > class PersonUDT extends UserDefinedType[Person] {
>> >   override def sqlType: DataType = StructType(Seq(StructField("name",
>> > StringType)))
>> >
>> >   override def serialize(obj: Any): Seq[Any] = {
>>
>> This should return a Row instance instead of Seq[Any], because the
>> sqlType is a struct type.
>>
>> > obj match {
>> >   case c: Person =>
>> > Seq(c.name)
>> > }
>> >   }
>> >
>> >   override def userClass: Class[Person] = classOf[Person]
>> >
>> >   override def deserialize(datum: Any): Person = {
>> > datum match {
>> >   case values: Seq[_] =>
>> > assert(values.length == 1)
>> > Person(values.head.asInstanceOf[String])
>> >   case values: util.ArrayList[_] =>
>> > Person(values.get(0).asInstanceOf[String])
>> > }
>> >   }
>> >
>> >   // In some other attempt I was creating RDD of Seq with manually
>> > serialized data and
>> >   // I had to override equals because two DFs with the same type weren't
>> > actually equal
>> >   // StructField(person,...types.PersonUDT@a096ac3)
>> >   // StructField(person,...types.PersonUDT@613fd937)
>> >   def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT]
>> >
>> >   override def equals(other: Any): Boolean = other match {
>> > case that: PersonUDT => true
>> > case _ => false
>> >   }
>> >
>> >   override def hashCode(): Int = 1
>> > }
>> >
>> > This is how I create RDD of Person and then try to create a DataFrame
>> > val rdd = sparkContext.parallelize((1 to 100).map(i =>
>> > Person(i.toString)))
>> > val sparkDataFrame = sqlContext.createDataFrame(rdd)
>> >
>> > The second line throws an exception:
>> > java.lang.ClassCastException: types.PersonUDT cannot be cast to
>> > org.apache.spark.sql.types.StructType
>> > at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)
>> >
>> > I looked into the code in SQLContext.scala and it seems that the code
>> > requires UDT to be extending StructType but in fact it extends
>> > UserDefinedType which extends directly DataType.
>> > I'm not sure whether it is a bug or I just don't know how to use UDTs.
>> >
>> > Do you have any suggestions how to solve this? I based my UDT on
>> > ExamplePointUDT but it seems to be incorrect. Is there a working example
>> > for
>> > UDT?
>> >
>> >
>> > Thank you for the reply in advance!
>> > wjur
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/User-Defined-Type-UDT-tp22796.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming - Design considerations/Knobs

2015-05-20 Thread Tathagata Das
Correcting the ones that are incorrect or incomplete. BUT this is a good list
of things to remember about Spark Streaming.


On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat 
wrote:

> Hi,
>
> I have compiled a list (from online sources) of knobs/design
> considerations that need to be taken care of by applications running on
> spark streaming. Is my understanding correct?  Any other important design
> consideration that I should take care of?
>
>
>- A DStream is associated with a single receiver. For attaining read
>parallelism multiple receivers i.e. multiple DStreams need to be created.
>- A receiver is run within an executor. It occupies one core. Ensure
>that there are enough cores for processing after receiver slots are booked
>i.e. spark.cores.max should take the receiver slots into account.
>- The receivers are allocated to executors in a round robin fashion.
>- When data is received from a stream source, receiver creates blocks
>of data.  A new block of data is generated every blockInterval
>milliseconds. N blocks of data are created during the batchInterval where N
>= batchInterval/blockInterval.
>- These blocks are distributed by the BlockManager of the current
>executor to the block managers of other executors. After that, the Network
>Input Tracker running on the driver is informed about the block locations
>for further processing.
>- A RDD is created on the driver for the blocks created during the
>batchInterval. The blocks generated during the batchInterval are partitions
>of the RDD. Each partition is a task in spark. blockInterval==
>batchinterval would mean that a single partition is created and probably it
>is processed locally.
>
> The map tasks on the blocks are processed in the executors (one that
received the block, and another where the block was replicated) that has
the blocks irrespective of block interval, unless non-local scheduling
kicks in (as you observed next).

>
>- Having bigger blockinterval means bigger blocks. A high value of
>spark.locality.wait increases the chance of processing a block on the local
>node. A balance needs to be found out between these two parameters to
>ensure that the bigger blocks are processed locally.
>- Instead of relying on batchInterval and blockInterval, you can
>define the number of partitions by calling dstream.repartition(n). This
>reshuffles the data in RDD randomly to create n number of partitions.
>
> Yes, for greater parallelism. Though comes at the cost of a shuffle.

>
>- An RDD's processing is scheduled by driver's jobscheduler as a job.
>At a given point of time only one job is active. So, if one job is
>executing the other jobs are queued.
>
>
>- If you have two dstreams there will be two RDDs formed and there
>will be two jobs created which will be scheduled one after the another.
>
>
>- To avoid this, you can union two dstreams. This will ensure that a
>single unionRDD is formed for the two RDDs of the dstreams. This unionRDD
>is then considered as a single job. However the partitioning of the RDDs is
>not impacted.
>
> To further clarify, the jobs depend on the number of output operations
(print, foreachRDD, saveAsXFiles) and the number of RDD actions in those
output operations.

dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }// one Spark
job per batch

dstream1.union(dstream2).foreachRDD { rdd => { rdd.count() ; rdd.count() }
}// TWO Spark jobs per batch

dstream1.foreachRDD { rdd => rdd.count } ; dstream2.foreachRDD { rdd =>
rdd.count }  // TWO Spark jobs per batch

>
>
>

>
>-
>- If the batch processing time is more than batchinterval then
>obviously the receiver's memory will start filling up and will end up in
>throwing exceptions (most probably BlockNotFoundException). Currently there
>is  no way to pause the receiver.
>
> You can limit the rate of receiver using SparkConf config
spark.streaming.receiver.maxRate
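For example (a minimal sketch; the 1000 records/second value is arbitrary):

import org.apache.spark.SparkConf
val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "1000")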

>
>-
>- For being fully fault tolerant, spark streaming needs to enable
>checkpointing. Checkpointing increases the batch processing time.
>
> Incomplete. There are two types of checkpointing - data and metadata. Only
data checkpointing, needed by only some operations, increases batch
processing time. Read -
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
Furthermore, with checkpointing you can recover the computation, but you may lose
some data (that was received but not processed before the driver failed) for
some sources. Enabling write ahead logs and a reliable source + receiver
allows zero data loss. Read - WAL in
http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics
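A minimal sketch of enabling both write ahead logs and checkpointing (the app name
and checkpoint directory are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("my-streaming-app")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // WAL for received data
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")               // metadata + data checkpointing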

>
>- The frequency of metadata checkpoint cleaning can be controlled
>using spark.cleaner.ttl. But, data checkpoint cleaning happens
>automatically when the RDDs in the checkpoint are n

Re: FP Growth saveAsTextFile

2015-05-20 Thread Xiangrui Meng
Could you post the stack trace? If you are using Spark 1.3 or 1.4, it
would be easier to save freq itemsets as a Parquet file. -Xiangrui
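A rough sketch of that route with 1.3 syntax (it assumes an existing SQLContext
named sqlContext; the output path is a placeholder):

import sqlContext.implicits._

val freqDF = model.freqItemsets
  .map(itemset => (itemset.items.mkString(","), itemset.freq))
  .toDF("items", "freq")
freqDF.saveAsParquetFile("/tmp/fpgrowth-freq-itemsets")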

On Wed, May 20, 2015 at 12:16 PM, Eric Tanner
 wrote:
> I am having trouble with saving an FP-Growth model as a text file.  I can
> print out the results, but when I try to save the model I get a
> NullPointerException.
>
> model.freqItemsets.saveAsTextFile("c://fpGrowth/model")
>
> Thanks,
>
> Eric

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Multi user setup and saving a DataFrame / RDD to a network exported file system

2015-05-20 Thread Davies Liu
Could you file a JIRA for this?

The executor should run under the user who submits the job, I think.

On Wed, May 20, 2015 at 2:40 AM, Tomasz Fruboes
 wrote:
> Thanks for the suggestion. I have tried playing with it; sc.sparkUser() gives
> me the expected user name, but it doesn't solve the problem. From a quick search
> through the Spark code it seems to me that this setting is effective only
> for YARN and Mesos.
>
>  I think the workaround for the problem could be using "--deploy-mode
> cluster" (not 100% convenient, since it disallows any interactive work), but
> this is not supported for Python-based programs.
>
> Cheers,
>   Tomasz
>
>
>
> On 20.05.2015 at 10:57, Iulian Dragoș wrote:
>>
>> You could try setting `SPARK_USER` to the user under which your workers
>> are running. I couldn't find many references to this variable, but at
>> least Yarn and Mesos take it into account when spawning executors.
>> Chances are that standalone mode also does it.
>>
>> iulian
>>
>> On Wed, May 20, 2015 at 9:29 AM, Tomasz Fruboes
>> tomasz.frub...@fuw.edu.pl wrote:
>>
>> Hi,
>>
>>   thanks for answer. The rights are
>>
>> drwxr-xr-x 3 tfruboes all 5632 05-19 15 :40
>> test19EE/
>>
>>   I have tried setting the rights to 777 for this directory prior to
>> execution. This does not get propagated down the chain, ie the
>> directory created as a result of the "save" call
>> (namesAndAges.parquet2 in the path in the dump [1] below) is created
>> with the drwxr-xr-x rights (owned by the user submitting the job, ie
>> tfruboes). The temp directories created inside
>>
>> namesAndAges.parquet2/_temporary/0/
>>
>> (e.g. task_201505200920_0009_r_01) are owned by root, again with
>> drwxr-xr-x access rights
>>
>>   Cheers,
>>Tomasz
>>
>> On 19.05.2015 at 23:56, Davies Liu wrote:
>>
>> It surprises me, could you list the owner information of
>> /mnt/lustre/bigdata/med_home/tmp/test19EE/ ?
>>
>> On Tue, May 19, 2015 at 8:15 AM, Tomasz Fruboes
>> mailto:tomasz.frub...@fuw.edu.pl>>
>>
>> wrote:
>>
>> Dear Experts,
>>
>>we have a spark cluster (standalone mode) in which master
>> and workers are
>> started from root account. Everything runs correctly to the
>> point when we
>> try doing operations such as
>>
>>   dataFrame.select("name", "age").save(ofile, "parquet")
>>
>> or
>>
>>   rdd.saveAsPickleFile(ofile)
>>
>> , where ofile is path on a network exported filesystem
>> (visible on all
>> nodes, in our case this is lustre, I guess on nfs effect
>> would be similar).
>>
>>Unsurprisingly temp files created on workers are owned by
>> root, which then
>> leads to a crash (see [1] below). Is there a
>> solution/workaround for this
>> (e.g. controlling file creation mode of the temporary files)?
>>
>> Cheers,
>>Tomasz
>>
>>
>> ps I've tried to google this problem, couple of similar
>> reports, but no
>> clear answer/solution found
>>
>> ps2 For completeness - running master/workers as a regular
>> user solves the
>> problem only for the given user. For other users submitting
>> to this master
>> the result is given in [2] below
>>
>>
>> [0] Cluster details:
>> Master/workers: centos 6.5
>> Spark 1.3.1 prebuilt for hadoop 2.4 (same behaviour for the
>> 2.6 build)
>>
>>
>> [1]
>>
>> ##
>>  File
>>
>> "/mnt/home/tfruboes/2015.05.SparkLocal/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>> line 300, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o27.save.
>> : java.io.IOException: Failed to rename
>>
>> DeprecatedRawLocalFileStatus{path=file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/_temporary/0/task_201505191540_0009_r_01/part-r-2.parquet;
>> isDirectory=false; length=534; replication=1;
>> blocksize=33554432;
>> modification_time=1432042832000; access_time=0; owner=;
>> group=;
>> permission=rw-rw-rw-; isSymlink=false} to
>>
>> file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/part-r-2.parquet
>>   at
>>
>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346)
>>   at
>>
>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
>>   at
>>
>> org.apache.hadoop.mapreduce.lib

Re: Spark 1.3.1 - SQL Issues

2015-05-20 Thread Davies Liu
The docs had been updated.

You should convert the DataFrame to RDD by `df.rdd`

On Mon, Apr 20, 2015 at 5:23 AM, ayan guha  wrote:
> Hi
> Just upgraded to Spark 1.3.1.
>
> I am getting a warning
>
> Warning (from warnings module):
>   File
> "D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py",
> line 191
> warnings.warn("inferSchema is deprecated, please use createDataFrame
> instead")
> UserWarning: inferSchema is deprecated, please use createDataFrame instead
>
> However, documentation still says to use inferSchema.
> Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in
> section
>
> Also, I am getting an error in mlib.ALS.train function when passing
> dataframe (do I need to convert the DF to RDD?)
>
> Code:
> training = ssc.sql("select userId,movieId,rating from ratings where
> partitionKey < 6").cache()
> print type(training)
> model = ALS.train(training,rank,numIter,lmbda)
>
> Error:
> 
> Rank:8 Lmbda:1.0 iteration:10
>
> Traceback (most recent call last):
>   File "D:\Project\Spark\code\movie_sql.py", line 109, in 
> bestConf = getBestModel(sc,ssc,training,validation,validationNoRating)
>   File "D:\Project\Spark\code\movie_sql.py", line 54, in getBestModel
> model = ALS.train(trainingRDD,rank,numIter,lmbda)
>   File
> "D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py",
> line 139, in train
> model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank,
> iterations,
>   File
> "D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py",
> line 127, in _prepare
> assert isinstance(ratings, RDD), "ratings should be RDD"
> AssertionError: ratings should be RDD
>
> --
> Best Regards,
> Ayan Guha

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



GradientBoostedTrees.trainRegressor with categoricalFeaturesInfo

2015-05-20 Thread Don Drake
I'm running Spark v1.3.1 and when I run the following against my dataset:

model = GradientBoostedTrees.trainRegressor(trainingData,
categoricalFeaturesInfo=catFeatu
res, maxDepth=6, numIterations=3)

The job will fail with the following message:
Traceback (most recent call last):
  File "/Users/drake/fd/spark/mltest.py", line 73, in 
model = GradientBoostedTrees.trainRegressor(trainingData,
categoricalFeaturesInfo=catFeatures, maxDepth=6, numIterations=3)
  File
"/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py",
line 553, in trainRegressor
loss, numIterations, learningRate, maxDepth)
  File
"/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py",
line 438, in _train
loss, numIterations, learningRate, maxDepth)
  File
"/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py",
line 120, in callMLlibFunc
return callJavaFunc(sc, api, *args)
  File
"/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py",
line 113, in callJavaFunc
return _java2py(sc, func(*args))
  File
"/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 538, in __call__
  File
"/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
line 300, in get_return_value
15/05/20 16:40:12 INFO BlockManager: Removing block rdd_32_95
py4j.protocol.Py4JJavaError: An error occurred while calling
o69.trainGradientBoostedTreesModel.
: java.lang.IllegalArgumentException: requirement failed: DecisionTree
requires maxBins (= 32) >= max categories in categorical features (= 1895)
at scala.Predef$.require(Predef.scala:233)
at
org.apache.spark.mllib.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:128)
at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:138)
at org.apache.spark.mllib.tree.DecisionTree.run(DecisionTree.scala:60)
at
org.apache.spark.mllib.tree.GradientBoostedTrees$.org$apache$spark$mllib$tree$GradientBoostedTrees$$boost(GradientBoostedTrees.scala:150)
at
org.apache.spark.mllib.tree.GradientBoostedTrees.run(GradientBoostedTrees.scala:63)
at
org.apache.spark.mllib.tree.GradientBoostedTrees$.train(GradientBoostedTrees.scala:96)
at
org.apache.spark.mllib.api.python.PythonMLLibAPI.trainGradientBoostedTreesModel(PythonMLLibAPI.scala:595)

So, it's complaining about the maxBins, if I provide maxBins=1900 and
re-run it:

model = GradientBoostedTrees.trainRegressor(trainingData,
categoricalFeaturesInfo=catFeatu
res, maxDepth=6, numIterations=3, maxBins=1900)

Traceback (most recent call last):
  File "/Users/drake/fd/spark/mltest.py", line 73, in 
model = GradientBoostedTrees.trainRegressor(trainingData,
categoricalFeaturesInfo=catF
eatures, maxDepth=6, numIterations=3, maxBins=1900)
TypeError: trainRegressor() got an unexpected keyword argument 'maxBins'

It now says it knows nothing of maxBins.

If I run the same command against DecisionTree or RandomForest (with
maxBins=1900) it works just fine.

Seems like a bug in GradientBoostedTrees.

Suggestions?

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
800-733-2143


Storing data in MySQL from spark hive tables

2015-05-20 Thread roni
Hi ,
I am trying to set up the Hive metastore and the MySQL DB connection.
 I have a Spark cluster, I ran some programs, and I have data stored in
some Hive tables.
Now I want to store this data in MySQL so that it is available for
further processing.

I setup the hive-site.xml file.

<configuration>

  <property>
    <name>hive.semantic.analyzer.factory.impl</name>
    <value>org.apache.hcatalog.cli.HCatSemanticAnalyzerFactory</value>
  </property>

  <property>
    <name>hive.metastore.sasl.enabled</name>
    <value>false</value>
  </property>

  <property>
    <name>hive.server2.authentication</name>
    <value>NONE</value>
  </property>

  <property>
    <name>hive.server2.enable.doAs</name>
    <value>true</value>
  </property>

  <property>
    <name>hive.warehouse.subdir.inherit.perms</name>
    <value>true</value>
  </property>

  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://<*ip address*>:3306/metastore_db?createDatabaseIfNotExist=true</value>
    <description>metadata is stored in a MySQL server</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>MySQL JDBC driver class</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value></value>
  </property>

  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/${user.name}/hive-warehouse</value>
    <description>location of default database for the warehouse</description>
  </property>

</configuration>

 --
My MySQL server is on a separate machine from where my Spark server is. If
I use MySQL Workbench, I use an SSH connection with a certificate file to
connect.
How do I specify all that information from Spark to the DB?
I want to store the data generated by my Spark program in MySQL.
Thanks
_R


Spark Application Dependency Issue

2015-05-20 Thread Snehal Nagmote
Hi All,

I am on spark 1.1 with Datastax DSE.

The application is Spark Streaming and has Couchbase dependencies which use
http-core 4.3.2.

While running application I get this error

This is the error I get

NoSuchMethodError:
org.apache.http.protocol.RequestUserAgent.<init>(Ljava/lang/String;)V

at com.couchbase.client.ViewConnection.<init>(ViewConnection.java:157) at
com.couchbase.client.CouchbaseConnectionFactory.createViewConnection(CouchbaseConnectionFactory.java:254)

at com.couchbase.client.CouchbaseClient.<init>(CouchbaseClient.java:266)

at
com.walmart.platform.cache.CouchBaseFactoryImpl.create(CouchBaseFactoryImpl.java:76)

There are different versions of http-core dependencies in the Spark classpath:

http-core 4.1.3 and http-core 4.2.4. My application uses 4.3.2.

I tried using the user-classpath-first option but it does not work for me since
I am on Spark 1.1.


Any help or pointers would be really useful ,


Thanks,

Snehal


Re: GradientBoostedTrees.trainRegressor with categoricalFeaturesInfo

2015-05-20 Thread Burak Yavuz
Could you please open a JIRA for it? The maxBins input is missing for the
Python API.

Is it possible for you to use the current master? In the current master,
you should be able to use trees with the Pipeline API and DataFrames.

Best,
Burak

On Wed, May 20, 2015 at 2:44 PM, Don Drake  wrote:

> I'm running Spark v1.3.1 and when I run the following against my dataset:
>
> model = GradientBoostedTrees.trainRegressor(trainingData,
> categoricalFeaturesInfo=catFeatu
> res, maxDepth=6, numIterations=3)
>
> The job will fail with the following message:
> Traceback (most recent call last):
>   File "/Users/drake/fd/spark/mltest.py", line 73, in 
> model = GradientBoostedTrees.trainRegressor(trainingData,
> categoricalFeaturesInfo=catFeatures, maxDepth=6, numIterations=3)
>   File
> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py",
> line 553, in trainRegressor
> loss, numIterations, learningRate, maxDepth)
>   File
> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py",
> line 438, in _train
> loss, numIterations, learningRate, maxDepth)
>   File
> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py",
> line 120, in callMLlibFunc
> return callJavaFunc(sc, api, *args)
>   File
> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py",
> line 113, in callJavaFunc
> return _java2py(sc, func(*args))
>   File
> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line 538, in __call__
>   File
> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
> line 300, in get_return_value
> 15/05/20 16:40:12 INFO BlockManager: Removing block rdd_32_95
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o69.trainGradientBoostedTreesModel.
> : java.lang.IllegalArgumentException: requirement failed: DecisionTree
> requires maxBins (= 32) >= max categories in categorical features (= 1895)
> at scala.Predef$.require(Predef.scala:233)
> at
> org.apache.spark.mllib.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:128)
> at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:138)
> at org.apache.spark.mllib.tree.DecisionTree.run(DecisionTree.scala:60)
> at
> org.apache.spark.mllib.tree.GradientBoostedTrees$.org$apache$spark$mllib$tree$GradientBoostedTrees$$boost(GradientBoostedTrees.scala:150)
> at
> org.apache.spark.mllib.tree.GradientBoostedTrees.run(GradientBoostedTrees.scala:63)
> at
> org.apache.spark.mllib.tree.GradientBoostedTrees$.train(GradientBoostedTrees.scala:96)
> at
> org.apache.spark.mllib.api.python.PythonMLLibAPI.trainGradientBoostedTreesModel(PythonMLLibAPI.scala:595)
>
> So, it's complaining about the maxBins, if I provide maxBins=1900 and
> re-run it:
>
> model = GradientBoostedTrees.trainRegressor(trainingData,
> categoricalFeaturesInfo=catFeatu
> res, maxDepth=6, numIterations=3, maxBins=1900)
>
> Traceback (most recent call last):
>   File "/Users/drake/fd/spark/mltest.py", line 73, in 
> model = GradientBoostedTrees.trainRegressor(trainingData,
> categoricalFeaturesInfo=catF
> eatures, maxDepth=6, numIterations=3, maxBins=1900)
> TypeError: trainRegressor() got an unexpected keyword argument 'maxBins'
>
> It now says it knows nothing of maxBins.
>
> If I run the same command against DecisionTree or RandomForest (with
> maxBins=1900) it works just fine.
>
> Seems like a bug in GradientBoostedTrees.
>
> Suggestions?
>
> -Don
>
> --
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/
> 800-733-2143
>


Re: Spark 1.3.1 - SQL Issues

2015-05-20 Thread ayan guha
Thanks a bunch
On 21 May 2015 07:11, "Davies Liu"  wrote:

> The docs had been updated.
>
> You should convert the DataFrame to RDD by `df.rdd`
>
> On Mon, Apr 20, 2015 at 5:23 AM, ayan guha  wrote:
> > Hi
> > Just upgraded to Spark 1.3.1.
> >
> > I am getting an warning
> >
> > Warning (from warnings module):
> >   File
> >
> "D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py",
> > line 191
> > warnings.warn("inferSchema is deprecated, please use createDataFrame
> > instead")
> > UserWarning: inferSchema is deprecated, please use createDataFrame
> instead
> >
> > However, documentation still says to use inferSchema.
> > Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in
> > section
> >
> > Also, I am getting an error in mlib.ALS.train function when passing
> > dataframe (do I need to convert the DF to RDD?)
> >
> > Code:
> > training = ssc.sql("select userId,movieId,rating from ratings where
> > partitionKey < 6").cache()
> > print type(training)
> > model = ALS.train(training,rank,numIter,lmbda)
> >
> > Error:
> > 
> > Rank:8 Lmbda:1.0 iteration:10
> >
> > Traceback (most recent call last):
> >   File "D:\Project\Spark\code\movie_sql.py", line 109, in 
> > bestConf =
> getBestModel(sc,ssc,training,validation,validationNoRating)
> >   File "D:\Project\Spark\code\movie_sql.py", line 54, in getBestModel
> > model = ALS.train(trainingRDD,rank,numIter,lmbda)
> >   File
> >
> "D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py",
> > line 139, in train
> > model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank,
> > iterations,
> >   File
> >
> "D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py",
> > line 127, in _prepare
> > assert isinstance(ratings, RDD), "ratings should be RDD"
> > AssertionError: ratings should be RDD
> >
> > --
> > Best Regards,
> > Ayan Guha
>


Re: Spark Job not using all nodes in cluster

2015-05-20 Thread Shailesh Birari
No. I am not setting the number of executors anywhere (in the env file or in
the program).

Is it due to the large number of small files?

On Wed, May 20, 2015 at 5:11 PM, ayan guha  wrote:

> What is your spark env file says? Are you setting number of executors in
> spark context?
> On 20 May 2015 13:16, "Shailesh Birari"  wrote:
>
>> Hi,
>>
>> I have a 4 node Spark 1.3.1 cluster. All four nodes have 4 cores and 64 GB
>> of RAM.
>> I have around 600,000+ Json files on HDFS. Each file is small around 1KB
>> in
>> size. Total data is around 16GB. Hadoop block size is 256MB.
>> My application reads these files with sc.textFile() (or sc.jsonFile()
>> tried
>> both) API. But all the files are getting read by only one node (4
>> executors). Spark UI shows all 600K+ tasks on one node and 0 on other
>> nodes.
>>
>> I confirmed that all files are accessible from all nodes. Some other
>> application which uses big files uses all nodes on same cluster.
>>
>> Can you please let me know why it is behaving in such way ?
>>
>> Thanks,
>>   Shailesh
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-not-using-all-nodes-in-cluster-tp22951.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Compare LogisticRegression results using Mllib with those using other libraries (e.g. statsmodel)

2015-05-20 Thread Xin Liu
Hi,

I have tried a few models in Mllib to train a LogisticRegression model.
However, I consistently get much better results using other libraries such
as statsmodel (which gives similar results as R) in terms of AUC. For
illustration purpose, I used a small data (I have tried much bigger data)
 http://www.ats.ucla.edu/stat/data/binary.csv in
http://www.ats.ucla.edu/stat/r/dae/logit.htm

Here is the snippet of my usage of LogisticRegressionWithLBFGS.

val algorithm = new LogisticRegressionWithLBFGS
 algorithm.setIntercept(true)
 algorithm.optimizer
   .setNumIterations(100)
   .setRegParam(0.01)
   .setConvergenceTol(1e-5)
 val model = algorithm.run(training)
 model.clearThreshold()
 val scoreAndLabels = test.map { point =>
   val score = model.predict(point.features)
   (score, point.label)
 }
 val metrics = new BinaryClassificationMetrics(scoreAndLabels)
 val auROC = metrics.areaUnderROC()

I did a (0.6, 0.4) split for training/test. The response is "admit" and
features are "GRE score", "GPA", and "college Rank".

Spark:
Weights (GRE, GPA, Rank):
[0.0011576276331509304,0.048544858567336854,-0.394202150286076]
Intercept: -0.6488972641282202
Area under ROC: 0.6294070512820512

StatsModel:
Weights [0.0018, 0.7220, -0.3148]
Intercept: -3.5913
Area under ROC: 0.69

The weights from statsmodel seem more reasonable if you consider that, for a one
unit increase in GPA, the log odds of being admitted to graduate school
increase by 0.72 in statsmodel versus 0.04 in Spark.

I have seen a much bigger difference with other data. So my question is: has
anyone compared the results with other libraries, and is anything wrong with
my code invoking LogisticRegressionWithLBFGS?

The real data I am processing is pretty big and I really want to use Spark
to get this to work. Please let me know if you have similar experience and
how you resolved it.

Thanks,
Xin


How to process data in chronological order

2015-05-20 Thread roy
I have a key-value RDD, key is a timestamp (femto-second resolution, so
grouping buys me nothing) and I want to reduce it in the chronological
order.

How do I do that in spark?

I am fine with reducing contiguous sections of the set separately and then
aggregating the resulting objects locally.
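For example, something along these lines (a rough Scala sketch; keyed stands in
for the timestamp-keyed RDD, and combine / merge stand in for the application's
own aggregation functions):

val sorted = keyed.sortByKey()                        // range-partitioned, sorted within partitions
val partials = sorted.mapPartitions { iter =>
  val values = iter.map(_._2)
  if (values.hasNext) Iterator(values.reduce(combine)) else Iterator.empty
}
val result = partials.collect().reduceLeft(merge)     // collect() preserves partition order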

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-data-in-chronological-order-tp22966.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Storing data in MySQL from spark hive tables

2015-05-20 Thread Yana Kadiyska
I'm afraid you misunderstand the purpose of hive-site.xml. It configures
access to the Hive metastore. You can read more here:
http://www.hadoopmaterial.com/2013/11/metastore.html.

So the MySQL DB in hive-site.xml would be used to store hive-specific data
such as schema info, partition info, etc.

Now, for what you want to do, you can search the user list -- I know there
have been posts about Postgres but you can do the same with MySQL. The idea
is to create an object holding a connection pool (so each of your executors
would have its own instance), or alternatively, to open a connection within
mapPartitions (so you don't end up with a ton of connections). But the
write to a DB is largely a manual process -- open a connection, create a
statement, sync the data. If your data is small enough you probably could
just collect on the driver and write...though that would certainly be
slower than writing in parallel from each executor.
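A rough sketch of the per-partition variant in Scala (df stands in for the
DataFrame being written, and the JDBC URL, credentials, table and column names
are placeholders; the MySQL driver jar has to be on the executor classpath):

import java.sql.DriverManager

// One connection per partition, batching the inserts.
df.rdd.foreachPartition { rows =>
  Class.forName("com.mysql.jdbc.Driver")
  val conn = DriverManager.getConnection("jdbc:mysql://dbhost:3306/mydb", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO results (name, age) VALUES (?, ?)")
  try {
    rows.foreach { row =>
      stmt.setString(1, row.getString(0))
      stmt.setInt(2, row.getInt(1))
      stmt.addBatch()
    }
    stmt.executeBatch()
  } finally {
    stmt.close()
    conn.close()
  }
}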

On Wed, May 20, 2015 at 5:48 PM, roni  wrote:

> Hi ,
> I am trying to setup the hive metastore and mysql DB connection.
>  I have a spark cluster and I ran some programs and I have data stored in
> some hive tables.
> Now I want to store this data into Mysql  so that it is available for
> further processing.
>
> I setup the hive-site.xml file.
>
> 
>
> 
>
>
> 
>
>   
>
> hive.semantic.analyzer.factory.impl
>
> org.apache.hcatalog.cli.HCatSemanticAnalyzerFactory
>
>   
>
>
>   
>
> hive.metastore.sasl.enabled
>
> false
>
>   
>
>
>   
>
> hive.server2.authentication
>
> NONE
>
>   
>
>
>   
>
> hive.server2.enable.doAs
>
> true
>
>   
>
>
>   
>
> hive.warehouse.subdir.inherit.perms
>
> true
>
>   
>
>
>   
>
> hive.metastore.schema.verification
>
> false
>
>   
>
>
>   
>
> javax.jdo.option.ConnectionURL
>
> jdbc:mysql://<*ip address*
> >:3306/metastore_db?createDatabaseIfNotExist=true
>
> metadata is stored in a MySQL server
>
>   
>
>
>   
>
> javax.jdo.option.ConnectionDriverName
>
> com.mysql.jdbc.Driver
>
> MySQL JDBC driver class
>
>   
>
>
>   
>
> javax.jdo.option.ConnectionUserName
>
> root
>
>   
>
>
>   
>
> javax.jdo.option.ConnectionPassword
>
> 
>
>   
>
>   
>
> hive.metastore.warehouse.dir
>
> /user/${user.name}/hive-warehouse
>
> location of default database for
> the warehouse
>
> 
>
>
> 
>  --
> My mysql server is on a separate server than where my spark server is . If
> I use mySQLWorkbench , I use a SSH connection  with a certificate file to
> connect .
> How do I specify all that information from spark to the DB ?
> I want to store the data generated by my spark program into mysql.
> Thanks
> _R
>


Re: GradientBoostedTrees.trainRegressor with categoricalFeaturesInfo

2015-05-20 Thread Joseph Bradley
One more comment: That's a lot of categories for a feature.  If it makes
sense for your data, it will run faster if you can group the categories or
split the 1895 categories into a few features which have fewer categories.
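As an illustration of the grouping idea (a Scala sketch, even though this thread
uses PySpark; rawCategories and the frequency cutoff of 100 are hypothetical):

// Collapse rare values of a high-cardinality categorical column into an "other" bucket.
val counts = rawCategories.map(c => (c, 1L)).reduceByKey(_ + _)
val keep = counts.filter(_._2 >= 100).keys.collect().toSet
val bucketed = rawCategories.map(c => if (keep.contains(c)) c else "other")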

On Wed, May 20, 2015 at 3:17 PM, Burak Yavuz  wrote:

> Could you please open a JIRA for it? The maxBins input is missing for the
> Python Api.
>
> Is it possible if you can use the current master? In the current master,
> you should be able to use trees with the Pipeline Api and DataFrames.
>
> Best,
> Burak
>
> On Wed, May 20, 2015 at 2:44 PM, Don Drake  wrote:
>
>> I'm running Spark v1.3.1 and when I run the following against my dataset:
>>
>> model = GradientBoostedTrees.trainRegressor(trainingData,
>> categoricalFeaturesInfo=catFeatu
>> res, maxDepth=6, numIterations=3)
>>
>> The job will fail with the following message:
>> Traceback (most recent call last):
>>   File "/Users/drake/fd/spark/mltest.py", line 73, in 
>> model = GradientBoostedTrees.trainRegressor(trainingData,
>> categoricalFeaturesInfo=catFeatures, maxDepth=6, numIterations=3)
>>   File
>> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py",
>> line 553, in trainRegressor
>> loss, numIterations, learningRate, maxDepth)
>>   File
>> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py",
>> line 438, in _train
>> loss, numIterations, learningRate, maxDepth)
>>   File
>> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py",
>> line 120, in callMLlibFunc
>> return callJavaFunc(sc, api, *args)
>>   File
>> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py",
>> line 113, in callJavaFunc
>> return _java2py(sc, func(*args))
>>   File
>> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>> line 538, in __call__
>>   File
>> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>> line 300, in get_return_value
>> 15/05/20 16:40:12 INFO BlockManager: Removing block rdd_32_95
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o69.trainGradientBoostedTreesModel.
>> : java.lang.IllegalArgumentException: requirement failed: DecisionTree
>> requires maxBins (= 32) >= max categories in categorical features (= 1895)
>> at scala.Predef$.require(Predef.scala:233)
>> at
>> org.apache.spark.mllib.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:128)
>> at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:138)
>> at org.apache.spark.mllib.tree.DecisionTree.run(DecisionTree.scala:60)
>> at
>> org.apache.spark.mllib.tree.GradientBoostedTrees$.org$apache$spark$mllib$tree$GradientBoostedTrees$$boost(GradientBoostedTrees.scala:150)
>> at
>> org.apache.spark.mllib.tree.GradientBoostedTrees.run(GradientBoostedTrees.scala:63)
>> at
>> org.apache.spark.mllib.tree.GradientBoostedTrees$.train(GradientBoostedTrees.scala:96)
>> at
>> org.apache.spark.mllib.api.python.PythonMLLibAPI.trainGradientBoostedTreesModel(PythonMLLibAPI.scala:595)
>>
>> So, it's complaining about the maxBins, if I provide maxBins=1900 and
>> re-run it:
>>
>> model = GradientBoostedTrees.trainRegressor(trainingData,
>> categoricalFeaturesInfo=catFeatu
>> res, maxDepth=6, numIterations=3, maxBins=1900)
>>
>> Traceback (most recent call last):
>>   File "/Users/drake/fd/spark/mltest.py", line 73, in 
>> model = GradientBoostedTrees.trainRegressor(trainingData,
>> categoricalFeaturesInfo=catF
>> eatures, maxDepth=6, numIterations=3, maxBins=1900)
>> TypeError: trainRegressor() got an unexpected keyword argument 'maxBins'
>>
>> It now says it knows nothing of maxBins.
>>
>> If I run the same command against DecisionTree or RandomForest (with
>> maxBins=1900) it works just fine.
>>
>> Seems like a bug in GradientBoostedTrees.
>>
>> Suggestions?
>>
>> -Don
>>
>> --
>> Donald Drake
>> Drake Consulting
>> http://www.drakeconsulting.com/
>> 800-733-2143
>>
>
>


Re: Compare LogisticRegression results using Mllib with those using other libraries (e.g. statsmodel)

2015-05-20 Thread Joseph Bradley
Hi Xin,

2 suggestions:

1) Feature scaling: spark.mllib's LogisticRegressionWithLBFGS uses feature
scaling, which scales feature values to have unit standard deviation.  That
improves optimization behavior, and it often improves statistical
estimation (though maybe not for your dataset).  However, it effectively
changes the model being learned, so you should expect different results
from other libraries like R.  You could instead use LogisticRegressionWithSGD,
which does not do feature scaling.  With SGD, you may need to play around
with the stepSize more to get it to converge, but it should be able to
learn exactly the same model as R (a minimal sketch follows after point 2).

2) Convergence: I'd do a sanity check and make sure the algorithm is
converging.  (Compare with running for more iterations or using a lower
convergenceTol.)
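For reference on point 1), a minimal sketch of the SGD variant, mirroring the
LBFGS snippet in the original message (the step size, iteration count and zero
regularization are just starting points to tune):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

val sgd = new LogisticRegressionWithSGD()
sgd.setIntercept(true)
sgd.optimizer
  .setNumIterations(200)   // SGD typically needs more iterations than LBFGS
  .setStepSize(1.0)        // convergence is sensitive to this
  .setRegParam(0.0)        // no regularization, to mirror a plain R / statsmodels fit
val sgdModel = sgd.run(training)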

Note: If you can use the Spark master branch (or wait for Spark 1.4), then
the spark.ml Pipelines API will be a good option.  It now has
LogisticRegression which does not do feature scaling, and it uses LBFGS or
OWLQN (depending on the regularization type) for optimization.  It's also
been compared with R in unit tests.

Good luck!
Joseph

On Wed, May 20, 2015 at 3:42 PM, Xin Liu  wrote:

> Hi,
>
> I have tried a few models in Mllib to train a LogisticRegression model.
> However, I consistently get much better results using other libraries such
> as statsmodel (which gives similar results as R) in terms of AUC. For
> illustration purpose, I used a small data (I have tried much bigger data)
>  http://www.ats.ucla.edu/stat/data/binary.csv in
> http://www.ats.ucla.edu/stat/r/dae/logit.htm
>
> Here is the snippet of my usage of LogisticRegressionWithLBFGS.
>
> val algorithm = new LogisticRegressionWithLBFGS
>  algorithm.setIntercept(true)
>  algorithm.optimizer
>.setNumIterations(100)
>.setRegParam(0.01)
>.setConvergenceTol(1e-5)
>  val model = algorithm.run(training)
>  model.clearThreshold()
>  val scoreAndLabels = test.map { point =>
>val score = model.predict(point.features)
>(score, point.label)
>  }
>  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
>  val auROC = metrics.areaUnderROC()
>
> I did a (0.6, 0.4) split for training/test. The response is "admit" and
> features are "GRE score", "GPA", and "college Rank".
>
> Spark:
> Weights (GRE, GPA, Rank):
> [0.0011576276331509304,0.048544858567336854,-0.394202150286076]
> Intercept: -0.6488972641282202
> Area under ROC: 0.6294070512820512
>
> StatsModel:
> Weights [0.0018, 0.7220, -0.3148]
> Intercept: -3.5913
> Area under ROC: 0.69
>
> The weights from statsmodel seems more reasonable if you consider for a
> one unit increase in gpa, the log odds of being admitted to graduate school
> increases by 0.72 in statsmodel than 0.04 in Spark.
>
> I have seen much bigger difference with other data. So my question is has
> anyone compared the results with other libraries and is anything wrong with
> my code to invoke LogisticRegressionWithLBFGS?
>
> As the real data I am processing is pretty big and really want to use
> Spark to get this to work. Please let me know if you have similar
> experience and how you resolve it.
>
> Thanks,
> Xin
>


Re: Compare LogisticRegression results using Mllib with those using other libraries (e.g. statsmodel)

2015-05-20 Thread DB Tsai
Hi Xin,

If you take a look at the model you trained, the intercept from Spark
is significantly smaller than StatsModel's, and the intercept represents
a prior on the categories in LOR, which causes the low accuracy in the Spark
implementation. In LogisticRegressionWithLBFGS, the intercept is
regularized due to the implementation of Updater, but the intercept
should not be regularized.

In the new pipleline APIs, a LOR with elasticNet is implemented, and
the intercept is properly handled.
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala

As you can see the tests,
https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
the result is exactly the same as R now.

BTW, in both version, the feature scalings are done before training,
and we train the model in scaled space but transform the model weights
back to original space. The only difference is in the mllib version,
LogisticRegressionWithLBFGS regularizes the intercept while in the ml
version, the intercept is excluded from regularization. As a result,
if lambda is zero, the model should be the same.



On Wed, May 20, 2015 at 3:42 PM, Xin Liu  wrote:
> Hi,
>
> I have tried a few models in Mllib to train a LogisticRegression model.
> However, I consistently get much better results using other libraries such
> as statsmodel (which gives similar results as R) in terms of AUC. For
> illustration purpose, I used a small data (I have tried much bigger data)
>  http://www.ats.ucla.edu/stat/data/binary.csv in
> http://www.ats.ucla.edu/stat/r/dae/logit.htm
>
> Here is the snippet of my usage of LogisticRegressionWithLBFGS.
>
> val algorithm = new LogisticRegressionWithLBFGS
>  algorithm.setIntercept(true)
>  algorithm.optimizer
>.setNumIterations(100)
>.setRegParam(0.01)
>.setConvergenceTol(1e-5)
>  val model = algorithm.run(training)
>  model.clearThreshold()
>  val scoreAndLabels = test.map { point =>
>val score = model.predict(point.features)
>(score, point.label)
>  }
>  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
>  val auROC = metrics.areaUnderROC()
>
> I did a (0.6, 0.4) split for training/test. The response is "admit" and
> features are "GRE score", "GPA", and "college Rank".
>
> Spark:
> Weights (GRE, GPA, Rank):
> [0.0011576276331509304,0.048544858567336854,-0.394202150286076]
> Intercept: -0.6488972641282202
> Area under ROC: 0.6294070512820512
>
> StatsModel:
> Weights [0.0018, 0.7220, -0.3148]
> Intercept: -3.5913
> Area under ROC: 0.69
>
> The weights from statsmodel seems more reasonable if you consider for a one
> unit increase in gpa, the log odds of being admitted to graduate school
> increases by 0.72 in statsmodel than 0.04 in Spark.
>
> I have seen much bigger difference with other data. So my question is has
> anyone compared the results with other libraries and is anything wrong with
> my code to invoke LogisticRegressionWithLBFGS?
>
> As the real data I am processing is pretty big and really want to use Spark
> to get this to work. Please let me know if you have similar experience and
> how you resolve it.
>
> Thanks,
> Xin

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spatial function in spark

2015-05-20 Thread developer developer
Hello,
I am fairly new to Spark and Python programming. I have an RDD with
polygons, and I need to perform spatial joins, geohash calculations and other
spatial operations on these RDDs in parallel.

I run Spark jobs on a YARN cluster, and I develop Spark applications in Python.

So, can you please suggest some pointers on how to enable spatial support for
Spark applications?

Thanks !


Help needed with Py4J

2015-05-20 Thread Addanki, Santosh Kumar
Hi Colleagues

We need to call a Scala class from PySpark in an IPython notebook.

We tried something like below :

from py4j.java_gateway import java_import

java_import(sparkContext._jvm,'')

myScalaClass =  sparkContext._jvm.SimpleScalaClass ()

myScalaClass.sayHello("World") Works Fine

But

When we try to pass sparkContext to our class it fails  like below

myContext  = _jvm.MySQLContext(sparkContext) fails with


AttributeErrorTraceback (most recent call last)

 in ()

> 1 z = _jvm.MySQLContext(sparkContext)



C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in 
__call__(self, *args)

690

691 args_command = ''.join(

--> 692 [get_command_part(arg, self._pool) for arg in new_args])

693

694 command = CONSTRUCTOR_COMMAND_NAME +\



C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in 
get_command_part(parameter, python_proxy_pool)

263 command_part += ';' + interface

264 else:

--> 265 command_part = REFERENCE_TYPE + parameter._get_object_id()

266

267 command_part += '\n'
attributeError: 'SparkContext' object has no attribute '_get_object_id'




And

myContext  = _jvm.MySQLContext(sparkContext._jsc) fails with


Constructor org.apache.spark.sql.MySQLContext([class 
org.apache.spark.api.java.JavaSparkContext]) does not exist





Would this be possible, or are there serialization issues that make it
impossible?

If not, what options do we have to instantiate our own SQLContext written
in Scala from PySpark?



Best Regards,

Santosh






Re: --jars works in "yarn-client" but not "yarn-cluster" mode, why?

2015-05-20 Thread Fengyun RAO
Thank you so much, Marcelo!

It WORKS!

2015-05-21 2:05 GMT+08:00 Marcelo Vanzin :

> Hello,
>
> Sorry for the delay. The issue you're running into is because most HBase
> classes are in the system class path, while jars added with "--jars" are
> only visible to the application class loader created by Spark. So classes
> in the system class path cannot see them.
>
> You can work around this by setting "--driver-classpath
> /opt/.../htrace-core-3.1.0-incubating.jar" and "--conf
> spark.executor.extraClassPath=
> /opt/.../htrace-core-3.1.0-incubating.jar" in your spark-submit command
> line. (You can also add those configs to your spark-defaults.conf to avoid
> having to type them all the time; and don't forget to include any other
> jars that might be needed.)
>
>
> On Mon, May 18, 2015 at 11:14 PM, Fengyun RAO 
> wrote:
>
>> Thanks, Marcelo!
>>
>>
>> Below is the full log,
>>
>>
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in 
>> [jar:file:/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in 
>> [jar:file:/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/avro-tools-1.7.6-cdh5.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> 15/05/19 14:08:58 INFO yarn.ApplicationMaster: Registered signal handlers 
>> for [TERM, HUP, INT]
>> 15/05/19 14:08:59 INFO yarn.ApplicationMaster: ApplicationAttemptId: 
>> appattempt_1432015548391_0003_01
>> 15/05/19 14:09:00 INFO spark.SecurityManager: Changing view acls to: 
>> nobody,raofengyun
>> 15/05/19 14:09:00 INFO spark.SecurityManager: Changing modify acls to: 
>> nobody,raofengyun
>> 15/05/19 14:09:00 INFO spark.SecurityManager: SecurityManager: 
>> authentication disabled; ui acls disabled; users with view permissions: 
>> Set(nobody, raofengyun); users with modify permissions: Set(nobody, 
>> raofengyun)
>> 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Starting the user application 
>> in a separate Thread
>> 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Waiting for spark context 
>> initialization
>> 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Waiting for spark context 
>> initialization ...
>> 15/05/19 14:09:00 INFO spark.SparkContext: Running Spark version 1.3.0
>> 15/05/19 14:09:00 INFO spark.SecurityManager: Changing view acls to: 
>> nobody,raofengyun
>> 15/05/19 14:09:00 INFO spark.SecurityManager: Changing modify acls to: 
>> nobody,raofengyun
>> 15/05/19 14:09:00 INFO spark.SecurityManager: SecurityManager: 
>> authentication disabled; ui acls disabled; users with view permissions: 
>> Set(nobody, raofengyun); users with modify permissions: Set(nobody, 
>> raofengyun)
>> 15/05/19 14:09:01 INFO slf4j.Slf4jLogger: Slf4jLogger started
>> 15/05/19 14:09:01 INFO Remoting: Starting remoting
>> 15/05/19 14:09:01 INFO Remoting: Remoting started; listening on addresses 
>> :[akka.tcp://sparkDriver@gs-server-v-127:7191]
>> 15/05/19 14:09:01 INFO Remoting: Remoting now listens on addresses: 
>> [akka.tcp://sparkDriver@gs-server-v-127:7191]
>> 15/05/19 14:09:01 INFO util.Utils: Successfully started service 
>> 'sparkDriver' on port 7191.
>> 15/05/19 14:09:01 INFO spark.SparkEnv: Registering MapOutputTracker
>> 15/05/19 14:09:01 INFO spark.SparkEnv: Registering BlockManagerMaster
>> 15/05/19 14:09:01 INFO storage.DiskBlockManager: Created local directory at 
>> /data1/cdh/yarn/nm/usercache/raofengyun/appcache/application_1432015548391_0003/blockmgr-3250910b-693e-46ff-b057-26d552fd8abd
>> 15/05/19 14:09:01 INFO storage.MemoryStore: MemoryStore started with 
>> capacity 259.7 MB
>> 15/05/19 14:09:01 INFO spark.HttpFileServer: HTTP File server directory is 
>> /data1/cdh/yarn/nm/usercache/raofengyun/appcache/application_1432015548391_0003/httpd-5bc614bc-d8b1-473d-a807-4d9252eb679d
>> 15/05/19 14:09:01 INFO spark.HttpServer: Starting HTTP Server
>> 15/05/19 14:09:01 INFO server.Server: jetty-8.y.z-SNAPSHOT
>> 15/05/19 14:09:01 INFO server.AbstractConnector: Started 
>> SocketConnector@0.0.0.0:9349
>> 15/05/19 14:09:01 INFO util.Utils: Successfully started service 'HTTP file 
>> server' on port 9349.
>> 15/05/19 14:09:01 INFO spark.SparkEnv: Registering OutputCommitCoordinator
>> 15/05/19 14:09:01 INFO ui.JettyUtils: Adding filter: 
>> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
>> 15/05/19 14:09:01 INFO server.Server: jetty-8.y.z-SNAPSHOT
>> 15/05/19 14:09:01 INFO server.AbstractConnector: Started 
>> SelectChannelConnector@0.0.0.0:63023
>> 15/05/19 14:09:01 INFO util.Utils: Successfully started service 'SparkUI' on 
>> port 63023.
>> 15/05/19 14:09:01 INFO ui.SparkUI: Started SparkUI at 
>> http://gs-server-v-127:63023
>> 15/05/19 14:09:02 INFO cluster.YarnClusterScheduler: Created 
>> YarnClusterScheduler
>> 15/05/19 14:09:02 INFO netty.NettyBlockTransferService:

Re: Help needed with Py4J

2015-05-20 Thread Holden Karau
Are your jars included in both the driver and worker class paths?

On Wednesday, May 20, 2015, Addanki, Santosh Kumar <
santosh.kumar.adda...@sap.com> wrote:

>  Hi Colleagues
>
>
>
> We need to call a Scala Class from pySpark in Ipython notebook.
>
>
>
> We tried something like below :
>
>
>
> from py4j.java_gateway import java_import
>
>
>
> java_import(sparkContext._jvm,'')
>
>
>
> myScalaClass =  sparkContext._jvm.SimpleScalaClass ()
>
>
>
> myScalaClass.sayHello(“World”) Works Fine
>
>
>
> But
>
>
>
> When we try to pass sparkContext to our class it fails  like below
>
>
>
> myContext  = _jvm.MySQLContext(sparkContext) fails with
>
>
>
> AttributeErrorTraceback (most recent call last)
>
>  in ()
>
> > 1 z = _jvm.MySQLContext(sparkContext)
>
>
>
> C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py 
> in __call__(self, *args)
>
> 690
>
> 691 args_command = ''.join(
>
> --> 692 [get_command_part(arg, self._pool) for arg in 
> new_args])
>
> 693
>
> 694 command = CONSTRUCTOR_COMMAND_NAME +\
>
>
>
> C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in 
> get_command_part(parameter, python_proxy_pool)
>
> 263 command_part += ';' + interface
>
> 264 else:
>
> --> 265 command_part = REFERENCE_TYPE + parameter._get_object_id()
>
> 266
>
> 267 command_part += '\n'
>
>  attributeError: 'SparkContext' object has no attribute '_get_object_id'
>
>
>
>
>
>
>
> And
>
>
>
> myContext  = _*jvm.MySQLContext(sparkContext.*_jsc) fails with
>
>
>
> Constructor org.apache.spark.sql.MySQLContext([class 
> org.apache.spark.api.java.JavaSparkContext]) does not exist
>
>
>
>
>
> Would this be possible … or there are serialization issues and hence not 
> possible.
>
> If not what are the options we have to instantiate our own SQLContext written 
> in scala from pySpark…
>
>
>
> Best Regards,
>
> Santosh
>
>
>
>
>
>
>
>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
Linked In: https://www.linkedin.com/in/holdenkarau


Re: Help needed with Py4J

2015-05-20 Thread Addanki, Santosh Kumar
Yeah ... I am able to instantiate the simple Scala class as explained below,
which is from the same JAR.

Regards
Santosh


On May 20, 2015, at 7:26 PM, Holden Karau 
mailto:hol...@pigscanfly.ca>> wrote:

Are your jars included in both the driver and worker class paths?

On Wednesday, May 20, 2015, Addanki, Santosh Kumar 
mailto:santosh.kumar.adda...@sap.com>> wrote:
Hi Colleagues

We need to call a Scala Class from pySpark in Ipython notebook.

We tried something like below :

from py4j.java_gateway import java_import

java_import(sparkContext._jvm,'')

myScalaClass =  sparkContext._jvm.SimpleScalaClass ()

myScalaClass.sayHello(“World”) Works Fine

But

When we try to pass sparkContext to our class it fails  like below

myContext  = _jvm.MySQLContext(sparkContext) fails with


AttributeErrorTraceback (most recent call last)

 in ()

> 1 z = _jvm.MySQLContext(sparkContext)



C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in 
__call__(self, *args)

690

691 args_command = ''.join(

--> 692 [get_command_part(arg, self._pool) for arg in new_args])

693

694 command = CONSTRUCTOR_COMMAND_NAME +\



C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in 
get_command_part(parameter, python_proxy_pool)

263 command_part += ';' + interface

264 else:

--> 265 command_part = REFERENCE_TYPE + parameter._get_object_id()

266

267 command_part += '\n'
attributeError: 'SparkContext' object has no attribute '_get_object_id'




And

myContext  = _jvm.MySQLContext(sparkContext._jsc) fails with


Constructor org.apache.spark.sql.MySQLContext([class 
org.apache.spark.api.java.JavaSparkContext]) does not exist





Would this be possible … or there are serialization issues and hence not 
possible.

If not what are the options we have to instantiate our own SQLContext written 
in scala from pySpark…



Best Regards,

Santosh






--
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
Linked In: https://www.linkedin.com/in/holdenkarau



Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-20 Thread Tathagata Das
If you are talking about handling driver crash failures, then all bets are
off anyways! Adding a shutdown hook in the hope of handling driver process
failure handles only some cases (Ctrl-C), but does not handle cases like
SIGKILL (which does not run JVM shutdown hooks) or a driver machine crash. So
it's not a good idea to rely on that.

Nonetheless I have opened a PR to handle the shutdown of the
StreamingContext in the same way as SparkContext.
https://github.com/apache/spark/pull/6307
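
A minimal sketch of what I mean by stopping in the main exit path (the app
setup here is just a placeholder):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("graceful-stop-example")
val ssc = new StreamingContext(conf, Seconds(1))
// ... set up DStreams here ...
ssc.start()
try {
  ssc.awaitTermination()
} finally {
  // drains the received blocks before tearing down the SparkContext
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}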


On Tue, May 19, 2015 at 12:51 AM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Thenka Sean . you are right. If driver program is running then I can
> handle shutdown in main exit path  . But if Driver machine is crashed (if
> you just stop the application, for example killing the driver process ),
> then Shutdownhook is the only option isn't it ? What I try to say is , just
> doing ssc.stop in  sys.ShutdownHookThread  or
>  Runtime.getRuntime().addShutdownHook ( in java) wont work anymore. I need
> to use the Utils.addShutdownHook with a priority .. So just checking if
> Spark Streaming can make graceful shutdown as default shutdown mechanism.
>
> Dibyendu
>
> On Tue, May 19, 2015 at 1:03 PM, Sean Owen  wrote:
>
>> I don't think you should rely on a shutdown hook. Ideally you try to
>> stop it in the main exit path of your program, even in case of an
>> exception.
>>
>> On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya
>>  wrote:
>> > You mean to say within Runtime.getRuntime().addShutdownHook I call
>> > ssc.stop(stopSparkContext  = true, stopGracefully  = true) ?
>> >
>> > This won't work anymore in 1.4.
>> >
>> > The SparkContext got stopped before Receiver processed all received
>> blocks
>> > and I see below exception in logs. But if I add the
>> Utils.addShutdownHook
>> > with the priority as I mentioned , then only graceful shutdown works .
>> In
>> > that case shutdown-hook run in priority order.
>> >
>>
>
>


Re: [Unit Test Failure] Test org.apache.spark.streaming.JavaAPISuite.testCount failed

2015-05-20 Thread Tathagata Das
Has this been fixed for you now? There have been a number of patches since
then, and it may have been fixed.

On Thu, May 14, 2015 at 7:20 AM, Wangfei (X)  wrote:

>  Yes it is repeatedly on my locally Jenkins.
>
> Sent from my iPhone
>
> On May 14, 2015, at 18:30, "Tathagata Das"  wrote:
>
>   Do you get this failure repeatedly?
>
>
>
> On Thu, May 14, 2015 at 12:55 AM, kf  wrote:
>
>> Hi, all, i got following error when i run unit test of spark by
>> dev/run-tests
>> on the latest "branch-1.4" branch.
>>
>> the latest commit id:
>> commit d518c0369fa412567855980c3f0f426cde5c190d
>> Author: zsxwing 
>> Date:   Wed May 13 17:58:29 2015 -0700
>>
>> error
>>
>> [info] Test org.apache.spark.streaming.JavaAPISuite.testCount started
>> [error] Test org.apache.spark.streaming.JavaAPISuite.testCount failed:
>> org.apache.spark.SparkException: Error communicating with MapOutputTracker
>> [error] at
>> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
>> [error] at
>> org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:119)
>> [error] at
>> org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:324)
>> [error] at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93)
>> [error] at org.apache.spark.SparkContext.stop(SparkContext.scala:1577)
>> [error] at
>>
>> org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:626)
>> [error] at
>>
>> org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:597)
>> [error] at
>>
>> org.apache.spark.streaming.TestSuiteBase$class.runStreamsWithPartitions(TestSuiteBase.scala:403)
>> [error] at
>>
>> org.apache.spark.streaming.JavaTestUtils$.runStreamsWithPartitions(JavaTestUtils.scala:102)
>> [error] at
>>
>> org.apache.spark.streaming.TestSuiteBase$class.runStreams(TestSuiteBase.scala:344)
>> [error] at
>>
>> org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102)
>> [error] at
>>
>> org.apache.spark.streaming.JavaTestBase$class.runStreams(JavaTestUtils.scala:74)
>> [error] at
>>
>> org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102)
>> [error] at
>> org.apache.spark.streaming.JavaTestUtils.runStreams(JavaTestUtils.scala)
>> [error] at
>> org.apache.spark.streaming.JavaAPISuite.testCount(JavaAPISuite.java:103)
>> [error] ...
>> [error] Caused by: org.apache.spark.SparkException: Error sending message
>> [message = StopMapOutputTracker]
>> [error] at
>> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>> [error] at
>> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
>> [error] at
>> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:109)
>> [error] ... 52 more
>> [error] Caused by: java.util.concurrent.TimeoutException: Futures timed
>> out
>> after [120 seconds]
>> [error] at
>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>> [error] at
>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>> [error] at
>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>> [error] at
>>
>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>> [error] at scala.concurrent.Await$.result(package.scala:107)
>> [error] at
>> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>> [error] ... 54 more
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Unit-Test-Failure-Test-org-apache-spark-streaming-JavaAPISuite-testCount-failed-tp22879.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Spark build with Hive

2015-05-20 Thread guoqing0...@yahoo.com.hk
Hi, can Spark 1.3.1 be built with Hive 1.2? It seems Spark 1.3.1
can only be built with Hive 0.13 or 0.12, according to the documentation.

# Apache Hadoop 2.4.X with Hive 13 support
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver 
-DskipTests clean package
# Apache Hadoop 2.4.X with Hive 12 support
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 
-Phive-thriftserver -DskipTests clean package



guoqing0...@yahoo.com.hk


Re: Spark build with Hive

2015-05-20 Thread Ted Yu
I am afraid even Hive 1.0 is not supported, let alone Hive 1.2

Cheers

On Wed, May 20, 2015 at 8:08 PM, guoqing0...@yahoo.com.hk <
guoqing0...@yahoo.com.hk> wrote:

> Hi , is the Spark-1.3.1 can build with the Hive-1.2 ? it seem to
> Spark-1.3.1 can only build with 0.13 , 0.12 according to the document .
>
> # Apache Hadoop 2.4.X with Hive 13 support
> mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver 
> -DskipTests clean package
> # Apache Hadoop 2.4.X with Hive 12 support
> mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 
> -Phive-thriftserver -DskipTests clean package
>
>
> --
> guoqing0...@yahoo.com.hk
>


RE: Spark build with Hive

2015-05-20 Thread Cheng, Hao
Yes, we ONLY support 0.12.0 and 0.13.1 currently. Hopefully we can support higher
versions in the next 1 or 2 releases.

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Thursday, May 21, 2015 11:12 AM
To: guoqing0...@yahoo.com.hk
Cc: user
Subject: Re: Spark build with Hive

I am afraid even Hive 1.0 is not supported, let alone Hive 1.2

Cheers

On Wed, May 20, 2015 at 8:08 PM, 
guoqing0...@yahoo.com.hk 
mailto:guoqing0...@yahoo.com.hk>> wrote:
Hi , is the Spark-1.3.1 can build with the Hive-1.2 ? it seem to Spark-1.3.1 
can only build with 0.13 , 0.12 according to the document .


# Apache Hadoop 2.4.X with Hive 13 support

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver 
-DskipTests clean package

# Apache Hadoop 2.4.X with Hive 12 support

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 
-Phive-thriftserver -DskipTests clean package


guoqing0...@yahoo.com.hk



Re: Help needed with Py4J

2015-05-20 Thread Holden Karau
Ah sorry, I missed that part (I've been dealing with some py4j stuff today
as well and maybe skimmed it a bit too quickly). Do you have your code
somewhere I could take a look at? Also does your constructor expect a
JavaSparkContext or a regular SparkContext (if you look at how the
SQLContext is constructed in Python, it's done using a regular SparkContext,
so _jsc.sc() is used).
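
For example, a rough sketch of what I'd expect on the Scala side if
MySQLContext takes a regular SparkContext (just an assumption, since I
haven't seen your class):

package org.apache.spark.sql

import org.apache.spark.SparkContext

// accepting a plain SparkContext lets PySpark pass sparkContext._jsc.sc(),
// i.e. the Scala context underneath the JavaSparkContext wrapper
class MySQLContext(sc: SparkContext) extends SQLContext(sc) {
  // custom methods go here
}

// from Python (untested sketch):
//   java_import(sparkContext._jvm, "org.apache.spark.sql.MySQLContext")
//   myContext = sparkContext._jvm.MySQLContext(sparkContext._jsc.sc())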

On Wed, May 20, 2015 at 7:32 PM, Addanki, Santosh Kumar <
santosh.kumar.adda...@sap.com> wrote:

>  Yeah ... I am able to instantiate the simple scala class as explained
> below which is from the same JAR
>
>  Regards
> Santosh
>
>
> On May 20, 2015, at 7:26 PM, Holden Karau  wrote:
>
>  Are your jars included in both the driver and worker class paths?
>
> On Wednesday, May 20, 2015, Addanki, Santosh Kumar <
> santosh.kumar.adda...@sap.com> wrote:
>
>>  Hi Colleagues
>>
>>
>>
>> We need to call a Scala Class from pySpark in Ipython notebook.
>>
>>
>>
>> We tried something like below :
>>
>>
>>
>> from py4j.java_gateway import java_import
>>
>>
>>
>> java_import(sparkContext._jvm,'')
>>
>>
>>
>> myScalaClass =  sparkContext._jvm.SimpleScalaClass ()
>>
>>
>>
>> myScalaClass.sayHello(“World”) Works Fine
>>
>>
>>
>> But
>>
>>
>>
>> When we try to pass sparkContext to our class it fails  like below
>>
>>
>>
>> myContext  = _jvm.MySQLContext(sparkContext) fails with
>>
>>
>>
>> AttributeErrorTraceback (most recent call last)
>>
>>  in ()
>>
>> > 1 z = _jvm.MySQLContext(sparkContext)
>>
>>
>>
>> C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py 
>> in __call__(self, *args)
>>
>> 690
>>
>> 691 args_command = ''.join(
>>
>> --> 692 [get_command_part(arg, self._pool) for arg in 
>> new_args])
>>
>> 693
>>
>> 694 command = CONSTRUCTOR_COMMAND_NAME +\
>>
>>
>>
>> C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in 
>> get_command_part(parameter, python_proxy_pool)
>>
>> 263 command_part += ';' + interface
>>
>> 264 else:
>>
>> --> 265 command_part = REFERENCE_TYPE + parameter._get_object_id()
>>
>> 266
>>
>> 267 command_part += '\n'
>>
>>  attributeError: 'SparkContext' object has no attribute '_get_object_id'
>>
>>
>>
>>
>>
>>
>>
>> And
>>
>>
>>
>> myContext  = _*jvm.MySQLContext(sparkContext.*_jsc) fails with
>>
>>
>>
>> Constructor org.apache.spark.sql.MySQLContext([class 
>> org.apache.spark.api.java.JavaSparkContext]) does not exist
>>
>>
>>
>>
>>
>> Would this be possible … or there are serialization issues and hence not 
>> possible.
>>
>> If not what are the options we have to instantiate our own SQLContext 
>> written in scala from pySpark…
>>
>>
>>
>> Best Regards,
>>
>> Santosh
>>
>>
>>
>>
>>
>>
>>
>>
>>
>
>
> --
>  Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
> Linked In: https://www.linkedin.com/in/holdenkarau
>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
Linked In: https://www.linkedin.com/in/holdenkarau


Re: GradientBoostedTrees.trainRegressor with categoricalFeaturesInfo

2015-05-20 Thread Don Drake
JIRA created: https://issues.apache.org/jira/browse/SPARK-7781

Joseph, I agree, I'm debating removing this feature altogether, but I'm
putting the model through its paces.

Thanks.

-Don

On Wed, May 20, 2015 at 7:52 PM, Joseph Bradley 
wrote:

> One more comment: That's a lot of categories for a feature.  If it makes
> sense for your data, it will run faster if you can group the categories or
> split the 1895 categories into a few features which have fewer categories.
>
> On Wed, May 20, 2015 at 3:17 PM, Burak Yavuz  wrote:
>
>> Could you please open a JIRA for it? The maxBins input is missing for the
>> Python Api.
>>
>> Is it possible if you can use the current master? In the current master,
>> you should be able to use trees with the Pipeline Api and DataFrames.
>>
>> Best,
>> Burak
>>
>> On Wed, May 20, 2015 at 2:44 PM, Don Drake  wrote:
>>
>>> I'm running Spark v1.3.1 and when I run the following against my dataset:
>>>
>>> model = GradientBoostedTrees.trainRegressor(trainingData,
>>> categoricalFeaturesInfo=catFeatu
>>> res, maxDepth=6, numIterations=3)
>>>
>>> The job will fail with the following message:
>>> Traceback (most recent call last):
>>>   File "/Users/drake/fd/spark/mltest.py", line 73, in 
>>> model = GradientBoostedTrees.trainRegressor(trainingData,
>>> categoricalFeaturesInfo=catFeatures, maxDepth=6, numIterations=3)
>>>   File
>>> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py",
>>> line 553, in trainRegressor
>>> loss, numIterations, learningRate, maxDepth)
>>>   File
>>> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py",
>>> line 438, in _train
>>> loss, numIterations, learningRate, maxDepth)
>>>   File
>>> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py",
>>> line 120, in callMLlibFunc
>>> return callJavaFunc(sc, api, *args)
>>>   File
>>> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py",
>>> line 113, in callJavaFunc
>>> return _java2py(sc, func(*args))
>>>   File
>>> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>>> line 538, in __call__
>>>   File
>>> "/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>>> line 300, in get_return_value
>>> 15/05/20 16:40:12 INFO BlockManager: Removing block rdd_32_95
>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>> o69.trainGradientBoostedTreesModel.
>>> : java.lang.IllegalArgumentException: requirement failed: DecisionTree
>>> requires maxBins (= 32) >= max categories in categorical features (= 1895)
>>> at scala.Predef$.require(Predef.scala:233)
>>> at
>>> org.apache.spark.mllib.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:128)
>>> at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:138)
>>> at org.apache.spark.mllib.tree.DecisionTree.run(DecisionTree.scala:60)
>>> at
>>> org.apache.spark.mllib.tree.GradientBoostedTrees$.org$apache$spark$mllib$tree$GradientBoostedTrees$$boost(GradientBoostedTrees.scala:150)
>>> at
>>> org.apache.spark.mllib.tree.GradientBoostedTrees.run(GradientBoostedTrees.scala:63)
>>> at
>>> org.apache.spark.mllib.tree.GradientBoostedTrees$.train(GradientBoostedTrees.scala:96)
>>> at
>>> org.apache.spark.mllib.api.python.PythonMLLibAPI.trainGradientBoostedTreesModel(PythonMLLibAPI.scala:595)
>>>
>>> So, it's complaining about the maxBins, if I provide maxBins=1900 and
>>> re-run it:
>>>
>>> model = GradientBoostedTrees.trainRegressor(trainingData,
>>> categoricalFeaturesInfo=catFeatu
>>> res, maxDepth=6, numIterations=3, maxBins=1900)
>>>
>>> Traceback (most recent call last):
>>>   File "/Users/drake/fd/spark/mltest.py", line 73, in 
>>> model = GradientBoostedTrees.trainRegressor(trainingData,
>>> categoricalFeaturesInfo=catF
>>> eatures, maxDepth=6, numIterations=3, maxBins=1900)
>>> TypeError: trainRegressor() got an unexpected keyword argument 'maxBins'
>>>
>>> It now says it knows nothing of maxBins.
>>>
>>> If I run the same command against DecisionTree or RandomForest (with
>>> maxBins=1900) it works just fine.
>>>
>>> Seems like a bug in GradientBoostedTrees.
>>>
>>> Suggestions?
>>>
>>> -Don
>>>
>>> --
>>> Donald Drake
>>> Drake Consulting
>>> http://www.drakeconsulting.com/
>>> 800-733-2143
>>>
>>
>>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
http://www.MailLaunder.com/
http://www.DrudgeSiren.com/
http://plu.gd/
800-733-2143


Re: RE: Spark build with Hive

2015-05-20 Thread guoqing0...@yahoo.com.hk
Thanks very much. Which versions will be supported in the upcoming 1.4? I hope it
will support more versions.



guoqing0...@yahoo.com.hk
 
From: Cheng, Hao
Date: 2015-05-21 11:20
To: Ted Yu; guoqing0...@yahoo.com.hk
CC: user
Subject: RE: Spark build with Hive
Yes, ONLY support 0.12.0 and 0.13.1 currently. Hopefully we can support higher 
versions in next 1 or 2 releases.
 
From: Ted Yu [mailto:yuzhih...@gmail.com] 
Sent: Thursday, May 21, 2015 11:12 AM
To: guoqing0...@yahoo.com.hk
Cc: user
Subject: Re: Spark build with Hive
 
I am afraid even Hive 1.0 is not supported, let alone Hive 1.2
 
Cheers
 
On Wed, May 20, 2015 at 8:08 PM, guoqing0...@yahoo.com.hk 
 wrote:
Hi , is the Spark-1.3.1 can build with the Hive-1.2 ? it seem to Spark-1.3.1 
can only build with 0.13 , 0.12 according to the document .
 
# Apache Hadoop 2.4.X with Hive 13 support
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
# Apache Hadoop 2.4.X with Hive 12 support
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package
 


guoqing0...@yahoo.com.hk
 


RE: RE: Spark build with Hive

2015-05-20 Thread Wang, Daoyuan
In 1.4 I think we still only support 0.12.0 and 0.13.1.

From: guoqing0...@yahoo.com.hk [mailto:guoqing0...@yahoo.com.hk]
Sent: Thursday, May 21, 2015 12:03 PM
To: Cheng, Hao; Ted Yu
Cc: user
Subject: Re: RE: Spark build with Hive

Thanks very much , Which version will be support In the upcome 1.4 ?  I hope it 
will be support more versions.


guoqing0...@yahoo.com.hk

From: Cheng, Hao
Date: 2015-05-21 11:20
To: Ted Yu; 
guoqing0...@yahoo.com.hk
CC: user
Subject: RE: Spark build with Hive
Yes, ONLY support 0.12.0 and 0.13.1 currently. Hopefully we can support higher 
versions in next 1 or 2 releases.

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Thursday, May 21, 2015 11:12 AM
To: guoqing0...@yahoo.com.hk
Cc: user
Subject: Re: Spark build with Hive

I am afraid even Hive 1.0 is not supported, let alone Hive 1.2

Cheers

On Wed, May 20, 2015 at 8:08 PM, 
guoqing0...@yahoo.com.hk 
mailto:guoqing0...@yahoo.com.hk>> wrote:
Hi , is the Spark-1.3.1 can build with the Hive-1.2 ? it seem to Spark-1.3.1 
can only build with 0.13 , 0.12 according to the document .


# Apache Hadoop 2.4.X with Hive 13 support

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver 
-DskipTests clean package

# Apache Hadoop 2.4.X with Hive 12 support

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 
-Phive-thriftserver -DskipTests clean package


guoqing0...@yahoo.com.hk



Cannot submit SparkPi to Standalone (1.3.1) running on another Server (Both Linux)

2015-05-20 Thread Carey Sublette
I am attempting to submit a job (using SparkPi) from one Linux machine
(Ubuntu 14.04) to Spark 1.3.1 running in standalone mode on another Linux
machine (Xubuntu 12.04; spartacus.servile.war), but I cannot make a
connection.

I have investigated everything I can think of to diagnose/fix the problem
but have run out of ideas.

Here are the facts;
On the Xubuntu machine I can submit SparkPi without a problem. I can also
test successfully that the master is listening on port 7077 by connecting
with Telnet.
Netstat shows:
tcp6   0  0 spartacus.servile.war:7077   [::]:*   LISTEN
Iptables is not running, it is not even installed.
I have log4j set to log in DEBUG mode to a file.

On the Ubuntu client machine I can view the Spark Master web page at port
8080:
http://spartacus:8080/
I can of course telnet to port 8080 on spartacus as well. If I try to
telnet to port 7077 I get "connection refused".

If I try to submit SparkPI on this machine like so:

./bin/spark-submit   --class org.apache.spark.examples.SparkPi   --master
spark://spartacus.servile.war:7077   --executor-memory 10G
--total-executor-cores 8
/home/carey/dev/spark-1.3.1-bin-hadoop2.6/lib/spark-examples-1.3.1-hadoop2.6.0.jar
  1

I get the following messages:
15/05/20 13:38:19 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkmas...@spartacus.servile.war:7077:
akka.remote.InvalidAssociation: Invalid address:
akka.tcp://sparkmas...@spartacus.servile.war:7077
15/05/20 13:38:19 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://sparkmas...@spartacus.servile.war:7077]. Address is now
gated for 5000 ms, all messages to this address will be delivered to dead
letters. Reason: Connection refused: spartacus.servile.war/
192.168.0.113:7077

Using "spartacus" or "192.168.0.113" instead of "spartacus.servile.war"
makes no difference.

Absolutely nothing shows up in the Spark log on spartacus when I try to
submit, I just see the worker heartbeat exchange.

In my hosts file on this machine I have:
192.168.0.113 spartacus.servile.war spartacus

Using the default spark-env.sh or setting:
export SPARK_MASTER_IP=spartacus.servile.war
(or just spartacus, or 192.168.0.113) makes no difference.

I have tried each combination of host ID in the submit and in the
spark-env.sh file together (3x4 = 12 combinations) with the same result
each time.

Iptables is not running on the Ubuntu machine either.

What is it I am missing?


Re: Compare LogisticRegression results using Mllib with those using other libraries (e.g. statsmodel)

2015-05-20 Thread Chris Gore
I tried running this data set as described with my own implementation of L2 
regularized logistic regression using LBFGS to compare:
https://github.com/cdgore/fitbox 

Intercept: -0.886745823033
Weights (['gre', 'gpa', 'rank']):[ 0.28862268  0.19402388 -0.36637964]
Area under ROC: 0.724056603774

The difference could be from the feature preprocessing as mentioned.  I 
normalized the features around 0:

binary_train_normalized = (binary_train - binary_train.mean()) / 
binary_train.std()
binary_test_normalized = (binary_test - binary_train.mean()) / 
binary_train.std()

On a data set this small, the difference in models could also be the result of 
how the training/test sets were split.

Have you tried running k-folds cross validation on a larger data set?
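
For example, MLUtils.kFold makes that straightforward (sketch only, reusing
the algorithm object from your snippet; data is assumed to be the full
RDD[LabeledPoint] before splitting):

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.util.MLUtils

val folds = MLUtils.kFold(data, 5, 11)   // 5 folds, seed 11
val aucs = folds.map { case (train, test) =>
  val model = algorithm.run(train)
  model.clearThreshold()
  val scoreAndLabels = test.map(p => (model.predict(p.features), p.label))
  new BinaryClassificationMetrics(scoreAndLabels).areaUnderROC()
}
println(s"Mean AUC over ${aucs.length} folds: ${aucs.sum / aucs.length}")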

Chris

> On May 20, 2015, at 6:15 PM, DB Tsai  wrote:
> 
> Hi Xin,
> 
> If you take a look at the model you trained, the intercept from Spark
> is significantly smaller than StatsModel, and the intercept represents
> a prior on categories in LOR which causes the low accuracy in Spark
> implementation. In LogisticRegressionWithLBFGS, the intercept is
> regularized due to the implementation of Updater, and the intercept
> should not be regularized.
> 
> In the new pipleline APIs, a LOR with elasticNet is implemented, and
> the intercept is properly handled.
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
> 
> As you can see the tests,
> https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
> the result is exactly the same as R now.
> 
> BTW, in both version, the feature scalings are done before training,
> and we train the model in scaled space but transform the model weights
> back to original space. The only difference is in the mllib version,
> LogisticRegressionWithLBFGS regularizes the intercept while in the ml
> version, the intercept is excluded from regularization. As a result,
> if lambda is zero, the model should be the same.
> 
> 
> 
> On Wed, May 20, 2015 at 3:42 PM, Xin Liu  wrote:
>> Hi,
>> 
>> I have tried a few models in Mllib to train a LogisticRegression model.
>> However, I consistently get much better results using other libraries such
>> as statsmodel (which gives similar results as R) in terms of AUC. For
>> illustration purpose, I used a small data (I have tried much bigger data)
>> http://www.ats.ucla.edu/stat/data/binary.csv in
>> http://www.ats.ucla.edu/stat/r/dae/logit.htm
>> 
>> Here is the snippet of my usage of LogisticRegressionWithLBFGS.
>> 
>> val algorithm = new LogisticRegressionWithLBFGS
>> algorithm.setIntercept(true)
>> algorithm.optimizer
>>   .setNumIterations(100)
>>   .setRegParam(0.01)
>>   .setConvergenceTol(1e-5)
>> val model = algorithm.run(training)
>> model.clearThreshold()
>> val scoreAndLabels = test.map { point =>
>>   val score = model.predict(point.features)
>>   (score, point.label)
>> }
>> val metrics = new BinaryClassificationMetrics(scoreAndLabels)
>> val auROC = metrics.areaUnderROC()
>> 
>> I did a (0.6, 0.4) split for training/test. The response is "admit" and
>> features are "GRE score", "GPA", and "college Rank".
>> 
>> Spark:
>> Weights (GRE, GPA, Rank):
>> [0.0011576276331509304,0.048544858567336854,-0.394202150286076]
>> Intercept: -0.6488972641282202
>> Area under ROC: 0.6294070512820512
>> 
>> StatsModel:
>> Weights [0.0018, 0.7220, -0.3148]
>> Intercept: -3.5913
>> Area under ROC: 0.69
>> 
>> The weights from statsmodel seems more reasonable if you consider for a one
>> unit increase in gpa, the log odds of being admitted to graduate school
>> increases by 0.72 in statsmodel than 0.04 in Spark.
>> 
>> I have seen much bigger difference with other data. So my question is has
>> anyone compared the results with other libraries and is anything wrong with
>> my code to invoke LogisticRegressionWithLBFGS?
>> 
>> As the real data I am processing is pretty big and really want to use Spark
>> to get this to work. Please let me know if you have similar experience and
>> how you resolve it.
>> 
>> Thanks,
>> Xin
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 



Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-20 Thread Dibyendu Bhattacharya
Thanks Tathagata for making this change..

Dibyendu

On Thu, May 21, 2015 at 8:24 AM, Tathagata Das  wrote:

> If you are talking about handling driver crash failures, then all bets are
> off anyways! Adding a shutdown hook in the hope of handling driver process
> failure, handles only a some cases (Ctrl-C), but does not handle cases like
> SIGKILL (does not run JVM shutdown hooks) or driver machine crash. So its
> not a good idea to rely on that.
>
> Nonetheless I have opened a PR to handle the shutdown of the
> StreamigntContext in the same way as SparkContext.
> https://github.com/apache/spark/pull/6307
>
>
> On Tue, May 19, 2015 at 12:51 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Thenka Sean . you are right. If driver program is running then I can
>> handle shutdown in main exit path  . But if Driver machine is crashed (if
>> you just stop the application, for example killing the driver process ),
>> then Shutdownhook is the only option isn't it ? What I try to say is , just
>> doing ssc.stop in  sys.ShutdownHookThread  or
>>  Runtime.getRuntime().addShutdownHook ( in java) wont work anymore. I need
>> to use the Utils.addShutdownHook with a priority .. So just checking if
>> Spark Streaming can make graceful shutdown as default shutdown mechanism.
>>
>> Dibyendu
>>
>> On Tue, May 19, 2015 at 1:03 PM, Sean Owen  wrote:
>>
>>> I don't think you should rely on a shutdown hook. Ideally you try to
>>> stop it in the main exit path of your program, even in case of an
>>> exception.
>>>
>>> On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya
>>>  wrote:
>>> > You mean to say within Runtime.getRuntime().addShutdownHook I call
>>> > ssc.stop(stopSparkContext  = true, stopGracefully  = true) ?
>>> >
>>> > This won't work anymore in 1.4.
>>> >
>>> > The SparkContext got stopped before Receiver processed all received
>>> blocks
>>> > and I see below exception in logs. But if I add the
>>> Utils.addShutdownHook
>>> > with the priority as I mentioned , then only graceful shutdown works .
>>> In
>>> > that case shutdown-hook run in priority order.
>>> >
>>>
>>
>>
>


Storing spark processed output to Database asynchronously.

2015-05-20 Thread Gautam Bajaj
Hi,

From my understanding of Spark Streaming, I created a Spark entry point,
for continuous UDP data, using:

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));
JavaReceiverInputDStream lines = jssc.receiverStream(new CustomReceiver(8060));

Now, when I process this input stream using:

JavaDStream hash = lines.flatMap()
JavaPairDStream tuple = hash.mapToPair()
JavaPairDStream output = tuple.reduceByKey()
output.foreachRDD(
new
Function2>,Time,Void>(){
@Override
public Void call(
JavaPairRDD> arg0,
Time arg1) throws Exception {
// TODO Auto-generated method stub
new AsyncRDDActions(arg0.rdd(), null);
arg0.foreachPartition(
new
VoidFunction>>>(){

@Override
public void call(
Iterator>> arg0)
throws Exception {

// TODO Auto-generated method stub
GraphDatabaseService graphDb =
new 
GraphDatabaseFactory().newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")

.setConfig("remote_shell_enabled", "true")
.newGraphDatabase();

try (Transaction tx =
graphDb.beginTx()) {
while (arg0.hasNext()) {
Tuple2 < String,
ArrayList < String >> tuple = arg0.next();
Node
HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
boolean oldHMac=false;
if (HMac!= null){

System.out.println("Alread in Database:" + tuple._1);
oldHMac=true;
}
else

HMac=Neo4jOperations.createHMac(graphDb, tuple._1);

ArrayList
zipcodes=tuple._2;
for(String zipcode : zipcodes){
Node
Zipcode=Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
if(Zipcode!=null){

System.out.println("Already in Database:" + zipcode);

if(oldHMac==true && Neo4jOperations.getRelationshipBetween(HMac,
Zipcode)!=null)

Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
else

Neo4jOperations.travelTo(HMac, Zipcode);
}
else{

Zipcode=Neo4jOperations.createZipcode(graphDb, zipcode);

Neo4jOperations.travelTo(HMac, Zipcode);
}
}
}
tx.success();
}
graphDb.shutdown();
}
});
return null;
}
});

The code in output.foreachRDD pushes the output of Spark into the Neo4j
database, checking for duplicate values.

This part of the code is very time consuming, so my processing time
exceeds the batch interval. Because of that, it *results in data loss*. So I
was thinking of pushing the output into the database asynchronously.
I found AsyncRDDActions(
https://spark.apache.org/docs/1.1.1/api/java/org/apache/spark/rdd/AsyncRDDActions.html)
for this purpose, but cannot find a working example of it in Java,
especially for the function foreachPartitionAsync, inside which we have to use
"Function1".

Any help is appreciated.

Thanks,
Gautam


View all user's application logs in history server

2015-05-20 Thread Jianshi Huang
Hi,

I'm using Spark 1.4.0-rc1 and I'm using default settings for history server.

But I can only see my own logs. Is it possible to view all user's logs? The
permission is fine for the user group.

-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Unable to use hive queries with constants in predicates

2015-05-20 Thread Devarajan Srinivasan
Hi,

I was testing Spark reading data from Hive using HiveContext. I got the
following error when I used a simple query with constants in predicates.

I am using Spark 1.3. Has anyone encountered an error like this?


Error:


Exception in thread "main" org.apache.spark.sql.AnalysisException:
Unsupported language features in query: SELECT * from test_table where
daily_partition='20150101'
TOK_QUERY 1, 0,20, 81
  TOK_FROM 1, 10,14, 81
TOK_TABREF 1, 12,14, 81
  TOK_TABNAME 1, 12,14, 81
everest_marts_test 1, 12,12, 81
voice_cdr 1, 14,14, 100
  TOK_INSERT 0, -1,-1, 0
TOK_DESTINATION 0, -1,-1, 0
  TOK_DIR 0, -1,-1, 0
TOK_TMP_FILE 0, -1,-1, 0
TOK_SELECT 1, 0,8, 7
  TOK_SELEXPR 1, 2,2, 7
TOK_TABLE_OR_COL 1, 2,2, 7
  callingpartynumber 1, 2,2, 7
  TOK_SELEXPR 1, 4,4, 26
TOK_TABLE_OR_COL 1, 4,4, 26
  calledpartynumber 1, 4,4, 26
  TOK_SELEXPR 1, 6,6, 44
TOK_TABLE_OR_COL 1, 6,6, 44
  chargingtime 1, 6,6, 44
  TOK_SELEXPR 1, 8,8, 57
TOK_TABLE_OR_COL 1, 8,8, 57
  call_direction_key 1, 8,8, 57
TOK_WHERE 1, 16,20, 131
  = 1, 18,20, 131
TOK_TABLE_OR_COL 1, 18,18, 116
  daily_partition 1, 18,18, 116
'20150101' 1, 20,20, 132

scala.NotImplementedError: No parse rules for ASTNode type: 294, text:
'20150101' :
'20150101' 1, 20,20, 132
" +
org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1261)
  ;
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:261)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$
hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$
hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.
apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.
apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.
scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(
Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.
scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$
apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$
apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.
scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(
PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(
AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$
SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$
SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.
apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.
apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.
scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(
Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$
append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.
scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$
apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$
apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinat

Re: FP Growth saveAsTextFile

2015-05-20 Thread Xiangrui Meng
+user

If this was in cluster mode, you should provide a path on a shared file
system, e.g., HDFS, instead of a local path. If this is in local mode, I'm
not sure what went wrong.
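
For example (the HDFS path below is just an illustration):

model.freqItemsets.saveAsTextFile("hdfs:///user/etanner/fpgrowth/freqItemsets")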

On Wed, May 20, 2015 at 2:09 PM, Eric Tanner 
wrote:

> Here is the stack trace. Thanks for looking at this.
>
> scala>
> model.freqItemsets.saveAsTextFile("c:///repository/trunk/Scala_210_wspace/fpGrowth/modelText1")
> 15/05/20 14:07:47 INFO SparkContext: Starting job: saveAsTextFile at
> :33
> 15/05/20 14:07:47 INFO DAGScheduler: Got job 15 (saveAsTextFile at
> :33) with 2 output partitions (allowLocal=false)
> 15/05/20 14:07:47 INFO DAGScheduler: Final stage: Stage 30(saveAsTextFile
> at :33)
> 15/05/20 14:07:47 INFO DAGScheduler: Parents of final stage: List(Stage 29)
> 15/05/20 14:07:47 INFO DAGScheduler: Missing parents: List()
> 15/05/20 14:07:47 INFO DAGScheduler: Submitting Stage 30
> (MapPartitionsRDD[21] at saveAsTextFile at :33), which has no
> missing parents
> 15/05/20 14:07:47 INFO MemoryStore: ensureFreeSpace(131288) called with
> curMem=724428, maxMem=278302556
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_18 stored as values in
> memory (estimated size 128.2 KB, free 264.6 MB)
> 15/05/20 14:07:47 INFO MemoryStore: ensureFreeSpace(78995) called with
> curMem=855716, maxMem=278302556
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_18_piece0 stored as
> bytes in memory (estimated size 77.1 KB, free 264.5 MB)
> 15/05/20 14:07:47 INFO BlockManagerInfo: Added broadcast_18_piece0 in
> memory on localhost:52396 (size: 77.1 KB, free: 265.1 MB)
> 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block
> broadcast_18_piece0
> 15/05/20 14:07:47 INFO SparkContext: Created broadcast 18 from broadcast
> at DAGScheduler.scala:839
> 15/05/20 14:07:47 INFO DAGScheduler: Submitting 2 missing tasks from Stage
> 30 (MapPartitionsRDD[21] at saveAsTextFile at :33)
> 15/05/20 14:07:47 INFO TaskSchedulerImpl: Adding task set 30.0 with 2 tasks
> 15/05/20 14:07:47 INFO BlockManager: Removing broadcast 17
> 15/05/20 14:07:47 INFO TaskSetManager: Starting task 0.0 in stage 30.0
> (TID 33, localhost, PROCESS_LOCAL, 1056 bytes)
> 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_17_piece0
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_17_piece0 of size 4737
> dropped from memory (free 277372582)
> 15/05/20 14:07:47 INFO TaskSetManager: Starting task 1.0 in stage 30.0
> (TID 34, localhost, PROCESS_LOCAL, 1056 bytes)
> 15/05/20 14:07:47 INFO BlockManagerInfo: Removed broadcast_17_piece0 on
> localhost:52396 in memory (size: 4.6 KB, free: 265.1 MB)
> 15/05/20 14:07:47 INFO Executor: Running task 1.0 in stage 30.0 (TID 34)
> 15/05/20 14:07:47 INFO Executor: Running task 0.0 in stage 30.0 (TID 33)
> 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block
> broadcast_17_piece0
> 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_17
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_17 of size 6696
> dropped from memory (free 277379278)
> 15/05/20 14:07:47 INFO ContextCleaner: Cleaned broadcast 17
> 15/05/20 14:07:47 INFO BlockManager: Removing broadcast 16
> 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_16_piece0
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_16_piece0 of size 4737
> dropped from memory (free 277384015)
> 15/05/20 14:07:47 INFO BlockManagerInfo: Removed broadcast_16_piece0 on
> localhost:52396 in memory (size: 4.6 KB, free: 265.1 MB)
> 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block
> broadcast_16_piece0
> 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_16
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_16 of size 6696
> dropped from memory (free 277390711)
> 15/05/20 14:07:47 INFO ContextCleaner: Cleaned broadcast 16
> 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty
> blocks out of 2 blocks
> 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Started 0 remote
> fetches in 1 ms
> 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty
> blocks out of 2 blocks
> 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Started 0 remote
> fetches in 0 ms
> 15/05/20 14:07:47 ERROR Executor: Exception in task 1.0 in stage 30.0 (TID
> 34)
> java.lang.NullPointerException
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1010)
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
> at org.apache.hadoop.util.Shell.run(Shell.java:455)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
> at org.apache.hadoop.util.Shell.execCommand(Shell.java:808)
> at org.apache.hadoop.util.Shell.execCommand(Shell.java:791)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:656)
> at
> org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:490)
> at
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:462)
>  

Re: How to process data in chronological order

2015-05-20 Thread Sonal Goyal
Would partitioning your data based on the key and then running
mapPartitions help?
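
Something along these lines, as a rough sketch (assumes an RDD keyed by the
timestamp; the string concatenation stands in for your real aggregation):

import org.apache.spark.RangePartitioner
import org.apache.spark.rdd.RDD

val events: RDD[(Long, String)] = ???              // your timestamp-keyed data
val partitioned = events.repartitionAndSortWithinPartitions(
  new RangePartitioner(8, events))                 // contiguous time ranges per partition
val perSlice = partitioned.mapPartitions { iter =>
  Iterator(iter.map(_._2).reduce(_ + _))           // reduce one contiguous slice in order
}
// partitions come back in key order, so the final merge is chronological
val result = perSlice.collect().reduce(_ + _)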

Best Regards,
Sonal
Founder, Nube Technologies 





On Thu, May 21, 2015 at 4:33 AM, roy  wrote:

> I have a key-value RDD, key is a timestamp (femto-second resolution, so
> grouping buys me nothing) and I want to reduce it in the chronological
> order.
>
> How do I do that in spark?
>
> I am fine with reducing contiguous sections of the set separately and then
> aggregating the resulting objects locally.
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-data-in-chronological-order-tp22966.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: rdd.saveAsTextFile problem

2015-05-20 Thread Keerthi
Hi ,

I tried the workaround shared here, but I am still facing the same issue...

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/rdd-saveAsTextFile-problem-tp176p22970.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: rdd.saveAsTextFile problem

2015-05-20 Thread Akhil Das
This thread is from a year back. Can you please share what issue you are
facing? Which version of Spark are you using? What is your system
environment? Do you have the exception stack trace?

Thanks
Best Regards

On Thu, May 21, 2015 at 12:19 PM, Keerthi 
wrote:

> Hi ,
>
> I had tried the workaround shared here, but still facing the same issue...
>
> Thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/rdd-saveAsTextFile-problem-tp176p22970.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>