Re: Tungsten and Spark Streaming

2015-09-10 Thread Paolo Platter
Do you plan to modify the DStream interface so that it works with DataFrames?
It would be nice to handle DStreams without generics.

Paolo

Sent from my Windows Phone

From: Tathagata Das
Sent: 10/09/2015 07:42
To: N B
Cc: user
Subject: Re: Tungsten and Spark Streaming

Rewriting is necessary. You will have to convert RDD/DStream operations to 
DataFrame operations. So get the RDDs in DStream, using transform/foreachRDD, 
convert to DataFrames and then do DataFrame operations.

On Wed, Sep 9, 2015 at 9:23 PM, N B wrote:
Hello,

How can we start taking advantage of the performance gains made under Project 
Tungsten in Spark 1.5 for a Spark Streaming program?

From what I understand, this is available by default for DataFrames. But for a 
program written using Spark Streaming, would we see any potential gains "out 
of the box" in 1.5 or will we have to rewrite some portions of the application 
code to realize that benefit?

Any insight/documentation links etc in this regard will be appreciated.

Thanks
Nikunj




Re: Avoiding SQL Injection in Spark SQL

2015-09-10 Thread Sean Owen
I don't think this is Spark-specific. Mostly you need to escape /
quote user-supplied values as with any SQL engine.
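
A minimal sketch of what "escape / quote" can look like when the statement has
to be assembled as text. It is plain Python, not a Spark API, and every helper
name here (sql_identifier, sql_int, sql_string, build_query) is hypothetical.
The backslash-escaping rule in sql_string is an assumption; check the literal
syntax of the engine you actually target before relying on it.

```python
import re

_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def sql_identifier(name):
    """Allow only plain identifiers (table/column names); reject everything else."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError("unsafe identifier: %r" % (name,))
    return name

def sql_int(value):
    """Coerce to int so that only digits (and a sign) can reach the SQL text."""
    return str(int(value))

def sql_string(value):
    """Quote a string literal. ASSUMPTION: the engine uses backslash escaping."""
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return "'" + escaped + "'"

def build_query(table, min_id, name):
    """Assemble the statement only from validated or quoted pieces."""
    return "SELECT * FROM {} WHERE id >= {} AND name = {}".format(
        sql_identifier(table), sql_int(min_id), sql_string(name))

print(build_query("users", "42", "O'Brien"))
```

Validating against a whitelist (identifiers, integers) is usually the safer
half of this; string quoting is only as good as the escaping rule it assumes.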

On Thu, Sep 10, 2015 at 7:32 AM, V Dineshkumar
 wrote:
> Hi,
>
> What is the preferred way of avoiding SQL Injection while using Spark SQL?
> In our use case we have to take the parameters directly from the users and
> prepare the SQL statement. I was not able to find any API for preparing the
> SQL statement safely avoiding injection.
>
> Thanks,
> Dinesh
> Philips India

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Akhil Das
This consumer pretty much covers all those scenarios you listed
github.com/dibbhatt/kafka-spark-consumer Give it a try.

Thanks
Best Regards



Re: [Spark on Amazon EMR] : File does not exist: hdfs://ip-x-x-x-x:/.../spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

2015-09-10 Thread Ewan Leith
The last time I checked, if you launch EMR 4 with only Spark selected as an 
application, HDFS isn't correctly installed.


Did you select another application like Hive at launch time as well as Spark? 
If not, try that.


Thanks,

Ewan


-- Original message --
From: Dean Wampler
Date: Wed, 9 Sep 2015 22:29
To: shahab
Cc: user@spark.apache.org
Subject: Re: [Spark on Amazon EMR] : File does not exist: hdfs://ip-x-x-x-x:/.../spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar


If you log into the cluster, do you see the file if you type:

hdfs dfs -ls hdfs://ipx-x-x-x:8020/user/hadoop/.sparkStaging/application_123344567_0018/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

(with the correct server address for "ipx-x-x-x"). If not, is the server 
address correct and routable inside the cluster? Recall that EC2 instances have 
both public and private host names and IP addresses.

Also, is the port number correct for HDFS in the cluster?

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com

On Wed, Sep 9, 2015 at 9:28 AM, shahab wrote:
Hi,
I am using Spark on Amazon EMR. So far I have not managed to submit the 
application successfully, and I am not sure what the problem is. In the log 
file I see the following:

java.io.FileNotFoundException: File does not exist: hdfs://ipx-x-x-x:8020/user/hadoop/.sparkStaging/application_123344567_0018/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

However, even putting spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar in the fat 
jar file didn't solve the problem. I am out of ideas now.
I want to submit a Spark application as a step, using the AWS web console. I 
submit the application as:

spark-submit --deploy-mode cluster --class mypack.MyMainClass --master yarn-cluster s3://mybucket/MySparkApp.jar

Has anyone had a similar problem with EMR?

best,
/Shahab



Cassandra row count grouped by multiple columns

2015-09-10 Thread Chirag Dewan
Hi,

I am using Spark 1.2.0 with Cassandra 2.0.14. I need a count of rows for each 
distinct combination of values across multiple columns.

I have a column family with 3 columns, a, b and c, and for each distinct 
combination of values (a1, b1, c1) I want the row count.

For example, given:
A1,B1,C1
A2,B2,C2
A3,B3,C3
A1,B1,C1

The output should be:
A1,B1,C1,2
A2,B2,C2,1
A3,B3,C3,1

What is the optimum way of achieving this?
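
As a rough illustration of the counting logic only (plain Python, not the
Spark or Cassandra connector API), each row can be keyed by the tuple of its
columns and the counts summed per key. On an RDD the same shape is usually
expressed as mapping each row to ((a, b, c), 1) followed by reduceByKey. The
function name count_by_columns and the sample rows are assumptions made for
this sketch.

```python
from collections import Counter

# Sample rows; each row is an (a, b, c) tuple.
rows = [
    ("A1", "B1", "C1"),
    ("A2", "B2", "C2"),
    ("A3", "B3", "C3"),
    ("A1", "B1", "C1"),
]

def count_by_columns(rows):
    """Count rows per distinct (a, b, c) combination."""
    counts = Counter()
    for a, b, c in rows:
        counts[(a, b, c)] += 1
    return counts

# Print one "a,b,c,count" line per distinct combination, sorted by key.
for (a, b, c), n in sorted(count_by_columns(rows).items()):
    print(",".join([a, b, c, str(n)]))
```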

Thanks in advance.

Chirag


Spark Streaming stop gracefully doesn't return to command line after upgrade to 1.4.0 and beyond

2015-09-10 Thread Petr Novak
Hello,
my Spark streaming v1.3.0 code uses

sys.ShutdownHookThread {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}

so that Ctrl+C on the command line stops it. With v1.3.0 it returned to the
command line after finishing the current batch, but with v1.4.0 through v1.5.0
it does not. Has the behaviour or the required code changed?

The last messages are:

[2015-09-08 13:02:43,300] INFO Waited for jobs to be processed and
checkpoints to be written
(org.apache.spark.streaming.scheduler.JobGenerator)
[2015-09-08 13:02:43,300] INFO CheckpointWriter executor terminated ? true,
waited for 0 ms. (org.apache.spark.streaming.CheckpointWriter)
[2015-09-08 13:02:43,301] INFO Stopped JobGenerator
(org.apache.spark.streaming.scheduler.JobGenerator)
[2015-09-08 13:02:43,302] INFO Stopped JobScheduler
(org.apache.spark.streaming.scheduler.JobScheduler)
[2015-09-08 13:02:43,303] INFO stopped
o.s.j.s.ServletContextHandler{/streaming,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-08 13:02:43,305] INFO stopped
o.s.j.s.ServletContextHandler{/streaming/batch,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-08 13:02:43,307] INFO stopped
o.s.j.s.ServletContextHandler{/static/streaming,null}
(org.spark-project.jetty.server.handler.ContextHandler)

Thank you for any explanation,
Petr


Re: [Spark on Amazon EMR] : File does not exist: hdfs://ip-x-x-x-x:/.../spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

2015-09-10 Thread shahab
Thank you all for the comments, but my problem still exists.
@Dean, @Ewan: yes, I do have the Hadoop file system installed and working.

@Sujit: the latest version of EMR (version 4) does not need manual copying
of the jar file to the server. The blog that you pointed out refers to an older
version (3.x) of EMR. But I will try your solution as well.
@Neil: I think something is wrong with my fat jar file; I think I am
missing some dependencies in it.

Again thank you all

/Shahab



Re: spark.kryo.registrationRequired: Tuple2 is not registered

2015-09-10 Thread Marius Soutier
Found an issue for this:
https://issues.apache.org/jira/browse/SPARK-10251 

> On 09.09.2015, at 18:00, Marius Soutier  wrote:
> 
> Hi all,
> 
> as indicated in the title, I’m using Kryo with a custom Kryo serializer, but 
> as soon as I enable `spark.kryo.registrationRequired`, my Spark Streaming job 
> fails to start with this exception:
> 
> Class is not registered: scala.collection.immutable.Range
> 
> When I register it, it continues with Tuple2, which I cannot serialize sanely 
> for all specialized forms. According to the documentation, this should be 
> handled by Chill. Is this a bug or what am I missing?
> 
> I’m using Spark 1.4.1.
> 
> 
> Cheers
> - Marius
> 



spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

2015-09-10 Thread Todd
Hi,

I am using data generated with spark-sql-perf 
(https://github.com/databricks/spark-sql-perf) to test Spark SQL performance 
(Spark on YARN, with 10 nodes) with the following code. The table store_sales 
has about 90 million records and is 6 GB in size.
 
val outputDir="hdfs://tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales"
val name="store_sales"
sqlContext.sql(
  s"""
  |CREATE TEMPORARY TABLE ${name}
  |USING org.apache.spark.sql.parquet
  |OPTIONS (
  |  path '${outputDir}'
  |)
""".stripMargin)

val sql = """
  |select
  |  t1.ss_quantity,
  |  t1.ss_list_price,
  |  t1.ss_coupon_amt,
  |  t1.ss_cdemo_sk,
  |  t1.ss_item_sk,
  |  t1.ss_promo_sk,
  |  t1.ss_sold_date_sk
  |from store_sales t1 join store_sales t2 on t1.ss_item_sk = t2.ss_item_sk
  |where
  |  t1.ss_sold_date_sk between 2450815 and 2451179
  """.stripMargin

val df = sqlContext.sql(sql)
// Visit every row without doing any work, to force full evaluation of the query.
df.rdd.foreach(_ => ())

With 1.4.1 the query finishes in 6 minutes, but with 1.5 it takes more than 10 
minutes.

The configurations are basically the same, since I copied the configuration 
from 1.4.1 to 1.5:

sparkVersion                                  1.4.1   1.5.0
scaleFactor                                   30      30
spark.sql.shuffle.partitions                  600     600
spark.sql.sources.partitionDiscovery.enabled  true    true
spark.default.parallelism                     200     200
spark.driver.memory                           4G      4G
spark.executor.memory                         4G      4G
spark.executor.instances                      10      10
spark.shuffle.consolidateFiles                true    true
spark.storage.memoryFraction                  0.4     0.4
spark.executor.cores                          3       3

I am not sure what is going wrong. Any ideas?




Custom UDAF Evaluated Over Window

2015-09-10 Thread xander92
While testing out the new UserDefinedAggregateFunction in Spark 1.5.0, I
successfully implemented a simple function to compute an average. I then
tried to test this function by applying it over a simple window and I got an
error saying that my function is not supported over window operation.

So, is applying custom UDAFs over windows possible in Spark 1.5.0, and do I
simply have a mistake somewhere? If it is not possible, are there patches
that enable it but were not included in the new release, or is this
functionality something that will hopefully come in a later release?






Spark-shell throws Hive error when SQLContext.parquetFile, v1.3

2015-09-10 Thread Petr Novak
Hello,

sqlContext.parquetFile(dir)

throws the exception "Unable to instantiate
org.apache.hadoop.hive.metastore.HiveMetaStoreClient".

The strange thing is that the second attempt to open the file is
successful:

try {
  sqlContext.parquetFile(dir)
} catch {
  case e: Exception => sqlContext.parquetFile(dir)
}

What should I do to make my script run flawlessly in spark-shell when
opening Parquet files? It is probably missing some dependency. Or how should
I write the code? This double attempt is awful, and I don't need
HiveMetaStoreClient; I just need to open a Parquet file.

Many thanks for any idea,
Petr


Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Krzysztof Zarzycki
Hi there,
I have trouble meeting all my requirements when using Spark Streaming on
Kafka. Let me enumerate my requirements:
1. I want to have at-least-once/exactly-once processing.
2. I want my application to tolerate both faults and simple stops. The Kafka
offsets need to be tracked between restarts.
3. I want to be able to upgrade the code of my application without losing Kafka
offsets.

Now, what my requirements imply, as far as I know:
1. implies using the new Kafka DirectStream.
2. implies using checkpointing. Kafka DirectStream will write offsets to
the checkpoint as well.
3. implies that checkpoints can't be used between controlled restarts. So I
need to install a shutdownHook with ssc.stop(stopGracefully=true) (here is a
description of how:
https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully
)

Now my problems are:
1. If I cannot use checkpoints across code upgrades, does it mean that
Spark does not help me at all with keeping my Kafka offsets? Does it mean
that I have to implement my own storing of offsets to, and initialization from,
Zookeeper?
2. When I set up the shutdownHook and any of my executors throws an exception,
it seems that the application does not fail, but gets stuck in the running
state. Is that because stopGracefully deadlocks on exceptions? How can I
overcome this problem? Maybe I can avoid setting the shutdownHook and there is
another way to stop the app gracefully?

3. If I somehow overcome 2., is it enough to just stop my app gracefully to
be able to upgrade the code and not lose Kafka offsets?


Thank you a lot for your answers,
Krzysztof Zarzycki


Re: Perf impact of BlockManager byte[] copies

2015-09-10 Thread Reynold Xin
This is one problem I'd like to address soon - providing a binary block
management interface for shuffle (and maybe other things) that avoids
serialization/copying.


On Fri, Feb 27, 2015 at 3:39 PM, Paul Wais  wrote:

> Dear List,
>
> I'm investigating some problems related to native code integration
> with Spark, and while picking through BlockManager I noticed that data
> (de)serialization currently issues lots of array copies.
> Specifically:
>
> - Deserialization: BlockManager marshals all deserialized bytes
> through a spark.util.ByteBufferInputStream, which necessitates
> copying data into an intermediate temporary byte[].  The temporary
> byte[] might be reused between deserialization of T instances, but
> nevertheless the bytes must be copied (and likely in a Java loop).
>
> - Serialization: BlockManager buffers all serialized bytes into a
> java.io.ByteArrayOutputStream, which maintains an internal byte[]
> buffer and grows/re-copies the buffer like a vector as the buffer
> fills.  BlockManager then retrieves the internal byte[] buffer, wraps
> it in a ByteBuffer, and sends it off to be stored (e.g. in
> MemoryStore, DiskStore, Tachyon, etc).
>
> When an individual T is somewhat large (e.g. a feature vector, an
> image, etc), or blocks are megabytes in size, these copies become
> expensive (especially for large written blocks), right?  Does anybody
> have any measurements of /how/ expensive they are?  If not, is there
> serialization benchmark code (e.g. for KryoSerializer) that might be
> helpful here?
>
>
> As part of my investigation, I've found that one might be able to
> sidestep these issues by extending Spark's SerializerInstance API to
> offer I/O on ByteBuffers (in addition to {Input,Output}Streams).  An
> extension including a ByteBuffer API would furthermore have many
> benefits for native code.  A major downside of this API addition is
> that it wouldn't interoperate (nontrivially) with compression, so
> shuffles wouldn't benefit.  Nevertheless, BlockManager could probably
> deduce when use of this ByteBuffer API is possible and leverage it.
>
> Cheers,
> -Paul
>
>


Re: Can not allocate executor when running spark on mesos

2015-09-10 Thread Iulian Dragoș
On Thu, Sep 10, 2015 at 3:35 AM, canan chen  wrote:

> Finally got the answer.  Actually it works fine. The allocation behavior
> on mesos is a little different from yarn/standalone. Seems the executor in
> mesos is lazily allocated (only when job is executed) while executor in
> yarn/standalone is allocated when spark-shell is started.
>

That's in fine-grained mode (the default). You can turn on coarse-grained
mode to acquire executors on startup.

iulian


>
>
>
> On Tue, Sep 8, 2015 at 10:39 PM, canan chen  wrote:
>
>> Yes, I follow the guide in this doc, and run it as mesos client mode
>>
>> On Tue, Sep 8, 2015 at 6:31 PM, Akhil Das 
>> wrote:
>>
>>> In which mode are you submitting your application? (coarse-grained or
>>> fine-grained(default)). Have you gone through this documentation already?
>>> http://spark.apache.org/docs/latest/running-on-mesos.html#using-a-mesos-master-url
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, Sep 8, 2015 at 12:54 PM, canan chen  wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am trying to run Spark on Mesos, but it looks like I cannot allocate
>>>> resources from Mesos. I am not a Mesos expert, but from the Mesos log it
>>>> seems Spark always declines the offers from Mesos. I am not sure what's
>>>> wrong; maybe some configuration change is needed. Here's the Mesos master log:
>>>>
>>>> I0908 15:08:16.515960 301916160 master.cpp:1767] Received registration
>>>> request for framework 'Spark shell' at
>>>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>>>> I0908 15:08:16.520545 301916160 master.cpp:1834] Registering framework
>>>> 20150908-143320-16777343-5050-41965- (Spark shell) at
>>>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133 with
>>>> checkpointing disabled and capabilities [  ]
>>>> I0908 15:08:16.522307 300843008 hierarchical.hpp:386] Added framework
>>>> 20150908-143320-16777343-5050-41965-
>>>> I0908 15:08:16.525845 301379584 master.cpp:4290] Sending 1 offers to
>>>> framework 20150908-143320-16777343-5050-41965- (Spark shell) at
>>>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>>>> I0908 15:08:16.637677 302452736 master.cpp:2884] Processing DECLINE
>>>> call for offers: [ 20150908-143320-16777343-5050-41965-O0 ] for framework
>>>> 20150908-143320-16777343-5050-41965- (Spark shell) at
>>>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>>>> I0908 15:08:16.639197 299233280 hierarchical.hpp:761] Recovered
>>>> cpus(*):8; mem(*):15360; disk(*):470842; ports(*):[31000-32000] (total:
>>>> cpus(*):8; mem(*):15360; disk(*):470842; ports(*):[31000-32000], allocated:
>>>> ) on slave 20150908-143320-16777343-5050-41965-S0 from framework
>>>> 20150908-143320-16777343-5050-41965-
>>>> I0908 15:08:21.786932 300306432 master.cpp:4290] Sending 1 offers to
>>>> framework 20150908-143320-16777343-5050-41965- (Spark shell) at
>>>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>>>> I0908 15:08:21.789979 298696704 master.cpp:2884] Processing DECLINE
>>>> call for offers: [ 20150908-143320-16777343-5050-41965-O1 ] for framework
>>>> 20150908-143320-16777343-5050-41965- (Spark shell) at
>>>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133

>>>
>>>
>>
>


-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Terasort on spark -- Jmeter

2015-09-10 Thread Shreeharsha G Neelakantachar
Hi,
I am trying to simulate multiple users/threads for Terasort on Spark 1.4.1 as 
part of studying its performance patterns. Please let me know whether any 
JMeter plugins are available for this, or which protocol I should use to 
record the Terasort execution flow.

Any steps on using JMeter to simulate Terasort on Spark would be of great 
help.

Regards
Harsha

Re: spark streaming 1.3 with kafka connection timeout

2015-09-10 Thread Cody Koeninger
Post the actual stacktrace you're getting

On Thu, Sep 10, 2015 at 12:21 AM, Shushant Arora 
wrote:

> Executors in Spark Streaming 1.3 fetch messages from Kafka in batches. What
> happens when an executor takes a long time to process a fetched batch,
> say in
>
>
> directKafkaStream.foreachRDD(new Function, Void>() {
>
> @Override
> public Void call(JavaRDD v1) throws Exception {
> v1.foreachPartition(new  VoidFunction>{
> @Override
> public void call(Iterator t) throws Exception {
> //long running task
> }});}});
>
> Will this long-running task drop the executor's connection to the Kafka
> brokers? And how should I handle that? I am getting a connection timeout in
> my code.
>
>
>
>


Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Dmitry Goldenberg
>> checkpoints can't be used between controlled restarts

Is that true? If so, why? From my testing, checkpoints appear to be working
fine, we get the data we've missed between the time the consumer went down
and the time we brought it back up.

>> If I cannot make checkpoints between code upgrades, does it mean that
Spark does not help me at all with keeping my Kafka offsets? Does it mean,
that I have to implement my own storing to/initialization of offsets from
Zookeeper?

By code upgrades, do you mean code changes to the consumer program?

If that is the case, one idea we've been entertaining is that, if the
consumer changes, especially if its configuration parameters change, it
means that some older configuration may still be stuck in the
checkpointing.  What we'd do in this case is, prior to starting the
consumer, blow away the checkpointing directory and re-consume from Kafka
from the smallest offsets.  In our case, it's OK to re-process; I realize
that in many cases that may not be an option.  If that's the case then it
would seem to follow that you have to manage offsets in Zk...

Another thing to consider would be to treat upgrades operationally. That
is, if an upgrade is to happen, consume the data up to a certain point
then bring the system down for an upgrade. Remove checkpointing. Restart
everything; the system would now be rebuilding the checkpointing and using
your upgraded consumers.  (Again, this may not be possible in some systems
where the data influx is constant and/or the data is mission critical)...

Perhaps this discussion implies that there may be a new feature in Spark
where it intelligently drops the checkpointing or allows you to selectively
pluck out and drop some items prior to restarting...






Re: Spark-shell throws Hive error when SQLContext.parquetFile, v1.3

2015-09-10 Thread Cheng Lian
If you don't need to interact with Hive, you may compile Spark without 
using the -Phive flag to eliminate Hive dependencies. In this way, the 
sqlContext instance in Spark shell will be of type SQLContext instead of 
HiveContext.


The Hive metastore error is probably due to Hive misconfiguration.


Cheng




Re: Best way to import data from Oracle to Spark?

2015-09-10 Thread Sathish Kumaran Vairavelu
I guess a Data Pump export from Oracle could be a fast option. Hive now has
an Oracle Data Pump SerDe.

https://docs.oracle.com/cd/E57371_01/doc.41/e57351/copy2bda.htm


On Wed, Sep 9, 2015 at 4:41 AM Reynold Xin  wrote:

> Using the JDBC data source is probably the best way.
> http://spark.apache.org/docs/1.4.1/sql-programming-guide.html#jdbc-to-other-databases
>
> On Tue, Sep 8, 2015 at 10:11 AM, Cui Lin  wrote:
>
>> What's the best way to import data from Oracle to Spark? Thanks!
>>
>>
>> --
>> Best regards!
>>
>> Lin,Cui
>>
>
>


Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Krzysztof Zarzycki
Thanks Akhil, seems like an interesting option to consider.
Do you know if the package is production-ready? Do you use it in production?

And do you know if it works with Spark 1.3.1 as well? The README mentions that
the package on spark-packages.org is built with Spark 1.4.1.


Anyway, it seems that core Spark Streaming does not support my case? Or can
anyone tell me how to do it? Let's say that I'm even fine (though not happy)
with using the private KafkaCluster class that is included in Spark to manage
ZK offsets manually. Has someone done it before? Does anyone have public code
examples of manually managing ZK offsets?
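
For what it is worth, the bookkeeping behind "managing offsets yourself" can
be sketched independently of Spark and ZooKeeper. The plain-Python sketch
below is an illustration under stated assumptions, not the KafkaCluster API:
OffsetStore and run_batch are hypothetical names, a local JSON file stands in
for ZooKeeper, and a Python list stands in for a Kafka partition. The point it
shows is the ordering: process first, commit the offset afterwards, which
gives at-least-once behaviour and survives a restart with different code.

```python
import json
import os
import tempfile

class OffsetStore:
    """Persists the next offset to read per topic/partition in a JSON file."""

    def __init__(self, path):
        self.path = path

    def load(self):
        if not os.path.exists(self.path):
            return {}
        with open(self.path) as f:
            return json.load(f)

    def commit(self, topic, partition, next_offset):
        offsets = self.load()
        offsets["{}:{}".format(topic, partition)] = next_offset
        # Write to a temp file and rename, so readers never see a partial file.
        tmp = self.path + ".tmp"
        with open(tmp, "w") as f:
            json.dump(offsets, f)
        os.replace(tmp, self.path)

def run_batch(store, topic, partition, log, handler, max_messages=2):
    """Process up to max_messages starting at the stored offset, then commit."""
    start = store.load().get("{}:{}".format(topic, partition), 0)
    batch = log[start:start + max_messages]
    for message in batch:
        handler(message)  # if this raises, the offset is not advanced
    store.commit(topic, partition, start + len(batch))
    return start + len(batch)

path = os.path.join(tempfile.mkdtemp(), "offsets.json")
log = ["m0", "m1", "m2"]
seen = []
run_batch(OffsetStore(path), "events", 0, log, seen.append)  # handles m0, m1
run_batch(OffsetStore(path), "events", 0, log, seen.append)  # "restart": resumes at m2
print(seen)
```

Because the offset is committed only after the handler returns, a crash in the
middle of a batch means that batch is replayed on restart; the handler has to
tolerate duplicates unless the commit is made atomic with the output.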

Thanks,
Krzysztof



Random Forest MLlib

2015-09-10 Thread Yasemin Kaya
Hi ,

I am using the Random Forest algorithm for a recommendation system. I have users
and each user's response, yes or no (1/0). But I also want the probability
behind each prediction: the program says user x is a yes, but with what
probability? I want to get these probabilities.
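
One common approximation, sketched here in plain Python rather than with the
MLlib API, is to take the fraction of trees in the forest that vote "yes" as
a rough score. This assumes you can obtain a prediction from each individual
tree; the three lambda "trees" and the name vote_fraction are hypothetical
stand-ins, and the result is a vote share, not a calibrated probability.

```python
def vote_fraction(trees, features):
    """Fraction of trees that predict class 1 for the given feature vector."""
    votes = [tree(features) for tree in trees]
    return sum(votes) / float(len(votes))

# Hypothetical stand-ins for trained trees: each maps a feature vector to 0 or 1.
trees = [
    lambda x: 1 if x[0] > 0.5 else 0,
    lambda x: 1 if x[1] > 0.2 else 0,
    lambda x: 1 if x[0] + x[1] > 1.5 else 0,
]

# For this input the first two trees vote 1 and the third votes 0.
p_yes = vote_fraction(trees, [0.9, 0.3])
print(p_yes)
```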

Best,
yasemin
-- 
hiç ender hiç


Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Dibyendu Bhattacharya
Hi,

This consumer is running in production in many organizations that have adopted
it as an alternative option. The consumer will run with Spark 1.3.1.

It has been running in production at Pearson for some time.

It is part of Spark Packages, and you can see how to include it in your
mvn or sbt build:

http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

The consumer also comes with a built-in PID controller to control
back-pressure, which you can use even if you are on Spark 1.3.1.


Regards,
Dibyendu


On Thu, Sep 10, 2015 at 5:48 PM, Krzysztof Zarzycki 
wrote:

> Thanks Akhil, seems like an interesting option to consider.
> Do you know if the package is production-ready? Do you use it in
> production?
>
> And do you know if it works for Spark 1.3.1 as well? README mentions that
> package in spark-packages.org is built with Spark 1.4.1.
>
>
> Anyway, it seems that core Spark Streaming does not support my case? Or
> can anyone instruct me on how to do it? Let's say that I'm even fine with
> (but not happy about) using the private KafkaCluster class that is included
> in Spark for manually managing ZK offsets. Has someone done it before? Does
> someone have public code examples of manually managing ZK offsets?
>
> Thanks,
> Krzysztof
>
> 2015-09-10 12:22 GMT+02:00 Akhil Das :
>
>> This consumer pretty much covers all those scenarios you listed
>> github.com/dibbhatt/kafka-spark-consumer Give it a try.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Sep 10, 2015 at 3:32 PM, Krzysztof Zarzycki wrote:
>>
>>> Hi there,
>>> I have a problem with fulfilling all my needs when using Spark Streaming
>>> on Kafka. Let me enumerate my requirements:
>>> 1. I want to have at-least-once/exactly-once processing.
>>> 2. I want to have my application fault & simple stop tolerant. The Kafka
>>> offsets need to be tracked between restarts.
>>> 3. I want to be able to upgrade code of my application without losing
>>> Kafka offsets.
>>>
>>> Now what my requirements imply according to my knowledge:
>>> 1. implies using new Kafka DirectStream.
>>> 2. implies  using checkpointing. kafka DirectStream will write offsets
>>> to the checkpoint as well.
>>> 3. implies that checkpoints can't be used between controlled restarts.
>>> So I need to install shutdownHook with ssc.stop(stopGracefully=true) (here
>>> is a description how:
>>> https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully
>>> )
>>>
>>> Now my problems are:
>>> 1. If I cannot make checkpoints between code upgrades, does it mean that
>>> Spark does not help me at all with keeping my Kafka offsets? Does it mean,
>>> that I have to implement my own storing to/initialization of offsets from
>>> Zookeeper?
>>> 2. When I set up shutdownHook and my any executor throws an exception,
>>> it seems that application does not fail, but stuck in running state. Is
>>> that because stopGracefully deadlocks on exceptions? How to overcome this
>>> problem? Maybe I can avoid setting shutdownHook and there is other way to
>>> stop gracefully your app?
>>>
>>> 3. If I somehow overcome 2., is it enough to just stop gracefully my app
>>> to be able to upgrade code & not lose Kafka offsets?
>>>
>>>
>>> Thank you a lot for your answers,
>>> Krzysztof Zarzycki
>>>
>>>
>>>
>>>
>>
>


Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-10 Thread Dmitry Goldenberg
>> The whole point of checkpointing is to recover the *exact* computation
where it left off.

That makes sense. We were looking at the metadata checkpointing and the
data checkpointing, and with data checkpointing, you can specify a
checkpoint duration value. With the metadata checkpointing, there doesn't
seem to be a way, which may be the intent but it wasn't clear why there's a
way to override one duration (for data) but not the other (for metadata).

The basic feel was that we'd want to minimize the number of times Spark
Streaming is doing the checkpointing I/O. In other words, some sort of
sweet spot value where we do checkpointing frequently enough without
performing I/O too frequently. Finding that sweet spot would mean
experimenting with the checkpoint duration millis but that parameter
doesn't appear to be exposed in the case of metadata checkpointing.



On Wed, Sep 9, 2015 at 10:39 PM, Tathagata Das  wrote:

> The whole point of checkpointing is to recover the *exact* computation
> where it left off.
> If you want any change in the specification of the computation (which
> includes any intervals), then you cannot recover from checkpoint as it can
> be an arbitrarily complex issue to deal with changes in the specs,
> especially because a lot of specs are tied to each other (e.g. checkpoint
> interval dictates other things like clean up intervals, etc.)
>
> Why do you need to change the checkpointing interval at the time of
> recovery? Trying to understand your usecase.
>
>
> On Wed, Sep 9, 2015 at 12:03 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> >> when you use getOrCreate, and there exists a valid checkpoint, it will
>> always return the context from the checkpoint and not call the factory.
>> Simple way to see whats going on is to print something in the factory to
>> verify whether it is ever called.
>>
>> This is probably OK. Seems to explain why we were getting a sticky batch
>> duration millis value. Once I blew away all the checkpointing directories
>> and unplugged the data checkpointing (while keeping the metadata
>> checkpointing) the batch duration millis was no longer sticky.
>>
>> So, there doesn't seem to be a way for metadata checkpointing to override
>> its checkpoint duration millis, is there?  Is the default there
>> max(batchdurationmillis, 10seconds)?  Is there a way to override this?
>> Thanks.
>>
>>
>>
>>
>>
>> On Wed, Sep 9, 2015 at 2:44 PM, Tathagata Das 
>> wrote:
>>
>>>
>>>
>>> See inline.
>>>
>>> On Tue, Sep 8, 2015 at 9:02 PM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
 What's wrong with creating a checkpointed context??  We WANT
 checkpointing, first of all.  We therefore WANT the checkpointed context.

 Second of all, it's not true that we're loading the checkpointed
 context independent of whether params.isCheckpointed() is true.  I'm
 quoting the code again:

 // This is NOT loading a checkpointed context if isCheckpointed() is false.
 JavaStreamingContext jssc = params.isCheckpointed()
     ? createCheckpointedContext(sparkConf, params)
     : createContext(sparkConf, params);

   private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf, Parameters params) {
     JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
       @Override
       public JavaStreamingContext create() {
         return createContext(sparkConf, params);
       }
     };
     return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);

>>> ^   when you use getOrCreate, and there exists a valid checkpoint,
>>> it will always return the context from the checkpoint and not call the
>>> factory. Simple way to see whats going on is to print something in the
>>> factory to verify whether it is ever called.
>>>
>>>
>>>
>>>
>>>
   }

   private JavaStreamingContext createContext(SparkConf sparkConf, Parameters params) {
     // Create context with the specified batch interval, in milliseconds.
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
         Durations.milliseconds(params.getBatchDurationMillis()));
     // Set the checkpoint directory, if we're checkpointing
     if (params.isCheckpointed()) {
       jssc.checkpoint(params.getCheckpointDir());
     }
     ...
 Again, this is *only* calling context.checkpoint() if isCheckpointed()
 is true.  And we WANT it to be true.

 What am I missing here?



>>
>


Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Cody Koeninger
The kafka direct stream meets those requirements.  You don't need
checkpointing for exactly-once.  Indeed, unless your output operations are
idempotent, you can't get exactly-once if you're relying on checkpointing.
Instead, you need to store the offsets atomically in the same transaction
as your results.

See
https://github.com/koeninger/kafka-exactly-once
and the video / blog posts linked from it.
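
To make the "same transaction" point concrete, here is a minimal, Spark-free
sketch in Java. TxStore and its methods are hypothetical names invented for
illustration, and an in-memory map stands in for a real transactional
database, where commit() would be a single transaction. It is not code from
the linked repository.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a transactional database: results and the
// per-partition offset are updated together, or not at all.
final class TxStore {
    private final Map<String, Long> counts = new HashMap<>();
    private final Map<Integer, Long> offsets = new HashMap<>();

    // Apply one batch for a Kafka partition covering [fromOffset, untilOffset).
    // Returns false (and changes nothing) if the batch does not start exactly
    // where the stored offset says the previous batch ended.
    synchronized boolean commit(int partition, long fromOffset, long untilOffset,
                                Map<String, Long> batchCounts) {
        long stored = offsets.getOrDefault(partition, 0L);
        if (stored != fromOffset) {
            return false; // replayed or out-of-order batch: skip it
        }
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            counts.merge(e.getKey(), e.getValue(), Long::sum);
        }
        offsets.put(partition, untilOffset);
        return true;
    }

    synchronized long count(String key) {
        return counts.getOrDefault(key, 0L);
    }

    synchronized long offset(int partition) {
        return offsets.getOrDefault(partition, 0L);
    }
}
```

Because the offset check and the result update happen in one step, a batch
that is replayed after a failure is rejected instead of being counted twice,
even though the output itself (a running count) is not idempotent.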

The dibbhatt consumer that Akhil linked is using zookeeper to store offsets,
so to the best of my knowledge, it cannot do exactly-once without
idempotent output operations.

Regarding the issues around code changes and checkpointing, the most
straightforward way to deal with this is to just start a new version of
your job before stopping the old one.  If you care about delivery semantics
and are using checkpointing, your output operation must be idempotent
anyway, so having 2 versions of the code running at the same time for a
brief period should not be a problem.



On Thu, Sep 10, 2015 at 8:02 AM, Dmitry Goldenberg  wrote:

> >> checkpoints can't be used between controlled restarts
>
> Is that true? If so, why? From my testing, checkpoints appear to be
> working fine, we get the data we've missed between the time the consumer
> went down and the time we brought it back up.
>
> >> If I cannot make checkpoints between code upgrades, does it mean that
> Spark does not help me at all with keeping my Kafka offsets? Does it mean,
> that I have to implement my own storing to/initialization of offsets from
> Zookeeper?
>
> By code upgrades, are code changes to the consumer program meant?
>
> If that is the case, one idea we've been entertaining is that, if the
> consumer changes, especially if its configuration parameters change, it
> means that some older configuration may still be stuck in the
> checkpointing.  What we'd do in this case is, prior to starting the
> consumer, blow away the checkpointing directory and re-consume from Kafka
> from the smallest offsets.  In our case, it's OK to re-process; I realize
> that in many cases that may not be an option.  If that's the case then it
> would seem to follow that you have to manage offsets in Zk...
>
> Another thing to consider would be to treat upgrades operationally. In
> that, if an upgrade is to happen, consume the data up to a certain point
> then bring the system down for an upgrade. Remove checkpointing. Restart
> everything; the system would now be rebuilding the checkpointing and using
> your upgraded consumers.  (Again, this may not be possible in some systems
> where the data influx is constant and/or the data is mission critical)...
>
> Perhaps this discussion implies that there may be a new feature in Spark
> where it intelligently drops the checkpointing or allows you to selectively
> pluck out and drop some items prior to restarting...
>
>
>
>
> On Thu, Sep 10, 2015 at 6:22 AM, Akhil Das 
> wrote:
>
>> This consumer pretty much covers all those scenarios you listed
>> github.com/dibbhatt/kafka-spark-consumer Give it a try.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Sep 10, 2015 at 3:32 PM, Krzysztof Zarzycki wrote:
>>
>>> Hi there,
>>> I have a problem with fulfilling all my needs when using Spark Streaming
>>> on Kafka. Let me enumerate my requirements:
>>> 1. I want to have at-least-once/exactly-once processing.
>>> 2. I want to have my application fault & simple stop tolerant. The Kafka
>>> offsets need to be tracked between restarts.
>>> 3. I want to be able to upgrade code of my application without losing
>>> Kafka offsets.
>>>
>>> Now what my requirements imply according to my knowledge:
>>> 1. implies using new Kafka DirectStream.
>>> 2. implies  using checkpointing. kafka DirectStream will write offsets
>>> to the checkpoint as well.
>>> 3. implies that checkpoints can't be used between controlled restarts.
>>> So I need to install shutdownHook with ssc.stop(stopGracefully=true) (here
>>> is a description how:
>>> https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully
>>> )
>>>
>>> Now my problems are:
>>> 1. If I cannot make checkpoints between code upgrades, does it mean that
>>> Spark does not help me at all with keeping my Kafka offsets? Does it mean,
>>> that I have to implement my own storing to/initialization of offsets from
>>> Zookeeper?
>>> 2. When I set up shutdownHook and my any executor throws an exception,
>>> it seems that application does not fail, but stuck in running state. Is
>>> that because stopGracefully deadlocks on exceptions? How to overcome this
>>> problem? Maybe I can avoid setting shutdownHook and there is other way to
>>> stop gracefully your app?
>>>
>>> 3. If I somehow overcome 2., is it enough to just stop gracefully my app
>>> to be able to upgrade code & not lose Kafka offsets?
>>>
>>>
>>> Thank you a lot for your answers,
>>> Krzysztof Zarzycki
>>>
>>>
>>>
>>>
>>
>


pyspark driver in cluster rather than gateway/client

2015-09-10 Thread roy
Hi,

  Is there any way to make the Spark driver run inside YARN containers
rather than on the gateway/client machine?

  At present, even with the config parameters --master yarn and --deploy-mode
cluster, the driver runs on the gateway/client machine.

We are on CDH 5.4.1 with YARN and Spark 1.3.

Any help on this?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-driver-in-cluster-rather-than-gateway-client-tp24641.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Tungsten and Spark Streaming

2015-09-10 Thread Todd Nist
https://issues.apache.org/jira/browse/SPARK-8360?jql=project%20%3D%20SPARK%20AND%20text%20~%20Streaming

-Todd

On Thu, Sep 10, 2015 at 10:22 AM, Gurvinder Singh <
gurvinder.si...@uninett.no> wrote:

> On 09/10/2015 07:42 AM, Tathagata Das wrote:
> > Rewriting is necessary. You will have to convert RDD/DStream operations
> > to DataFrame operations. So get the RDDs in DStream, using
> > transform/foreachRDD, convert to DataFrames and then do DataFrame
> > operations.
>
> Are there any plans for 1.6 or later to add Tungsten support to
> RDD/DStream directly, or is it intended that users should switch to
> DataFrames rather than operating at the RDD/DStream level?
>
> >
> > On Wed, Sep 9, 2015 at 9:23 PM, N B wrote:
> >
> > Hello,
> >
> > How can we start taking advantage of the performance gains made
> > under Project Tungsten in Spark 1.5 for a Spark Streaming program?
> >
> > From what I understand, this is available by default for Dataframes.
> > But for a program written using Spark Streaming, would we see any
> > potential gains "out of the box" in 1.5 or will we have to rewrite
> > some portions of the application code to realize that benefit?
> >
> > Any insight/documentation links etc in this regard will be appreciated.
> >
> > Thanks
> > Nikunj
> >
> >
>
>
>
>


Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Cody Koeninger
You have to store offsets somewhere.

If you're going to store them in checkpoints, then you have to deal with
the fact that checkpoints aren't recoverable on code change.  Starting up
the new version helps because you don't start it from the same checkpoint
directory as the running one... it has your new code, and is storing to a
new checkpoint directory.  If you started the new one from the latest
offsets, you can shut down the old one as soon as it's caught up.

If you don't like the implications of storing offsets in checkpoints...
then sure, store them yourself.  A real database would be better, but if
you really want to store them in zookeeper you can.  In any case, just do
your offset saves in the same foreachPartition your other output operations
are occurring in, after they've successfully completed.
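
As a rough illustration of that ordering (output first, offsets second), here
is a Spark-free Java sketch. AtLeastOnceSketch, its fields, and the
failAfterOutput flag are all hypothetical, invented for this example; the two
maps stand in for your real output sink and offset store.

```java
import java.util.HashMap;
import java.util.Map;

final class AtLeastOnceSketch {
    // Hypothetical sinks. Output is keyed by Kafka offset, so writing the
    // same message twice simply overwrites the same entry (idempotent).
    final Map<Long, String> output = new HashMap<>();
    final Map<Integer, Long> savedOffsets = new HashMap<>();

    // Treat messages[i] as the message at offset fromOffset + i of one partition.
    // failAfterOutput simulates a crash between writing results and saving offsets.
    void processPartition(int partition, long fromOffset, String[] messages,
                          boolean failAfterOutput) {
        for (int i = 0; i < messages.length; i++) {
            output.put(fromOffset + i, messages[i].toUpperCase()); // idempotent upsert
        }
        if (failAfterOutput) {
            throw new IllegalStateException("simulated crash before offset save");
        }
        // Only reached once all output for this partition has succeeded.
        savedOffsets.put(partition, fromOffset + messages.length);
    }

    // Where a restarted job should begin reading this partition.
    long resumeFrom(int partition) {
        return savedOffsets.getOrDefault(partition, 0L);
    }
}
```

If the job dies after the output but before the offset save, the restart
re-reads from the old offset and rewrites the same keys, which is
at-least-once delivery made harmless by idempotent output.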

If you don't care about performance benefits of the direct stream and don't
want exactly once semantics, sure use the old stream.

Finally, hundreds of gigs just really isn't very much data.  Unless what
you're doing is really resource intensive, it shouldn't take much time to
process it all, especially if you can dynamically size a cluster for the
rare occasion that something is screwed up and you need to reprocess.


On Thu, Sep 10, 2015 at 9:17 AM, Krzysztof Zarzycki 
wrote:

> Thanks, guys, for your answers. I put my replies inline below.
>
> Cheers,
> Krzysztof Zarzycki
>
> 2015-09-10 15:39 GMT+02:00 Cody Koeninger :
>
>> The kafka direct stream meets those requirements.  You don't need
>> checkpointing for exactly-once.  Indeed, unless your output operations are
>> idempotent, you can't get exactly-once if you're relying on checkpointing.
>> Instead, you need to store the offsets atomically in the same transaction
>> as your results.
>>
>
> To focus the discussion, let's assume my operations are idempotent and I'm
> interested in at-least-once thanks to that (which is "idempotent
> exactly-once" as named in your presentation). Did you say that I don't need
> checkpointing for that? How would the direct stream API then store offsets
> between restarts?
>
>
>> See
>> https://github.com/koeninger/kafka-exactly-once
>> and the video / blog posts linked from it.
>>
>>
> I did that, thank you. What I want is to achieve "idempotent exactly-once"
> as named in your presentation.
>
>
>> The dibbhatt consumer that Akhil linked is using zookeeper to store
>> offsets, so to the best of my knowledge, it cannot do exactly-once without
>> idempotent output operations.
>>
> True, and I totally accept it if what I get is at-least-once.
>
>
>>
>>
>> Regarding the issues around code changes and checkpointing, the most
>> straightforward way to deal with this is to just start a new version of
>> your job before stopping the old one.  If you care about delivery semantics
>> and are using checkpointing, your output operation must be idempotent
>> anyway, so having 2 versions of the code running at the same time for a
>> brief period should not be a problem.
>>
>
> How does starting the new version before stopping the old one help? Could you
> please explain the mechanics of that a bit?
> Anyway, it definitely seems cumbersome. Plus, I can imagine plenty of
> situations when it will be just inappropriate to keep running the old one,
> when, let's say, we discovered a bug and don't want to run it anymore.
>
>
> So... to sum it up correctly, if I want at-least-once with simple code
> upgrades, I need to:
> -  store offsets in external storage (I would choose ZK for that).
> -  read them on application restart and pass the TopicAndPartition->offset
> map to createDirectStream.
> -  And I don't need to use checkpoints at all then.
> Could you confirm that?
>
> The open question is where I should actually commit the ZK offsets. The
> easiest would be to do it at the end of every batch. Do you think I can use
> org.apache.spark.streaming.scheduler.StreamingListener, method
> onBatchCompleted, for that? I don't think so, because we probably don't have
> access to the finished offsets in it...
> So maybe each executor can commit the offsets?
>
> An alternative to that solution, which I just realized, is to stay with the
> old Kafka receiver (createStream API) and just enable Write Ahead Logs. This
> way, we don't lose any data on application kill, so we have "idempotent
> exactly-once" semantics, offsets are stored in ZK for us, and we don't need
> to use checkpoints... Seems like a viable option! Do you agree?
>
>
>
>
>
>
>>
>>
>>
>> On Thu, Sep 10, 2015 at 8:02 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> >> checkpoints can't be used between controlled restarts
>>>
>>> Is that true? If so, why? From my testing, checkpoints appear to be
>>> working fine, we get the data we've missed between the time the consumer
>>> went down and the time we brought it back up.
>>>
> I'm sorry, I simplified the case. I meant "checkpoints can't be used
> between controlled restarts if you want to upgrade code in between".
>
>>
>>> >> If I cannot make 

Re: spark streaming 1.3 with kafka connection timeout

2015-09-10 Thread Shushant Arora
My bad. I got that exception in the driver code of the same job, not in an
executor.

But it only reports a socket-closed exception.

org.apache.spark.SparkException: ArrayBuffer(java.io.EOFException: Received
-1 when reading from channel, socket has likely been closed.,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([topicname,51], [topicname,201], [topicname,54], [topicname,93],
[topicname,297], [topicname,123], [topicname,147], [topicname,126],
[topicname,189], [topicname,111], [topicname,159], [topicname,33],
[topicname,36], [topicname,60], [topicname,216], [topicname,9],
[topicname,12], [topicname,282], [topicname,39], [topicname,63],
[topicname,231], [topicname,279], [topicname,18], [topicname,30],
[topicname,276], [topicname,228], [topicname,84], [topicname,252],
[topicname,48], [topicname,150], [topicname,132], [topicname,57],
[topicname,72], [topicname,291], [topicname,234], [topicname,204],
[topicname,186], [topicname,264], [topicname,288], [topicname,87],
[topicname,78], [topicname,249], [topicname,102], [topicname,108],
[topicname,237], [topicname,24], [topicname,96], [topicname,135],
[topicname,198], [topicname,162], [topicname,42], [topicname,258],
[topicname,0], [topicname,174], [topicname,207], [topicname,210],
[topicname,246], [topicname,225], [topicname,270], [topicname,156],
[topicname,183], [topicname,144], [topicname,117], [topicname,69],
[topicname,45], [topicname,219], [topicname,177], [topicname,105],
[topicname,171], [topicname,141], [topicname,285], [topicname,27],
[topicname,168], [topicname,267], [topicname,213], [topicname,153],
[topicname,138], [topicname,255], [topicname,222], [topicname,243],
[topicname,261], [topicname,90], [topicname,114], [topicname,3],
[topicname,81], [topicname,180], [topicname,21], [topicname,6],
[topicname,195], [topicname,129], [topicname,192], [topicname,99],
[topicname,294], [topicname,165], [topicname,240], [topicname,66],
[topicname,75], [topicname,15], [topicname,273], [topicname,120]))
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/09/10 02:36:02 ERROR yarn.ApplicationMaster: User class threw exception:
ArrayBuffer(java.io.EOFException: Received -1 when reading from channel,
socket has likely 

Re: spark streaming 1.3 with kafka connection timeout

2015-09-10 Thread Cody Koeninger
Again, that looks like you lost a kafka broker.  Executors will retry
failed tasks automatically up to the max failures.

spark.streaming.kafka.maxRetries controls the number of times the driver
will retry when attempting to get offsets.

If your broker isn't up / rebalance hasn't finished after N number of
retries, you've got operational problems you need to deal with.



On Thu, Sep 10, 2015 at 9:58 AM, Shushant Arora 
wrote:

> My bad  Got that exception in driver code of same job not in executor.
>
> But it says of socket close exception only.
>
> org.apache.spark.SparkException: ArrayBuffer(java.io.EOFException:
> Received -1 when reading from channel, socket has likely been closed.,
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([topicname,51], [topicname,201], [topicname,54], [topicname,93],
> [topicname,297], [topicname,123], [topicname,147], [topicname,126],
> [topicname,189], [topicname,111], [topicname,159], [topicname,33],
> [topicname,36], [topicname,60], [topicname,216], [topicname,9],
> [topicname,12], [topicname,282], [topicname,39], [topicname,63],
> [topicname,231], [topicname,279], [topicname,18], [topicname,30],
> [topicname,276], [topicname,228], [topicname,84], [topicname,252],
> [topicname,48], [topicname,150], [topicname,132], [topicname,57],
> [topicname,72], [topicname,291], [topicname,234], [topicname,204],
> [topicname,186], [topicname,264], [topicname,288], [topicname,87],
> [topicname,78], [topicname,249], [topicname,102], [topicname,108],
> [topicname,237], [topicname,24], [topicname,96], [topicname,135],
> [topicname,198], [topicname,162], [topicname,42], [topicname,258],
> [topicname,0], [topicname,174], [topicname,207], [topicname,210],
> [topicname,246], [topicname,225], [topicname,270], [topicname,156],
> [topicname,183], [topicname,144], [topicname,117], [topicname,69],
> [topicname,45], [topicname,219], [topicname,177], [topicname,105],
> [topicname,171], [topicname,141], [topicname,285], [topicname,27],
> [topicname,168], [topicname,267], [topicname,213], [topicname,153],
> [topicname,138], [topicname,255], [topicname,222], [topicname,243],
> [topicname,261], [topicname,90], [topicname,114], [topicname,3],
> [topicname,81], [topicname,180], [topicname,21], [topicname,6],
> [topicname,195], [topicname,129], [topicname,192], [topicname,99],
> [topicname,294], [topicname,165], [topicname,240], [topicname,66],
> [topicname,75], [topicname,15], [topicname,273], [topicname,120]))
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
> at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
> at scala.util.Try$.apply(Try.scala:161)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
> at org.apache.spark.streaming.scheduler.JobGenerator.org
> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at 

Re: How to enable Tungsten in Spark 1.5 for Spark SQL?

2015-09-10 Thread Umesh Kacha
Nice, thanks very much, Ted. Highest performance without any configuration
changes: I'm amazed! I'm looking forward to running Spark 1.5 on my 2 TB of
skewed data, which involves group by, union, etc. Any other tips for Spark
1.5, if you know of any?
On Sep 10, 2015 8:12 PM, "Ted Yu"  wrote:

> Please see the following
> in sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala :
>
>   val TUNGSTEN_ENABLED = booleanConf("spark.sql.tungsten.enabled",
>     defaultValue = Some(true),
>     doc = "When true, use the optimized Tungsten physical execution backend which explicitly " +
>       "manages memory and dynamically generates bytecode for expression evaluation.")
>
>   val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
>     defaultValue = Some(true),  // use TUNGSTEN_ENABLED as default
>     doc = "When true, code will be dynamically generated at runtime for expression evaluation in" +
>       " a specific query.",
>     isPublic = false)
>
>   val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
>     defaultValue = Some(true),  // use TUNGSTEN_ENABLED as default
>     doc = "When true, use the new optimized Tungsten physical execution backend.",
>     isPublic = false)
>
> Cheers
>
> On Thu, Sep 10, 2015 at 7:39 AM, unk1102  wrote:
>
>> Hi, Spark 1.5 looks promising. How do we enable Project Tungsten for Spark
>> SQL, or is it enabled by default? Please guide.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-enable-Tungsten-in-Spark-1-5-for-Spark-SQL-tp24642.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: How to compute the probability of each class in Naive Bayes

2015-09-10 Thread Adamantios Corais
Great. So, given that *model.theta* represents the log-probabilities (and
hence the result of *brzPi + brzTheta * testData.toBreeze* is a big number
too), how can I get back the *non-*log-probabilities which, apparently,
are bounded between *0.0 and 1.0*?
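
For reference, the usual way to go from per-class log-scores to probabilities
is to exponentiate and normalize (log-sum-exp). A minimal sketch in plain
Java follows; LogProbs and toProbabilities are hypothetical names, not MLlib
API, and the input is assumed to be the per-class values of
brzPi + brzTheta * testData.toBreeze copied into a double array.

```java
final class LogProbs {
    // Turn unnormalized log-scores (one per class) into probabilities that
    // sum to 1. Subtracting the maximum first keeps Math.exp from
    // underflowing to 0 for very negative inputs (the log-sum-exp trick).
    static double[] toProbabilities(double[] logScores) {
        double max = Double.NEGATIVE_INFINITY;
        for (double v : logScores) {
            max = Math.max(max, v);
        }
        double sum = 0.0;
        double[] probs = new double[logScores.length];
        for (int i = 0; i < logScores.length; i++) {
            probs[i] = Math.exp(logScores[i] - max);
            sum += probs[i];
        }
        for (int i = 0; i < probs.length; i++) {
            probs[i] /= sum;
        }
        return probs;
    }
}
```

The index of each entry in the result still has to be mapped back to the
corresponding label, as with the original vector.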



*// Adamantios*



On Tue, Sep 1, 2015 at 12:57 PM, Sean Owen  wrote:

> (pedantic: it's the log-probabilities)
>
> On Tue, Sep 1, 2015 at 10:48 AM, Yanbo Liang  wrote:
> > Actually
> > brzPi + brzTheta * testData.toBreeze
> > is the probabilities of the input Vector on each class, however it's a
> > Breeze Vector.
> > Note that the indices of this Vector need to be mapped to the
> > corresponding label indices.
> >
> > 2015-08-28 20:38 GMT+08:00 Adamantios Corais <
> adamantios.cor...@gmail.com>:
> >>
> >> Hi,
> >>
> >> I am trying to change the following code so as to get the probabilities
> >> of the input Vector on each class (instead of the class itself with the
> >> highest probability). I know that this is already available as part of
> >> the most recent release of Spark but I have to use Spark 1.1.0.
> >>
> >> Any help is appreciated.
> >>
> >>> override def predict(testData: Vector): Double = {
> >>> labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
> >>>   }
> >>
> >>
> >>>
> >>>
> https://github.com/apache/spark/blob/v1.1.0/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
> >>
> >>
> >> // Adamantios
> >>
> >>
> >
>


Re: Random Forest MLlib

2015-09-10 Thread Maximo Gurmendez
Hi Yasemin,
   We had the same question and found this:

https://issues.apache.org/jira/browse/SPARK-6884

Thanks,
   Maximo

On Sep 10, 2015, at 9:09 AM, Yasemin Kaya wrote:

Hi ,

I am using the Random Forest algorithm for a recommendation system. I have users and
each user's response, yes or no (1/0). But I want to learn the probability from the
trees: the program says user x is a yes, but with what probability? I want to get
these probabilities.

Best,
yasemin
--
hiç ender hiç



Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Krzysztof Zarzycki
Thanks guys for your answers. My replies are inline below.

Cheers,
Krzysztof Zarzycki

2015-09-10 15:39 GMT+02:00 Cody Koeninger :

> The kafka direct stream meets those requirements.  You don't need
> checkpointing for exactly-once.  Indeed, unless your output operations are
> idempotent, you can't get exactly-once if you're relying on checkpointing.
> Instead, you need to store the offsets atomically in the same transaction
> as your results.
>

To focus the discussion, let's assume my operations are idempotent and that I'm
therefore interested in at-least-once (which is "idempotent exactly-once" as
named in your presentation). Did you say that I don't need checkpointing for
that? How would the direct stream API then store offsets between restarts?


> See
> https://github.com/koeninger/kafka-exactly-once
> and the video / blog posts linked from it.
>
>
I did that, thank you. What I want is to achieve "idempotent exactly-once"
as named in your presentation.


> The dibhatt consumer that Akhil linked is using zookeeper to store
> offsets, so to the best of my knowledge, it cannot do exactly-once without
> idempotent output operations.
>
True, and I totally accept it if what I get is at-least-once.


>
>
> Regarding the issues around code changes and checkpointing, the most
> straightforward way to deal with this is to just start a new version of
> your job before stopping the old one.  If you care about delivery semantics
> and are using checkpointing, your output operation must be idempotent
> anyway, so having 2 versions of the code running at the same time for a
> brief period should not be a problem.
>

How does starting the new version before stopping the old one help? Could you
please explain the mechanics of that a bit?
In any case, it seems definitely cumbersome. Plus, I can imagine plenty of
situations where it would be just inappropriate to keep running the old one,
for example when we have discovered a bug and don't want to run it anymore.


So... To sum it up correctly, if I want at-least-once with simple code
upgrades, I need to:
-  store offsets in external storage (I would choose ZK for that).
-  read them on application restart and pass the TopicAndPartition->offset
map to createDirectStream.
-  And I don't need to use checkpoints at all then.
Could you confirm that?
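
The three steps above can be sketched without any Spark or Kafka dependency. In this toy model, `OffsetStore` and `IdempotentSink` are hypothetical in-memory stand-ins for ZooKeeper (or a database) and for the output system; in a real job the loaded offsets would be the starting map passed to createDirectStream. Because the offset is saved only after the output has succeeded, a crash between the two leads to a replay, and the idempotent sink absorbs the duplicates:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an external offset store such as ZooKeeper or a database. */
final class OffsetStore {
    private final Map<Integer, Long> nextOffsetByPartition = new HashMap<>();

    long load(int partition) {
        return nextOffsetByPartition.getOrDefault(partition, 0L);
    }

    void save(int partition, long nextOffset) {
        nextOffsetByPartition.put(partition, nextOffset);
    }
}

/** Hypothetical idempotent sink: writing the same (partition, offset) twice changes nothing. */
final class IdempotentSink {
    private final Map<String, String> rows = new HashMap<>();

    void write(int partition, long offset, String value) {
        rows.put(partition + ":" + offset, value);
    }

    int size() {
        return rows.size();
    }
}

final class AtLeastOnceDemo {
    private AtLeastOnceDemo() {}

    /**
     * Processes one batch of a partition's log, starting at the stored offset.
     * Results are written first and the offset is saved afterwards, so a failure
     * in between causes a replay (at-least-once) rather than data loss.
     */
    static void runBatch(List<String> log, int partition, int batchSize,
                         OffsetStore offsets, IdempotentSink sink, boolean crashBeforeCommit) {
        long from = offsets.load(partition);
        long until = Math.min(log.size(), from + batchSize);
        for (long o = from; o < until; o++) {
            sink.write(partition, o, log.get((int) o));
        }
        if (crashBeforeCommit) {
            return; // simulate the job dying before the offset was saved
        }
        offsets.save(partition, until);
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c", "d");
        OffsetStore offsets = new OffsetStore();
        IdempotentSink sink = new IdempotentSink();
        runBatch(log, 0, 2, offsets, sink, true);   // writes a, b; "crashes" before the commit
        runBatch(log, 0, 2, offsets, sink, false);  // restart: replays a, b; saves offset 2
        runBatch(log, 0, 2, offsets, sink, false);  // processes c, d; saves offset 4
        System.out.println(sink.size() + " rows, next offset " + offsets.load(0));
    }
}
```

No checkpoint is involved here: everything needed after a restart lives in the offset store, which is why a code upgrade does not invalidate it.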

The open question is where I should actually commit the ZK offsets. The easiest
would be to do it at the end of every batch. Do you think I can use
org.apache.spark.streaming.scheduler.StreamingListener, method
onBatchCompleted, for that? I don't think so, because we probably don't have
access to the finished offsets in it...
So maybe each executor can commit the offsets?

An alternative to that solution, which I just realized, is to stay with the old Kafka
receiver (createStream API) and just enable Write Ahead Logs. This way, we
don't lose any data on application kill, so we have "idempotent exactly-once"
semantics, offsets are stored in ZK for us, and we don't need to use
checkpoints... Seems like a viable option! Do you agree?






>
>
>
> On Thu, Sep 10, 2015 at 8:02 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> >> checkpoints can't be used between controlled restarts
>>
>> Is that true? If so, why? From my testing, checkpoints appear to be
>> working fine, we get the data we've missed between the time the consumer
>> went down and the time we brought it back up.
>>
I'm sorry I simplified the case. I meant "checkpoints can't be used
between controlled restarts if you want to upgrade code in between".

>
>> >> If I cannot make checkpoints between code upgrades, does it mean that
>> Spark does not help me at all with keeping my Kafka offsets? Does it mean,
>> that I have to implement my own storing to/initialization of offsets from
>> Zookeeper?
>>
>> By code upgrades, are code changes to the consumer program meant?
>>
Exactly.

>
>> If that is the case, one idea we've been entertaining is that, if the
>> consumer changes, especially if its configuration parameters change, it
>> means that some older configuration may still be stuck in the
>> checkpointing.  What we'd do in this case is, prior to starting the
>> consumer, blow away the checkpointing directory and re-consume from Kafka
>> from the smallest offsets.  In our case, it's OK to re-process; I realize
>> that in many cases that may not be an option.  If that's the case then it
>> would seem to follow that you have to manage offsets in Zk...
>>
>> Another thing to consider would be to treat upgrades operationally. In
>> that, if an upgrade is to happen, consume the data up to a certain point
>> then bring the system down for an upgrade. Remove checkpointing. Restart
>> everything; the system would now be rebuilding the checkpointing and using
>> your upgraded consumers.  (Again, this may not be possible in some systems
>> where the data influx is constant and/or the data is mission critical)...
>>
Thanks for your idea, but in my case it is indeed impossible to re-consume the Kafka
topic from the beginning. We keep 

Re: spark streaming 1.3 with kafka connection timeout

2015-09-10 Thread Shushant Arora
Stack trace is
15/09/09 22:49:52 ERROR kafka.KafkaRDD: Lost leader for topic topicname partition 99,  sleeping for 200ms
kafka.common.NotLeaderForPartitionException
    at sun.reflect.GeneratedConstructorAccessor26.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:374)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:142)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:151)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
    at com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:147)
    at com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:35)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/09/09 22:49:52 ERROR consumer.KafkaStreamTransformations: Error while consuming messages from kafka




Actual code is :

In driver :
final KafkaStreamTransformations transformations = new
KafkaStreamTransformations
(...);

directKafkaStream.foreachRDD(new Function, Void>() {

@Override
public Void call(JavaRDD v1) throws Exception {
v1.foreachPartition(transformations);
return null;
}
});


In KafkaStreamTransformations :


@Override
public void call(Iterator t) throws Exception {
try{
while(t.hasNext()){
...long running task
}
}catch(Exception e){
e.printStackTrace();
logger.error("Error while consuming messages from kafka");
}






On Thu, Sep 10, 2015 at 6:58 PM, Cody Koeninger  wrote:

> Post the actual stacktrace you're getting
>
> On Thu, Sep 10, 2015 at 12:21 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Executors in Spark Streaming 1.3 fetch messages from Kafka in batches. What
>> happens when an executor takes a long time to complete a fetch batch,
>>
>> say in
>>
>>
>> directKafkaStream.foreachRDD(new Function, Void>() {
>>
>> @Override
>> public Void call(JavaRDD v1) throws Exception {
>> v1.foreachPartition(new  VoidFunction>{
>> @Override
>> public void call(Iterator t) throws Exception {
>> //long running task
>> }});}});
>>
>> Will this long-running task drop the executor's connection to the Kafka
>> brokers?
>> And how should I handle that? I am getting a connection timeout in my code.
>>
>>
>>
>>
>


Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Dibyendu Bhattacharya
Hi,

Just to clarify one point which may not be clear to many: if someone
decides to use the receiver-based approach, the best option at this point is
to use https://github.com/dibbhatt/kafka-spark-consumer. This will also
work with the WAL like any other receiver-based consumer. The major issue with
KafkaUtils.createStream is that it uses the Kafka high-level API, which has serious
issues with consumer rebalancing, whereas dibbhatt/kafka-spark-consumer uses
the low-level Kafka consumer API, which does not have any such issue. I am not
sure if there is any publicly available performance benchmark comparing
this one with the DirectStream, so I cannot comment on the performance benefits
of one over the other, but in whatever performance benchmarks we have done,
dibbhatt/kafka-spark-consumer stands out.

Regards,
Dibyendu

On Thu, Sep 10, 2015 at 8:08 PM, Cody Koeninger  wrote:

> You have to store offsets somewhere.
>
> If you're going to store them in checkpoints, then you have to deal with
> the fact that checkpoints aren't recoverable on code change.  Starting up
> the new version helps because you don't start it from the same checkpoint
> directory as the running one... it has your new code, and is storing to a
> new checkpoint directory.  If you started the new one from the latest
> offsets, you can shut down the old one as soon as it's caught up.
>
> If you don't like the implications of storing offsets in checkpoints...
> then sure, store them yourself.  A real database would be better, but if
> you really want to store them in zookeeper you can.  In any case, just do
> your offset saves in the same foreachPartition your other output operations
> are occurring in, after they've successfully completed.
>
> If you don't care about performance benefits of the direct stream and
> don't want exactly once semantics, sure use the old stream.
>
> Finally, hundreds of gigs just really isn't very much data.  Unless what
> you're doing is really resource intensive, it shouldn't take much time to
> process it all, especially if you can dynamically size a cluster for the
> rare occasion that something is screwed up and you need to reprocess.
>
>
> On Thu, Sep 10, 2015 at 9:17 AM, Krzysztof Zarzycki 
> wrote:
>
>> Thanks guys for your answers. I put my answers in text, below.
>>
>> Cheers,
>> Krzysztof Zarzycki
>>
>> 2015-09-10 15:39 GMT+02:00 Cody Koeninger :
>>
>>> The kafka direct stream meets those requirements.  You don't need
>>> checkpointing for exactly-once.  Indeed, unless your output operations are
>>> idempotent, you can't get exactly-once if you're relying on checkpointing.
>>> Instead, you need to store the offsets atomically in the same transaction
>>> as your results.
>>>
>>
>> To focus discussion, let's assume my operations are idempotent & I'm
>> interested in at-least-once thanks to that (which is idempotent
>> exactly-once as named in your pres). Did you say, that I don't need
>> checkpointing for that? How then direct stream API would store offsets
>>  between restarts?
>>
>>
>>> See
>>> https://github.com/koeninger/kafka-exactly-once
>>> and the video / blog posts linked from it.
>>>
>>>
>> I did that, thank you. What I want is to achieve "idempotent
>> exactly-once" as named in your presentation.
>>
>>
>>> The dibhatt consumer that Akhil linked is using zookeeper to store
>>> offsets, so to the best of my knowledge, it cannot do exactly-once without
>>> idempotent output operations.
>>>
>> True, and I totally accept it if what I get is at-least-once.
>>
>>
>>>
>>>
>>> Regarding the issues around code changes and checkpointing, the most
>>> straightforward way to deal with this is to just start a new version of
>>> your job before stopping the old one.  If you care about delivery semantics
>>> and are using checkpointing, your output operation must be idempotent
>>> anyway, so having 2 versions of the code running at the same time for a
>>> brief period should not be a problem.
>>>
>>
>> How starting new version before stopping old one helps? Could you please
>> explain a bit the mechanics of that?
>> Anyway, it seems definitely cumbersome. Plus, I can imagine plenty of
>> situations when it will be just inapropriate to run old one, when, let's
>> say, we discovered a bug and don't want to run it anymore.
>>
>>
>> So... To sum up it correctly, if I want at-least-once, with simple code
>> upgrades,  I need to:
>> -  store offsets in external storage (I would choose ZK for that).
>> -  read them on application restart and pass the
>> TopicAndPartition->offset map to createDirectStream.
>> -  And I don't need to use checkpoints at all then.
>> Could you confirm that?
>>
>> It's a question where should I actually commit the ZK offsets. The
>> easiest would be to do it on the end of every batch. Do you think I can use
>> org.apache.spark.streaming.scheduler.StreamingListener, method
>> onBatchCompleted for that? I don't think so, because probably we don't have
>> 

Re: How to enable Tungsten in Spark 1.5 for Spark SQL?

2015-09-10 Thread Ted Yu
Please see the following
in sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala :

  val TUNGSTEN_ENABLED = booleanConf("spark.sql.tungsten.enabled",
    defaultValue = Some(true),
    doc = "When true, use the optimized Tungsten physical execution backend which explicitly " +
      "manages memory and dynamically generates bytecode for expression evaluation.")

  val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
    defaultValue = Some(true),  // use TUNGSTEN_ENABLED as default
    doc = "When true, code will be dynamically generated at runtime for expression evaluation in" +
      " a specific query.",
    isPublic = false)

  val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
    defaultValue = Some(true),  // use TUNGSTEN_ENABLED as default
    doc = "When true, use the new optimized Tungsten physical execution backend.",
    isPublic = false)

Cheers

On Thu, Sep 10, 2015 at 7:39 AM, unk1102  wrote:

> Hi Spark 1.5 looks promising how do we enable project tungsten for spark
> sql
> or is it enabled by default please guide.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-enable-Tungsten-in-Spark-1-5-for-Spark-SQL-tp24642.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: spark streaming 1.3 with kafka connection timeout

2015-09-10 Thread Cody Koeninger
NotLeaderForPartitionException means you lost a Kafka broker or had a
rebalance... why did you say "I am getting Connection timeout in my code"?

You've asked questions about this exact same situation before; the answer
remains the same.

On Thu, Sep 10, 2015 at 9:44 AM, Shushant Arora 
wrote:

> Stack trace is
> 15/09/09 22:49:52 ERROR kafka.KafkaRDD: Lost leader for topic topicname
> partition 99,  sleeping for 200ms
> kafka.common.NotLeaderForPartitionException
> at sun.reflect.GeneratedConstructorAccessor26.newInstance(Unknown
> Source)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at java.lang.Class.newInstance(Class.java:374)
> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:142)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:151)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
> at
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at
> scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
> at
> com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:147)
> at
> com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:35)
> at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
> at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 15/09/09 22:49:52 ERROR consumer.KafkaStreamTransformations: Error while
> consuming messages from kafka
>
>
>
>
> Actual code is :
>
> In driver :
> final KafkaStreamTransformations transformations = new
> KafkaStreamTransformations
> (...);
>
> directKafkaStream.foreachRDD(new Function, Void>() {
>
> @Override
> public Void call(JavaRDD v1) throws Exception {
> v1.foreachPartition(transformations);
> return null;
> }
> });
> 
>
> In KafkaStreamTransformations :
>
>
> @Override
> public void call(Iterator t) throws Exception {
> try{
> while(t.hasNext()){
> ...long running task
> }
> }catch(Exception e){
> e.printStackTrace();
> logger.error("Error while consuming messages from kafka");
> }
>
>
>
>
>
>
> On Thu, Sep 10, 2015 at 6:58 PM, Cody Koeninger 
> wrote:
>
>> Post the actual stacktrace you're getting
>>
>> On Thu, Sep 10, 2015 at 12:21 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Executors in spark streaming 1.3 fetch messages from kafka in batches
>>> and what happens when executor takes longer time to complete a fetch batch
>>>
>>> say in
>>>
>>>
>>> directKafkaStream.foreachRDD(new Function, Void>() {
>>>
>>> @Override
>>> public Void call(JavaRDD v1) throws Exception {
>>> v1.foreachPartition(new  VoidFunction>{
>>> @Override
>>> public void call(Iterator t) throws Exception {
>>> //long running task
>>> }});}});
>>>
>>> Will this long running task drops the connectio of executor with kafka
>>> brokers-
>>> And how to handle that. I am getting Connection tmeout in my code.
>>>
>>>
>>>
>>>
>>
>


Re: Random Forest MLlib

2015-09-10 Thread Yasemin Kaya
Hi Maximo,
Thanks a lot.
Hi Yasemin,
   We had the same question and found this:

https://issues.apache.org/jira/browse/SPARK-6884

Thanks,
   Maximo

On Sep 10, 2015, at 9:09 AM, Yasemin Kaya  wrote:

Hi ,

I am using Random Forest Alg. for recommendation system. I get users and
users' response yes or no (1/0). But I want to learn the probability of the
trees. Program says x user yes but with how much probability, I want to get
these probabilities.

Best,
yasemin
-- 
hiç ender hiç


Re: Tungsten and Spark Streaming

2015-09-10 Thread Gurvinder Singh
On 09/10/2015 07:42 AM, Tathagata Das wrote:
> Rewriting is necessary. You will have to convert RDD/DStream operations
> to DataFrame operations. So get the RDDs in DStream, using
> transform/foreachRDD, convert to DataFrames and then do DataFrame
> operations.

Are there any plans for 1.6 or later to add Tungsten support to
RDD/DStream directly, or is it intended that users should switch to
DataFrames rather than operating at the RDD/DStream level?

> 
> On Wed, Sep 9, 2015 at 9:23 PM, N B wrote:
> 
> Hello,
> 
> How can we start taking advantage of the performance gains made
> under Project Tungsten in Spark 1.5 for a Spark Streaming program? 
> 
> From what I understand, this is available by default for Dataframes.
> But for a program written using Spark Streaming, would we see any
> potential gains "out of the box" in 1.5 or will we have to rewrite
> some portions of the application code to realize that benefit?
> 
> Any insight/documentation links etc in this regard will be appreciated.
> 
> Thanks
> Nikunj
> 
> 





How to enable Tungsten in Spark 1.5 for Spark SQL?

2015-09-10 Thread unk1102
Hi, Spark 1.5 looks promising. How do we enable Project Tungsten for Spark SQL,
or is it enabled by default? Please advise.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-enable-Tungsten-in-Spark-1-5-for-Spark-SQL-tp24642.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Krzysztof Zarzycki
Thanks Cody for your answers. This discussion helped me a lot.
Still, I feel that offset management could be better handled by
Spark, if it really wants to be an easy streaming framework. If it doesn't help
users with this, I'm afraid it will be superseded for many by other
frameworks that might make it more convenient (Samza, Flink or the
just-being-designed Kafka-Streams).

Thanks Dibyendu for your note. I will strongly consider it if I fall
back to the receiver-based approach.

Cheers,
Krzysztof Zarzycki

2015-09-10 16:52 GMT+02:00 Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com>:

> Hi,
>
> Just to clarify one point which may not be clear to many. If someone
>  decides to use Receiver based approach , the best options at this point is
> to use  https://github.com/dibbhatt/kafka-spark-consumer. This will also
> work with WAL like any other receiver based consumer. The major issue with
> KafkaUtils.CreateStream is,  it use Kafka High Level API which has serious
> issue with Consumer Re-balance where as dibbhatt/kafka-spark-consumer use
> Low Level Kafka Consumer API which does not have any such issue.  I am not
> sure if there is any publicly available performance benchmark done with
> this one with the DirectStream, so can not comment on performance benefits
> of one over other , but whatever performance benchmark we have done,
> dibbhatt/kafka-spark-consumer  stands out..
>
> Regards,
> Dibyendu
>
> On Thu, Sep 10, 2015 at 8:08 PM, Cody Koeninger 
> wrote:
>
>> You have to store offsets somewhere.
>>
>> If you're going to store them in checkpoints, then you have to deal with
>> the fact that checkpoints aren't recoverable on code change.  Starting up
>> the new version helps because you don't start it from the same checkpoint
>> directory as the running one... it has your new code, and is storing to a
>> new checkpoint directory.  If you started the new one from the latest
>> offsets, you can shut down the old one as soon as it's caught up.
>>
>> If you don't like the implications of storing offsets in checkpoints...
>> then sure, store them yourself.  A real database would be better, but if
>> you really want to store them in zookeeper you can.  In any case, just do
>> your offset saves in the same foreachPartition your other output operations
>> are occurring in, after they've successfully completed.
>>
>> If you don't care about performance benefits of the direct stream and
>> don't want exactly once semantics, sure use the old stream.
>>
>> Finally, hundreds of gigs just really isn't very much data.  Unless what
>> you're doing is really resource intensive, it shouldn't take much time to
>> process it all, especially if you can dynamically size a cluster for the
>> rare occasion that something is screwed up and you need to reprocess.
>>
>>
>> On Thu, Sep 10, 2015 at 9:17 AM, Krzysztof Zarzycki wrote:
>>
>>> Thanks guys for your answers. I put my answers in text, below.
>>>
>>> Cheers,
>>> Krzysztof Zarzycki
>>>
>>> 2015-09-10 15:39 GMT+02:00 Cody Koeninger :
>>>
>>>> The kafka direct stream meets those requirements.  You don't need
>>>> checkpointing for exactly-once.  Indeed, unless your output operations are
>>>> idempotent, you can't get exactly-once if you're relying on checkpointing.
>>>> Instead, you need to store the offsets atomically in the same transaction
>>>> as your results.

>>>
>>> To focus discussion, let's assume my operations are idempotent & I'm
>>> interested in at-least-once thanks to that (which is idempotent
>>> exactly-once as named in your pres). Did you say, that I don't need
>>> checkpointing for that? How then direct stream API would store offsets
>>>  between restarts?
>>>
>>>
>>>> See
>>>> https://github.com/koeninger/kafka-exactly-once
>>>> and the video / blog posts linked from it.


>>> I did that, thank you. What I want is to achieve "idempotent
>>> exactly-once" as named in your presentation.
>>>
>>>
>>>> The dibhatt consumer that Akhil linked is using zookeeper to store
>>>> offsets, so to the best of my knowledge, it cannot do exactly-once without
>>>> idempotent output operations.

>>> True, and I totally accept it if what I get is at-least-once.
>>>
>>>


>>>> Regarding the issues around code changes and checkpointing, the most
>>>> straightforward way to deal with this is to just start a new version of
>>>> your job before stopping the old one.  If you care about delivery semantics
>>>> and are using checkpointing, your output operation must be idempotent
>>>> anyway, so having 2 versions of the code running at the same time for a
>>>> brief period should not be a problem.

>>>
>>> How starting new version before stopping old one helps? Could you please
>>> explain a bit the mechanics of that?
>>> Anyway, it seems definitely cumbersome. Plus, I can imagine plenty of
>>> 

Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Cody Koeninger
There is no free lunch.  (TD, when do we get shirts made that say that?)

If you want exactly-once delivery semantics for arbitrary workflows into
arbitrary datastores, you're going to have to do some of your own work.

If someone is telling you otherwise, they're probably lying to you.

I think writing your own couple of lines of code to "select offsets from"
 and "update offsets where", for whatever your data store may be, is worth
the flexibility of doing exactly what you need.
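
As a rough illustration of the "select offsets from" / "update offsets where" idea, the sketch below models a store in which results and offsets are committed together, and the offset update succeeds only if the stored offset still equals the batch's starting offset. `TransactionalStore` is a hypothetical in-memory stand-in for a real database transaction, not an API from Spark or Kafka:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of a datastore that commits results and offsets atomically. */
final class TransactionalStore {
    private final Map<Integer, Long> offsets = new HashMap<>();
    private final List<String> results = new ArrayList<>();

    /** Analogue of "select offsets from ...": where should this partition resume? */
    synchronized long selectOffset(int partition) {
        return offsets.getOrDefault(partition, 0L);
    }

    /**
     * Analogue of inserting results and running "update offsets where offset = fromOffset"
     * in one transaction. If the stored offset is not fromOffset, an earlier attempt has
     * already committed this range, so nothing is written and false is returned.
     */
    synchronized boolean commitBatch(int partition, long fromOffset, long untilOffset,
                                     List<String> batchResults) {
        if (selectOffset(partition) != fromOffset) {
            return false; // "rollback": results and offsets stay untouched
        }
        results.addAll(batchResults);
        offsets.put(partition, untilOffset);
        return true;
    }

    synchronized int resultCount() {
        return results.size();
    }
}

final class ExactlyOnceDemo {
    private ExactlyOnceDemo() {}

    public static void main(String[] args) {
        TransactionalStore store = new TransactionalStore();
        List<String> batch = Arrays.asList("x", "y");
        boolean first = store.commitBatch(0, 0L, 2L, batch);
        boolean replay = store.commitBatch(0, 0L, 2L, batch); // same range delivered again
        System.out.println(first + " " + replay + " " + store.resultCount());
    }
}
```

Unlike the idempotent approach, the output here does not need to tolerate duplicates, because a replayed batch is rejected as a whole.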


On Thu, Sep 10, 2015 at 10:01 AM, Krzysztof Zarzycki 
wrote:

> Thanks Cody for your answers. This discussion helped me a lot.
> Still, I feel that offset management could be better handled by
> Spark, if it really wants to be an easy streaming framework. If it doesn't help
> users with this, I'm afraid it will be superseded for many by other
> frameworks that might make it more convenient (Samza, Flink or the
> just-being-designed Kafka-Streams).
>
> Thanks Dibyendu for your note. I will strongly consider it if I fall
> back to the receiver-based approach.
>
> Cheers,
> Krzysztof Zarzycki
>
> 2015-09-10 16:52 GMT+02:00 Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com>:
>
>> Hi,
>>
>> Just to clarify one point which may not be clear to many. If someone
>>  decides to use Receiver based approach , the best options at this point is
>> to use  https://github.com/dibbhatt/kafka-spark-consumer. This will also
>> work with WAL like any other receiver based consumer. The major issue with
>> KafkaUtils.CreateStream is,  it use Kafka High Level API which has serious
>> issue with Consumer Re-balance where as dibbhatt/kafka-spark-consumer use
>> Low Level Kafka Consumer API which does not have any such issue.  I am not
>> sure if there is any publicly available performance benchmark done with
>> this one with the DirectStream, so can not comment on performance benefits
>> of one over other , but whatever performance benchmark we have done,
>> dibbhatt/kafka-spark-consumer  stands out..
>>
>> Regards,
>> Dibyendu
>>
>> On Thu, Sep 10, 2015 at 8:08 PM, Cody Koeninger 
>> wrote:
>>
>>> You have to store offsets somewhere.
>>>
>>> If you're going to store them in checkpoints, then you have to deal with
>>> the fact that checkpoints aren't recoverable on code change.  Starting up
>>> the new version helps because you don't start it from the same checkpoint
>>> directory as the running one... it has your new code, and is storing to a
>>> new checkpoint directory.  If you started the new one from the latest
>>> offsets, you can shut down the old one as soon as it's caught up.
>>>
>>> If you don't like the implications of storing offsets in checkpoints...
>>> then sure, store them yourself.  A real database would be better, but if
>>> you really want to store them in zookeeper you can.  In any case, just do
>>> your offset saves in the same foreachPartition your other output operations
>>> are occurring in, after they've successfully completed.
>>>
>>> If you don't care about performance benefits of the direct stream and
>>> don't want exactly once semantics, sure use the old stream.
>>>
>>> Finally, hundreds of gigs just really isn't very much data.  Unless what
>>> you're doing is really resource intensive, it shouldn't take much time to
>>> process it all, especially if you can dynamically size a cluster for the
>>> rare occasion that something is screwed up and you need to reprocess.
>>>
>>>
>>> On Thu, Sep 10, 2015 at 9:17 AM, Krzysztof Zarzycki <
>>> k.zarzy...@gmail.com> wrote:
>>>
>>>> Thanks guys for your answers. I put my answers in text, below.
>>>>
>>>> Cheers,
>>>> Krzysztof Zarzycki
>>>>
>>>> 2015-09-10 15:39 GMT+02:00 Cody Koeninger :
>>>>
>>>>> The kafka direct stream meets those requirements.  You don't need
>>>>> checkpointing for exactly-once.  Indeed, unless your output operations are
>>>>> idempotent, you can't get exactly-once if you're relying on checkpointing.
>>>>> Instead, you need to store the offsets atomically in the same transaction
>>>>> as your results.
>>>>>
>>>>
>>>> To focus discussion, let's assume my operations are idempotent & I'm
>>>> interested in at-least-once thanks to that (which is idempotent
>>>> exactly-once as named in your pres). Did you say, that I don't need
>>>> checkpointing for that? How then direct stream API would store offsets
>>>>  between restarts?
>>>>
>>>>
>>>>> See
>>>>> https://github.com/koeninger/kafka-exactly-once
>>>>> and the video / blog posts linked from it.
>>>>>
>>>>>
>>>> I did that, thank you. What I want is to achieve "idempotent
>>>> exactly-once" as named in your presentation.
>>>>
>>>>
>>>>> The dibhatt consumer that Akhil linked is using zookeeper to store
>>>>> offsets, so to the best of my knowledge, it cannot do exactly-once without
>>>>> idempotent output operations.
>>>>>
>>>> True, and I totally accept it if 

Re: ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()

2015-09-10 Thread Ashish Shenoy
I am using spark-1.4.1

Here's the skeleton code:

JavaPairRDD rddPair =
    rdd.repartitionAndSortWithinPartitions(
            new CustomPartitioner(), new ExportObjectComparator())
        .persist(StorageLevel.MEMORY_AND_DISK_SER());

...

@SuppressWarnings("serial")
private static class CustomPartitioner extends Partitioner {
  int numPartitions;

  @Override
  public int numPartitions() {
    numPartitions = 40;
    return numPartitions;
  }

  @Override
  public int getPartition(Object o) {
    NewKey newKey = (NewKey) o;
    return (int) newKey.getGsMinusURL() % numPartitions;
  }
}

...

@SuppressWarnings("serial")
private static class ExportObjectComparator
    implements Serializable, Comparator<NewKey> {
  @Override
  public int compare(NewKey o1, NewKey o2) {
    if (o1.hits == o2.hits) {
      return 0;
    } else if (o1.hits > o2.hits) {
      return -1;
    } else {
      return 1;
    }
  }
}

...



Thanks,
Ashish

On Wed, Sep 9, 2015 at 5:13 PM, Ted Yu  wrote:

> Which release of Spark are you using ?
>
> Can you show skeleton of your partitioner and comparator ?
>
> Thanks
>
>
>
> On Sep 9, 2015, at 4:45 PM, Ashish Shenoy 
> wrote:
>
> Hi,
>
> I am trying to sort a RDD pair using repartitionAndSortWithinPartitions()
> for my key [which is a custom class, not a java primitive] using a custom
> partitioner on that key and a custom comparator. However, it fails
> consistently:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 18
> in stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in stage
> 1.0 (TID 202, 172.16.18.25): java.lang.ArrayIndexOutOfBoundsException: -78
> at
> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
> at
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> I also persist the RDD using the "memory and disk" storage level. The
> stack trace above comes from spark's code and not my application code. Can
> you pls point out what I am doing wrong ?
>
> Thanks,
> Ashish
>
>


Re: [Spark on Amazon EMR] : File does not exist: hdfs://ip-x-x-x-x:/.../spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

2015-09-10 Thread Work
Ewan,


What issue are you having with HDFS when only Spark is installed? I'm not aware 
of any issue like this.




Thanks,

 Jonathan






On Wed, Sep 9, 2015 at 11:48 PM, Ewan Leith 
wrote:

> The last time I checked, if you launch EMR 4 with only Spark selected as an 
> application, HDFS isn't correctly installed.
> Did you select another application like Hive at launch time as well as Spark? 
> If not, try that.
> Thanks,
> Ewan
> -- Original message--
> From: Dean Wampler
> Date: Wed, 9 Sep 2015 22:29
> To: shahab;
> Cc: user@spark.apache.org;
> Subject:Re: [Spark on Amazon EMR] : File does not exist: 
> hdfs://ip-x-x-x-x:/.../spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar
> If you log into the cluster, do you see the file if you type:
> hdfs dfs -ls 
> hdfs://ipx-x-x-x:8020/user/hadoop/.sparkStaging/application_123344567_0018/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar
> (with the correct server address for "ipx-x-x-x"). If not, is the server 
> address correct and routable inside the cluster? Recall that EC2 instances 
> have both public and private host names & IP addresses.
> Also, is the port number correct for HDFS in the cluster?
> dean
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd 
> Edition (O'Reilly)
> Typesafe
> @deanwampler
> http://polyglotprogramming.com
> On Wed, Sep 9, 2015 at 9:28 AM, shahab 
> > wrote:
> Hi,
> I am using Spark on Amazon EMR. So far I have not managed to submit the 
> application successfully, and I am not sure what the problem is. In the log 
> file I see the following:
> java.io.FileNotFoundException: File does not exist: 
> hdfs://ipx-x-x-x:8020/user/hadoop/.sparkStaging/application_123344567_0018/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar
> However, even putting spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar in the fat 
> jar file didn't solve the problem. I am out of ideas now.
> I want to submit a Spark application as a step, using the AWS web console. I 
> submit the application as: spark-submit --deploy-mode cluster --class 
> mypack.MyMainClass --master yarn-cluster s3://mybucket/MySparkApp.jar 
> Has anyone had a similar problem with EMR?
> best,
> /Shahab

Re: Spark-shell throws Hive error when SQLContext.parquetFile, v1.3

2015-09-10 Thread Mohammad Islam
In addition to Cheng's comment --
I found a similar problem when hive-site.xml is not in the class path. A 
proper stack trace can pinpoint the problem.

In the meantime, you can add it into your environment through 
HADOOP_CLASSPATH. (export HADOOP_CONF_DIR=/etc/hive/conf/)  
See more at 
http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/cdh_rn_spark_ki.html
and look for "Spark not automatically picking up hive-site.xml".


On Thursday, September 10, 2015 5:01 AM, Cheng Lian  
wrote:



If you don't need to interact with Hive, you may compile Spark without 
using the -Phive flag to eliminate Hive dependencies. In this way, the 
sqlContext instance in Spark shell will be of type SQLContext instead of 
HiveContext.

The Hive metastore error is probably caused by a Hive 
misconfiguration.

Cheng


On 9/10/15 6:02 PM, Petr Novak wrote:
> Hello,
>
> sqlContext.parquetFile(dir)
>
> throws exception " Unable to instantiate 
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient"
>
> The strange thing is that the second attempt to open the file 
> succeeds:
>
> try {
>   sqlContext.parquetFile(dir)
> } catch {
>   case e: Exception => sqlContext.parquetFile(dir)
> }
>
> What should I do to make my script run flawlessly in spark-shell 
> when opening Parquet files? It is probably missing some dependency. Or 
> how should I write the code? This double attempt is awful, and 
> I don't need HiveMetaStoreClient; I just need to open a Parquet file.
>
> Many thanks for any idea,
> Petr
>
>





Spark task hangs infinitely when accessing S3

2015-09-10 Thread Mario Pastorelli
Dear community,
I am facing a problem accessing data on S3 via Spark. My current
configuration is the following:

- Spark 1.4.1
- Hadoop 2.7.1
- hadoop-aws-2.7.1
- mesos 0.22.1

I am accessing the data using the s3a protocol but it just hangs. The job
runs through the whole data set, but systematically there is one task that
never finishes. In stderr I see quite a few timeout errors, but it looks
like the application recovers from these. It just keeps running forever
without proceeding to the next stage.

This is the stacktrace I am reading from the errors that the job is
recovering from:

java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
at sun.security.ssl.InputRecord.read(InputRecord.java:509)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:934)
at
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:891)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
at
org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
at
org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
at
org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at
com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at
org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:164)
at java.io.DataInputStream.read(DataInputStream.java:149)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.readAByte(CBZip2InputStream.java:195)
at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:949)
at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:506)
at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:335)
at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:425)
at
org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:485)
at java.io.InputStream.read(InputStream.java:101)
at
org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:130)
at
org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)

My gut feeling is that the job is "failing at failing". It looks like some
tasks that should fail are not failing and, thus, the job just hangs
forever. Moreover, debugging this problem is really hard because there is
no concrete error in the logs.

Could you help me figure out what is happening and find a solution to
this issue?
Thank you!


Re: How to compute the probability of each class in Naive Bayes

2015-09-10 Thread Sean Owen
Yes, 
https://github.com/apache/spark/blob/v1.5.0/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala#L158
is the method you are interested in. It does normalize the
probabilities and return them to non-log-space. So you can use
predictProbabilities to get the actual posterior class probabilities
for a given input:
https://github.com/apache/spark/blob/v1.5.0/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala#L130

On Thu, Sep 10, 2015 at 6:32 PM, Adamantios Corais
 wrote:
> Thanks Sean. As far as I can see probabilities are NOT normalized;
> denominator isn't implemented in either v1.1.0 or v1.5.0 (by denominator, I
> refer to the probability of feature X). So, for given lambda, how to compute
> the denominator? FYI:
> https://github.com/apache/spark/blob/v1.5.0/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
>
> // Adamantios
>
>
>
> On Thu, Sep 10, 2015 at 7:03 PM, Sean Owen  wrote:
>>
>> The log probabilities are unlikely to be very large, though the
>> probabilities may be very small. The direct answer is to exponentiate
>> brzPi + brzTheta * testData.toBreeze -- apply exp(x).
>>
>> I have forgotten whether the probabilities are normalized already
>> though. If not you'll have to normalize to get them to sum to 1 and be
>> real class probabilities. This is better done in log space though.
>>
>> On Thu, Sep 10, 2015 at 5:12 PM, Adamantios Corais
>>  wrote:
>> > great. so, provided that model.theta represents the log-probabilities
>> > and
>> > (hence the result of brzPi + brzTheta * testData.toBreeze is a big
>> > number
>> > too), how can I get back the non-log-probabilities which - apparently -
>> > are
>> > bounded between 0.0 and 1.0?
>> >
>> >
>> > // Adamantios
>> >
>> >
>> >
>> > On Tue, Sep 1, 2015 at 12:57 PM, Sean Owen  wrote:
>> >>
>> >> (pedantic: it's the log-probabilities)
>> >>
>> >> On Tue, Sep 1, 2015 at 10:48 AM, Yanbo Liang 
>> >> wrote:
>> >> > Actually
>> >> > brzPi + brzTheta * testData.toBreeze
>> >> > is the probabilities of the input Vector on each class, however it's
>> >> > a
>> >> > Breeze Vector.
>> >> > Pay attention the index of this Vector need to map to the
>> >> > corresponding
>> >> > label index.
>> >> >
>> >> > 2015-08-28 20:38 GMT+08:00 Adamantios Corais
>> >> > :
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> I am trying to change the following code so as to get the
>> >> >> probabilities
>> >> >> of
>> >> >> the input Vector on each class (instead of the class itself with the
>> >> >> highest
>> >> >> probability). I know that this is already available as part of the
>> >> >> most
>> >> >> recent release of Spark but I have to use Spark 1.1.0.
>> >> >>
>> >> >> Any help is appreciated.
>> >> >>
>> >> >>> override def predict(testData: Vector): Double = {
>> >> >>> labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
>> >> >>>   }
>> >> >>
>> >> >>
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>> https://github.com/apache/spark/blob/v1.1.0/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
>> >> >>
>> >> >>
>> >> >> // Adamantios
>> >> >>
>> >> >>
>> >> >
>> >
>> >
>
>




Re: How to compute the probability of each class in Naive Bayes

2015-09-10 Thread Sean Owen
The log probabilities are unlikely to be very large, though the
probabilities may be very small. The direct answer is to exponentiate
brzPi + brzTheta * testData.toBreeze -- apply exp(x).

I have forgotten whether the probabilities are normalized already
though. If not you'll have to normalize to get them to sum to 1 and be
real class probabilities. This is better done in log space though.
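As a rough illustration of normalizing in log space, here is a standalone Java sketch with no Spark or Breeze dependency. It assumes you already have the per-class scores, i.e. the entries of brzPi + brzTheta * testData.toBreeze, as a plain double array; LogSpaceNormalizer and posteriors are made-up names, not MLlib API. The sum over classes of exp(score) plays the role of the denominator, and subtracting the largest score first keeps exp() from underflowing. The output keeps the index order of the scores, so each index still has to be mapped to its label.

```java
// Illustrative sketch only; LogSpaceNormalizer and posteriors are hypothetical
// names and are not part of Spark MLlib.
class LogSpaceNormalizer {

  // Turns unnormalized log-scores (one per class) into probabilities that sum to 1.
  static double[] posteriors(double[] logScores) {
    // Find the largest score so that the biggest exponent below is exp(0) = 1.
    double max = Double.NEGATIVE_INFINITY;
    for (double s : logScores) {
      max = Math.max(max, s);
    }
    // Sum of exp(score - max) over all classes.
    double sum = 0.0;
    for (double s : logScores) {
      sum += Math.exp(s - max);
    }
    // Log of the denominator: log(sum over classes of exp(score)).
    double logNormalizer = max + Math.log(sum);
    double[] probs = new double[logScores.length];
    for (int i = 0; i < logScores.length; i++) {
      probs[i] = Math.exp(logScores[i] - logNormalizer);
    }
    return probs;
  }

  public static void main(String[] args) {
    // Scores this negative would each underflow to 0.0 under a plain Math.exp().
    double[] probs = posteriors(new double[] {-1000.0, -1001.0, -1002.0});
    for (double p : probs) {
      System.out.println(p);
    }
  }
}
```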

On Thu, Sep 10, 2015 at 5:12 PM, Adamantios Corais
 wrote:
> great. so, provided that model.theta represents the log-probabilities and
> (hence the result of brzPi + brzTheta * testData.toBreeze is a big number
> too), how can I get back the non-log-probabilities which - apparently - are
> bounded between 0.0 and 1.0?
>
>
> // Adamantios
>
>
>
> On Tue, Sep 1, 2015 at 12:57 PM, Sean Owen  wrote:
>>
>> (pedantic: it's the log-probabilities)
>>
>> On Tue, Sep 1, 2015 at 10:48 AM, Yanbo Liang  wrote:
>> > Actually
>> > brzPi + brzTheta * testData.toBreeze
>> > is the probabilities of the input Vector on each class, however it's a
>> > Breeze Vector.
>> > Pay attention the index of this Vector need to map to the corresponding
>> > label index.
>> >
>> > 2015-08-28 20:38 GMT+08:00 Adamantios Corais
>> > :
>> >>
>> >> Hi,
>> >>
>> >> I am trying to change the following code so as to get the probabilities
>> >> of
>> >> the input Vector on each class (instead of the class itself with the
>> >> highest
>> >> probability). I know that this is already available as part of the most
>> >> recent release of Spark but I have to use Spark 1.1.0.
>> >>
>> >> Any help is appreciated.
>> >>
>> >>> override def predict(testData: Vector): Double = {
>> >>> labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
>> >>>   }
>> >>
>> >>
>> >>>
>> >>>
>> >>> https://github.com/apache/spark/blob/v1.1.0/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
>> >>
>> >>
>> >> // Adamantios
>> >>
>> >>
>> >
>
>




Re: How to compute the probability of each class in Naive Bayes

2015-09-10 Thread Adamantios Corais
Thanks Sean. As far as I can see, the probabilities are NOT normalized; the
denominator isn't implemented in either v1.1.0 or v1.5.0 (by denominator,
I mean the probability of feature X). So, for a given lambda, how do I
compute the denominator? FYI:
https://github.com/apache/spark/blob/v1.5.0/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala


*// Adamantios*



On Thu, Sep 10, 2015 at 7:03 PM, Sean Owen  wrote:

> The log probabilities are unlikely to be very large, though the
> probabilities may be very small. The direct answer is to exponentiate
> brzPi + brzTheta * testData.toBreeze -- apply exp(x).
>
> I have forgotten whether the probabilities are normalized already
> though. If not you'll have to normalize to get them to sum to 1 and be
> real class probabilities. This is better done in log space though.
>
> On Thu, Sep 10, 2015 at 5:12 PM, Adamantios Corais
>  wrote:
> > great. so, provided that model.theta represents the log-probabilities and
> > (hence the result of brzPi + brzTheta * testData.toBreeze is a big number
> > too), how can I get back the non-log-probabilities which - apparently -
> are
> > bounded between 0.0 and 1.0?
> >
> >
> > // Adamantios
> >
> >
> >
> > On Tue, Sep 1, 2015 at 12:57 PM, Sean Owen  wrote:
> >>
> >> (pedantic: it's the log-probabilities)
> >>
> >> On Tue, Sep 1, 2015 at 10:48 AM, Yanbo Liang 
> wrote:
> >> > Actually
> >> > brzPi + brzTheta * testData.toBreeze
> >> > is the probabilities of the input Vector on each class, however it's a
> >> > Breeze Vector.
> >> > Pay attention the index of this Vector need to map to the
> corresponding
> >> > label index.
> >> >
> >> > 2015-08-28 20:38 GMT+08:00 Adamantios Corais
> >> > :
> >> >>
> >> >> Hi,
> >> >>
> >> >> I am trying to change the following code so as to get the
> probabilities
> >> >> of
> >> >> the input Vector on each class (instead of the class itself with the
> >> >> highest
> >> >> probability). I know that this is already available as part of the
> most
> >> >> recent release of Spark but I have to use Spark 1.1.0.
> >> >>
> >> >> Any help is appreciated.
> >> >>
> >> >>> override def predict(testData: Vector): Double = {
> >> >>> labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
> >> >>>   }
> >> >>
> >> >>
> >> >>>
> >> >>>
> >> >>>
> https://github.com/apache/spark/blob/v1.1.0/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
> >> >>
> >> >>
> >> >> // Adamantios
> >> >>
> >> >>
> >> >
> >
> >
>


Re: Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-10 Thread Tim Chen
Hi Tom,

Sorry the documentation isn't very rich; it probably assumes that users
understand how Mesos and its frameworks work.

First I need to explain the rationale for creating the dispatcher. If you're
not familiar with Mesos yet: each node in your datacenter runs a Mesos
slave, which is responsible for publishing resources and running/watching
tasks, and the Mesos master is responsible for taking the aggregated
resources and scheduling them among frameworks.

Frameworks are not managed by Mesos: the Mesos master and slaves don't
launch or maintain frameworks, but assume they're launched and kept running
on their own. The existing frameworks in the ecosystem therefore all have
their own ways to deploy, provide HA and persist state (e.g. Aurora,
Marathon, etc.).

Therefore, to introduce cluster mode with Mesos, we had to create a
long-running framework that runs in your datacenter and can handle
launching Spark drivers on demand, handle HA, etc. This is what the
dispatcher is all about.

So the idea is that you should launch the dispatcher not on the client, but
on a machine in your datacenter. In Mesosphere's DCOS we launch all
frameworks and long-running services with Marathon, and you can use
Marathon to launch the Spark dispatcher.

Then all clients, instead of specifying the Mesos master URL (e.g.
mesos://mesos.master:2181), just talk to the dispatcher
(mesos://spark-dispatcher.mesos:7077), and the dispatcher will then start
and watch the driver for you.

Tim



On Thu, Sep 10, 2015 at 10:13 AM, Tom Waterhouse (tomwater) <
tomwa...@cisco.com> wrote:

> After spending most of yesterday scouring the Internet for sources of
> documentation for submitting Spark jobs in cluster mode to a Spark cluster
> managed by Mesos I was able to do just that, but I am not convinced that
> how I have things setup is correct.
>
> I used the Mesos published instructions for setting up my Mesos cluster.
> I have three Zookeeper instances, three Mesos master instances, and three
> Mesos slave instances. This is all running in Openstack.
>
> The documentation on the Spark documentation site states that “To use
> cluster mode, you must start the MesosClusterDispatcher in your cluster via
> the sbin/start-mesos-dispatcher.sh script, passing in the Mesos master
> url (e.g: mesos://host:5050).”  That is it, no more information than
> that.  So that is what I did: I have one machine that I use as the Spark
> client for submitting jobs.  I started the Mesos dispatcher with script as
> described, and using the client machine’s IP address and port as the target
> for the job submitted the job.
>
> The job is currently running in Mesos as expected.  This is not however
> how I would have expected to configure the system.  As running there is one
> instance of the Spark Mesos dispatcher running outside of Mesos, so not a
> part of the sphere of Mesos resource management.
>
> I used the following Stack Overflow posts as guidelines:
> http://stackoverflow.com/questions/31164725/spark-mesos-dispatcher
> http://stackoverflow.com/questions/31294515/start-spark-via-mesos
>
> There must be better documentation on how to deploy Spark in Mesos with
> jobs able to be deployed in cluster mode.
>
> I can follow up with more specific information regarding my deployment if
> necessary.
>
> Tom
>


Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-10 Thread Tom Waterhouse (tomwater)
After spending most of yesterday scouring the Internet for sources of 
documentation for submitting Spark jobs in cluster mode to a Spark cluster 
managed by Mesos I was able to do just that, but I am not convinced that how I 
have things setup is correct.

I used the Mesos published instructions for setting up my Mesos cluster.  I 
have three Zookeeper instances, three Mesos master instances, and three Mesos 
slave instances.  This is all running in Openstack.

The documentation on the Spark documentation site states that “To use cluster 
mode, you must start the MesosClusterDispatcher in your cluster via the 
sbin/start-mesos-dispatcher.sh script, passing in the Mesos master url (e.g: 
mesos://host:5050).”  That is it, no more information than that.  So that is 
what I did: I have one machine that I use as the Spark client for submitting 
jobs.  I started the Mesos dispatcher with the script as described and 
submitted the job, using the client machine’s IP address and port as the 
target.

The job is currently running in Mesos as expected.  However, this is not how I 
would have expected to configure the system.  As it stands, there is one 
instance of the Spark Mesos dispatcher running outside of Mesos, so it is not 
part of the sphere of Mesos resource management.

I used the following Stack Overflow posts as guidelines:
http://stackoverflow.com/questions/31164725/spark-mesos-dispatcher
http://stackoverflow.com/questions/31294515/start-spark-via-mesos

There must be better documentation on how to deploy Spark in Mesos with jobs 
able to be deployed in cluster mode.

I can follow up with more specific information regarding my deployment if 
necessary.

Tom


Re: Custom UDAF Evaluated Over Window

2015-09-10 Thread Michael Armbrust
The only way to do this today is to write it as a Hive UDAF.  We hope to
improve the window functions to use our native aggregation in a future
release.

On Thu, Sep 10, 2015 at 12:26 AM, xander92 
wrote:

> While testing out the new UserDefinedAggregateFunction in Spark 1.5.0, I
> successfully implemented a simple function to compute an average. I then
> tried to test this function by applying it over a simple window and I got
> an
> error saying that my function is not supported over window operation.
>
> So, is applying custom UDAFs over windows possible in Spark 1.5.0 and I
> simply have a mistake somewhere?. If it is not possible, are there patches
> that make this sort of thing possible that are simply not included in the
> new release or is this functionality something that will hopefully come
> soon
> in a later release?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Custom-UDAF-Evaluated-Over-Window-tp24637.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Creating Parquet external table using HiveContext API

2015-09-10 Thread Michael Armbrust
Easiest is to just use SQL:

hiveContext.sql("CREATE TABLE  USING parquet OPTIONS (path '')")

When you specify the path, it's automatically created as an external table.
The schema will be discovered.

On Wed, Sep 9, 2015 at 9:33 PM, Mohammad Islam 
wrote:

> Hi,
> I want to create  an external hive table using HiveContext. I have the
> following :
> 1. full path/location of parquet data directory
> 2. name of the new table
> 3. I can get the schema as well.
>
> What API will be the best (for 1,3.x or 1.4.x)? I can see 6
> createExternalTable() APIs but not sure which one will be the best.
> I didn't find any good documentation in source code or Java doc about the
> parameters of the APIs (i.e path, source, options etc);
>
> Any help will be appreciated.
>
>
> Regards,
> Mohammad
>
>


Re: ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()

2015-09-10 Thread Ted Yu
Here is snippet of ExternalSorter.scala where ArrayIndexOutOfBoundsException
was thrown:

while (iterator.hasNext) {
  val partitionId = iterator.nextPartition()
  iterator.writeNext(partitionWriters(partitionId))
}
Meaning, partitionId was negative.
Execute the following and examine the value of i:

int i = -78 % 40;

You will see how your getPartition() method should be refined to prevent
this exception.
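To make the suggestion above concrete, here is a small standalone Java sketch (no Spark dependency) of what the % operator does with a negative value and one possible way to keep the result in range. PartitionIndexDemo, rawMod and nonNegativeMod are made-up names for illustration; Math.floorMod is a standard method in Java 8 and later.

```java
// Illustrative sketch only; PartitionIndexDemo, rawMod and nonNegativeMod are
// hypothetical names used for this example.
class PartitionIndexDemo {

  // Java's % keeps the sign of the dividend, so a negative key gives a negative result.
  static int rawMod(int key, int numPartitions) {
    return key % numPartitions;
  }

  // Shifts a negative remainder back into the range [0, numPartitions).
  static int nonNegativeMod(int key, int numPartitions) {
    int raw = key % numPartitions;
    return raw < 0 ? raw + numPartitions : raw;
  }

  public static void main(String[] args) {
    System.out.println(rawMod(-78, 40));          // -38
    System.out.println(nonNegativeMod(-78, 40));  // 2
    System.out.println(Math.floorMod(-78, 40));   // 2
  }
}
```

A getPartition() that returns something like nonNegativeMod(key, numPartitions) always yields a valid index between 0 and numPartitions - 1.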

On Thu, Sep 10, 2015 at 8:52 AM, Ashish Shenoy 
wrote:

> I am using spark-1.4.1
>
> Here's the skeleton code:
>
> JavaPairRDD rddPair =
>     rdd.repartitionAndSortWithinPartitions(
>             new CustomPartitioner(), new ExportObjectComparator())
>         .persist(StorageLevel.MEMORY_AND_DISK_SER());
>
> ...
>
> @SuppressWarnings("serial")
> private static class CustomPartitioner extends Partitioner {
>   int numPartitions;
>
>   @Override
>   public int numPartitions() {
>     numPartitions = 40;
>     return numPartitions;
>   }
>
>   @Override
>   public int getPartition(Object o) {
>     NewKey newKey = (NewKey) o;
>     return (int) newKey.getGsMinusURL() % numPartitions;
>   }
> }
>
> ...
>
> @SuppressWarnings("serial")
> private static class ExportObjectComparator
>     implements Serializable, Comparator<NewKey> {
>   @Override
>   public int compare(NewKey o1, NewKey o2) {
>     if (o1.hits == o2.hits) {
>       return 0;
>     } else if (o1.hits > o2.hits) {
>       return -1;
>     } else {
>       return 1;
>     }
>   }
> }
>
> ...
>
>
>
> Thanks,
> Ashish
>
> On Wed, Sep 9, 2015 at 5:13 PM, Ted Yu  wrote:
>
>> Which release of Spark are you using ?
>>
>> Can you show skeleton of your partitioner and comparator ?
>>
>> Thanks
>>
>>
>>
>> On Sep 9, 2015, at 4:45 PM, Ashish Shenoy 
>> wrote:
>>
>> Hi,
>>
>> I am trying to sort a RDD pair using repartitionAndSortWithinPartitions()
>> for my key [which is a custom class, not a java primitive] using a custom
>> partitioner on that key and a custom comparator. However, it fails
>> consistently:
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 18 in stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in
>> stage 1.0 (TID 202, 172.16.18.25):
>> java.lang.ArrayIndexOutOfBoundsException: -78
>> at
>> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
>> at
>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
>> at
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>> I also persist the RDD using the "memory and disk" storage level. The
>> stack trace above comes from spark's code and not my application code. Can
>> you pls point out what I am doing wrong ?
>>
>> Thanks,
>> Ashish
>>
>>
>


connecting to remote spark and reading files on HDFS or s3 in sparkR

2015-09-10 Thread roni
I have Spark installed on an EC2 cluster. Can I connect to it from my
local SparkR in RStudio? If yes, how?

Can I read files that I have saved as Parquet files on HDFS or S3 in
SparkR? If yes, how?

Thanks
-Roni


reading files on HDFS /s3 in sparkR -failing

2015-09-10 Thread roni
I am trying this:

 ddf <- parquetFile(sqlContext, "hdfs://ec2-52-26-180-130.us-west-2.compute.amazonaws.com:9000/IPF_14_1.parquet")

and I get:

 path[1]="hdfs://ec2-52-26-180-130.us-west-2.compute.amazonaws.com:9000/IPF_14_1.parquet": No such file or directory

When I read a file on S3, I get:

 java.io.IOException: No FileSystem for scheme: s3


Thanks in advance.

-Roni


Re: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

2015-09-10 Thread Michael Armbrust
I've been running TPC-DS SF=1500 daily on Spark 1.4.1 and Spark 1.5 on S3,
so this is surprising.  In my experiments Spark 1.5 is either the same or
faster than 1.4 with only small exceptions.  A few thoughts,

 - 600 partitions is probably way too many for 6G of data.
 - Providing the output of explain for both runs would be helpful whenever
reporting performance changes.
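
Both suggestions can be tried with a few lines. This is a minimal sketch, assuming a Spark 1.5-era SQLContext; the helper name explainWithPartitions and the partition count in the usage note are illustrative and do not come from this thread.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper (not part of Spark): sets the number of shuffle
// partitions, builds the DataFrame for `query`, and prints its plans.
def explainWithPartitions(sqlContext: SQLContext, query: String, shufflePartitions: Int): DataFrame = {
  // Applies to shuffles planned for queries issued after this call.
  sqlContext.setConf("spark.sql.shuffle.partitions", shufflePartitions.toString)
  val df = sqlContext.sql(query)
  // extended = true prints the parsed, analyzed, optimized and physical plans.
  df.explain(true)
  df
}
```

Calling something like explainWithPartitions(sqlContext, sql, 64) on both versions and attaching the two outputs gives readers of the thread something concrete to compare.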

On Thu, Sep 10, 2015 at 1:24 AM, Todd  wrote:

> Hi,
>
> I am using data generated with sparksqlperf(
> https://github.com/databricks/spark-sql-perf) to test the spark sql
> performance (spark on yarn, with 10 nodes) with the following code (The
> table store_sales is about 90 million records, 6G in size)
>
> val outputDir="hdfs://tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales"
> val name="store_sales"
> sqlContext.sql(
>   s"""
>   |CREATE TEMPORARY TABLE ${name}
>   |USING org.apache.spark.sql.parquet
>   |OPTIONS (
>   |  path '${outputDir}'
>   |)
> """.stripMargin)
>
> val sql="""
>  |select
>  |  t1.ss_quantity,
>  |  t1.ss_list_price,
>  |  t1.ss_coupon_amt,
>  |  t1.ss_cdemo_sk,
>  |  t1.ss_item_sk,
>  |  t1.ss_promo_sk,
>  |  t1.ss_sold_date_sk
>  |from store_sales t1 join store_sales t2 on t1.ss_item_sk = t2.ss_item_sk
>  |where
>  |  t1.ss_sold_date_sk between 2450815 and 2451179
>""".stripMargin
>
> val df = sqlContext.sql(sql)
> df.rdd.foreach(row=>Unit)
>
> With 1.4.1, I can finish the query in 6 minutes,  but  I need 10+ minutes
> with 1.5.
>
> The configurations are basically the same, since I copied the configuration
> from 1.4.1 to 1.5:
>
> sparkVersion                                   1.4.1   1.5.0
> scaleFactor                                    30      30
> spark.sql.shuffle.partitions                   600     600
> spark.sql.sources.partitionDiscovery.enabled   true    true
> spark.default.parallelism                      200     200
> spark.driver.memory                            4G      4G
> spark.executor.memory                          4G      4G
> spark.executor.instances                       10      10
> spark.shuffle.consolidateFiles                 true    true
> spark.storage.memoryFraction                   0.4     0.4
> spark.executor.cores                           3       3
>
> I am not sure what is going wrong. Any ideas?
>
>
>


Re: Avoiding SQL Injection in Spark SQL

2015-09-10 Thread Michael Armbrust
Either that or use the DataFrame API, which directly constructs query plans
and thus doesn't suffer from injection attacks (and runs on the same
execution engine).
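
As a rough illustration of the DataFrame route, here is a minimal sketch assuming Spark 1.5-era APIs. The function name and the name/age columns are made up for the example; the point is only that user input enters the plan as a literal value and is never spliced into SQL text.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

// Hypothetical lookup: `name` and `age` are illustrative column names.
// userName and minAge become literal values in the query plan; they are
// never concatenated into a SQL string, so they cannot alter the query shape.
def namesMatching(people: DataFrame, userName: String, minAge: Int): DataFrame =
  people
    .filter(col("name") === lit(userName) && col("age") >= lit(minAge))
    .select("name")
```

With this shape, an input such as x' OR '1'='1 is simply compared against the name column as an ordinary string.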

On Thu, Sep 10, 2015 at 12:10 AM, Sean Owen  wrote:

> I don't think this is Spark-specific. Mostly you need to escape /
> quote user-supplied values as with any SQL engine.
>
> On Thu, Sep 10, 2015 at 7:32 AM, V Dineshkumar
>  wrote:
> > Hi,
> >
> > What is the preferred way of avoiding SQL Injection while using Spark
> SQL?
> > In our use case we have to take the parameters directly from the users
> and
> > prepare the SQL Statement.I was not able to find any API for preparing
> the
> > SQL statement safely avoiding injection.
> >
> > Thanks,
> > Dinesh
> > Philips India
>
>


Re: Creating Parquet external table using HiveContext API

2015-09-10 Thread Mohammad Islam
Thanks a lot Michael for giving a solution.
If I want to provide my own schema, can I do that?
 


 On Thursday, September 10, 2015 11:05 AM, Michael Armbrust 
 wrote:
   

 Easiest is to just use SQL:
hiveContext.sql("CREATE TABLE  USING parquet OPTIONS (path '')")
When you specify the path, it's automatically created as an external table.  The 
schema will be discovered.
On Wed, Sep 9, 2015 at 9:33 PM, Mohammad Islam  
wrote:

Hi,

I want to create an external Hive table using HiveContext. I have the
following:
1. full path/location of the Parquet data directory
2. name of the new table
3. I can get the schema as well.

Which API is best (for 1.3.x or 1.4.x)? I can see 6 createExternalTable()
APIs but am not sure which one is best. I didn't find any good documentation
in the source code or Javadoc about the parameters of the APIs (i.e. path,
source, options etc.). Any help will be appreciated.

Regards,
Mohammad




  

Re: Best way to import data from Oracle to Spark?

2015-09-10 Thread Ruslan Dautkhanov
Sathish,

Thanks for pointing to that.

https://docs.oracle.com/cd/E57371_01/doc.41/e57351/copy2bda.htm

That must be only part of Oracle's BDA codebase, not open-source Hive,
right?



-- 
Ruslan Dautkhanov

On Thu, Sep 10, 2015 at 6:59 AM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> I guess data pump export from Oracle could be fast option. Hive now has
> oracle data pump serde..
>
> https://docs.oracle.com/cd/E57371_01/doc.41/e57351/copy2bda.htm
>
>
>
> On Wed, Sep 9, 2015 at 4:41 AM Reynold Xin  wrote:
>
>> Using the JDBC data source is probably the best way.
>> http://spark.apache.org/docs/1.4.1/sql-programming-guide.html#jdbc-to-other-databases
>>
>> On Tue, Sep 8, 2015 at 10:11 AM, Cui Lin  wrote:
>>
>>> What's the best way to import data from Oracle to Spark? Thanks!
>>>
>>>
>>> --
>>> Best regards!
>>>
>>> Lin,Cui
>>>
>>
>>


Spark UI keeps redirecting to /null and returns 500

2015-09-10 Thread Rajeev Prasad
I am having a problem accessing the Spark UI while running in spark-client
mode. It works fine in local mode.

It keeps redirecting back to itself, appending /null to the URL each time,
until the URL exceeds the size limit and the server returns 500. See the
response below.

I have a feeling that I might be missing some config. I played with various
config settings for YARN with no success.

I am using spark version 1.3.1

Any help will be greatly appreciated.


--2015-09-09 11:22:17--  http://192.168.13.37:4040/
Connecting to 192.168.13.37:4040... connected.
HTTP request sent, awaiting response...
  HTTP/1.1 302 Found
  Location: http://192.168.13.37:4040/null/
  Content-Length: 0
  Server: Jetty(8.y.z-SNAPSHOT)
Location: http://192.168.13.37:4040/null/ [following]

--2015-09-09 11:22:17--  http://192.168.13.37:4040/null/
Reusing existing connection to 192.168.13.37:4040.
HTTP request sent, awaiting response...
  HTTP/1.1 302 Found
  Location: http://192.168.13.37:4040/null/null/null/
  Content-Length: 0
  Server: Jetty(8.y.z-SNAPSHOT)
Location: http://192.168.13.37:4040/null/null/null/ [following]

--2015-09-09 11:22:17--  http://192.168.13.37:4040/null/null/null/
Reusing existing connection to 192.168.13.37:4040.
HTTP request sent, awaiting response...
  HTTP/1.1 302 Found
  Location: http://192.168.13.37:4040/null/null/null/null/null/null/null/
  Content-Length: 0
  Server: Jetty(8.y.z-SNAPSHOT)
Location: http://192.168.13.37:4040/null/null/null/null/null/null/null/ [following]

--2015-09-09 11:22:17--  http://192.168.13.37:4040/null/null/null/null/null/null/null/
Reusing existing connection to 192.168.13.37:4040.
HTTP request sent, awaiting response...
  HTTP/1.1 302 Found
  Location: http://192.168.13.37:4040/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/
  Content-Length: 0
  Server: Jetty(8.y.z-SNAPSHOT)
Location: http://192.168.13.37:4040/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/ [following]

--2015-09-09 11:22:17--  http://192.168.13.37:4040/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/
Reusing existing connection to 192.168.13.37:4040.

Here is stack dump:

15/09/09 11:22:18 WARN server.Response: Committed before 500 null
15/09/09 11:22:18 WARN server.AbstractHttpConnection:


Re: Spark on Yarn vs Standalone

2015-09-10 Thread Sandy Ryza
YARN will never kill processes for being unresponsive.

It may kill processes for occupying more memory than it allows.  To get
around this, you can either bump spark.yarn.executor.memoryOverhead or turn
off the memory checks entirely with yarn.nodemanager.pmem-check-enabled.

-Sandy
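
To make the memory check concrete, here is a small worked calculation using the two values quoted further down in this thread. It is only arithmetic on those numbers; the variable names are illustrative.

```scala
// Values quoted later in this thread for an r3.2xlarge node on EMR.
val executorMemoryMb = 47924   // spark.executor.memory
val memoryOverheadMb = 5324    // spark.yarn.executor.memoryOverhead

// YARN applies its physical-memory limit to the whole container,
// i.e. JVM heap plus overhead, not to the heap alone.
val containerMb = executorMemoryMb + memoryOverheadMb

// Share of the container that is left for off-heap use.
val overheadFraction = memoryOverheadMb.toDouble / containerMb

println(s"container = $containerMb MB (${containerMb / 1024} GiB)")
// prints "container = 53248 MB (52 GiB)"
println(s"overhead fraction = $overheadFraction")
```

So the 52GB per node mentioned in the original question is heap and overhead together, and raising spark.yarn.executor.memoryOverhead without lowering spark.executor.memory would ask for more than that.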

On Tue, Sep 8, 2015 at 10:48 PM, Alexander Pivovarov 
wrote:

> The problem which we have now is skew data (2360 tasks done in 5 min, 3
> tasks in 40 min and 1 task in 2 hours)
>
> Some people from the team worry that the executor which runs the longest
> task can be killed by YARN (because executor might be unresponsive because
> of GC or it might occupy more memory than Yarn allows)
>
>
>
> On Tue, Sep 8, 2015 at 3:02 PM, Sandy Ryza 
> wrote:
>
>> Those settings seem reasonable to me.
>>
>> Are you observing performance that's worse than you would expect?
>>
>> -Sandy
>>
>> On Mon, Sep 7, 2015 at 11:22 AM, Alexander Pivovarov <
>> apivova...@gmail.com> wrote:
>>
>>> Hi Sandy
>>>
>>> Thank you for your reply
>>> Currently we use r3.2xlarge boxes (vCPU: 8, Mem: 61 GiB)
>>> with emr setting for Spark "maximizeResourceAllocation": "true"
>>>
>>> It is automatically converted to Spark settings
>>> spark.executor.memory 47924M
>>> spark.yarn.executor.memoryOverhead 5324
>>>
>>> we also set spark.default.parallelism = slave_count * 16
>>>
>>> Does it look good for you? (we run single heavy job on cluster)
>>>
>>> Alex
>>>
>>> On Mon, Sep 7, 2015 at 11:03 AM, Sandy Ryza 
>>> wrote:
>>>
>>>> Hi Alex,
>>>>
>>>> If they're both configured correctly, there's no reason that Spark
>>>> Standalone should provide performance or memory improvement over Spark on
>>>> YARN.
>>>>
>>>> -Sandy
>>>>
>>>> On Fri, Sep 4, 2015 at 1:24 PM, Alexander Pivovarov <
>>>> apivova...@gmail.com> wrote:

> Hi Everyone
>
> We are trying the latest aws emr-4.0.0 and Spark and my question is
> about YARN vs Standalone mode.
> Our usecase is
> - start 100-150 nodes cluster every week,
> - run one heavy spark job (5-6 hours)
> - save data to s3
> - stop cluster
>
> Officially aws emr-4.0.0 comes with Spark on Yarn
> It's probably possible to hack emr by creating bootstrap script which
> stops yarn and starts master and slaves on each computer  (to start Spark
> in standalone mode)
>
> My questions are
> - Does Spark standalone provide a significant performance / memory
> improvement in comparison to YARN mode?
> - Is it worth hacking the official EMR Spark on YARN and switching Spark to
> Standalone mode?
>
>
> I already created a comparison table and want you to check if my
> understanding is correct.
>
> Let's say an r3.2xlarge computer has 52GB RAM available for Spark Executor
> JVMs.
>
> Standalone to YARN comparison
>
>                                                            STDLN | YARN
> can executor allocate up to 52GB ram                       yes   | yes
> will executor be unresponsive after using all 52GB ram
>   because of GC                                            yes   | yes
> additional JVMs on slave except of spark executor          workr | node mngr
> are additional JVMs lightweight                            yes   | yes
>
>
> Thank you
>
> Alex
>


>>>
>>
>


Re: Is it required to remove checkpoint when submitting a code change?

2015-09-10 Thread Ricardo Luis Silva Paiva
Hi guys,

I tried to use the configuration file, but it didn't work as I expected. As
part of the Spark Streaming flow, my methods run only when the application
is started the first time. Once I restart the app, it reads from the
checkpoint and all the dstream operations come from the cache. No parameter
is reloaded.

I would like to know if it's possible to reset the time of windowed
operations, the checkpoint time etc. I would also like to change the
submission parameters, like number of executors, memory per executor or driver
etc. If that's not possible, what kind of parameters do you guys usually put
in a configuration file? I know that the streaming interval cannot be
changed.

This is my code:

def main(args: Array[String]): Unit = {
  // Restores the context from the checkpoint if one exists; otherwise
  // createSparkContext is called to build a new one.
  val ssc = StreamingContext.getOrCreate(CHECKPOINT_FOLDER, createSparkContext _)
  ssc.start()
  ssc.awaitTermination()
  ssc.stop()
}

def createSparkContext(): StreamingContext = {
  val sparkConf = new SparkConf()
    .setAppName(APP_NAME)
    .set("spark.streaming.unpersist", "true")
  val ssc = new StreamingContext(sparkConf, streamingInterval)
  ssc.checkpoint(CHECKPOINT_FOLDER)
  ssc.sparkContext.addFile(CONFIG_FILENAME)

  val rawStream = createKafkaRDD(ssc)
  processAndSave(rawStream)
  ssc
}

def processAndSave(rawStream: DStream[(String, Array[Byte])]): Unit = {

  val configFile = SparkFiles.get("config.properties")
  val config: Config = ConfigFactory.parseFile(new File(configFile))

  // Parameters read from the configuration file
  slidingInterval = Minutes(config.getInt("streaming.sliding.interval"))
  windowLength = Minutes(config.getInt("streaming.window.interval"))
  minPageview = config.getInt("streaming.pageview.min")

  val pageviewStream = rawStream.map { case (_, raw) =>
    (PageViewParser.parseURL(raw), 1L) }
  // Windowed count with a reduce function and its inverse
  val pageviewsHourlyCount = pageviewStream.reduceByKeyAndWindow(
    PageViewAgregator.pageviewsSum _,
    PageViewAgregator.pageviewsMinus _,
    windowLength,
    slidingInterval)

  val permalinkAudienceStream = pageviewsHourlyCount.filter(_._2 >= minPageview)
  permalinkAudienceStream.map(a => s"${a._1}\t${a._2}")
    .repartition(1)
    .saveAsTextFiles(DESTINATION_FILE, "txt")

}

I really appreciate any help on this.

Many thanks,

Ricardo

On Thu, Sep 3, 2015 at 1:58 PM, Ricardo Luis Silva Paiva <
ricardo.pa...@corp.globo.com> wrote:

> Good tip. I will try that.
>
> Thank you.
>
> On Wed, Sep 2, 2015 at 6:54 PM, Cody Koeninger  wrote:
>
>> Yeah, in general if you're changing the jar you can't recover the
>> checkpoint.
>>
>> If you're just changing parameters, why not externalize those in a
>> configuration file so your jar doesn't change?  I tend to stick even my
>> app-specific parameters in an external spark config so everything is in one
>> place.
>>
>> On Wed, Sep 2, 2015 at 4:48 PM, Ricardo Luis Silva Paiva <
>> ricardo.pa...@corp.globo.com> wrote:
>>
>>> Hi,
>>>
>>> Is there a way to submit an app code change, keeping the checkpoint data
>>> or do I need to erase the checkpoint folder every time I re-submit the
>>> spark app with a new jar?
>>>
>>> I have an app that count pageviews streaming from Kafka, and deliver a
>>> file every hour from the past 24 hours. I'm using reduceByKeyAndWindow with
>>> the reduce and inverse functions set.
>>>
>>> I'm doing some code improvements and would like to keep the data from
>>> the past hours, so when I re-submit a code change, I would keep delivering
>>> the pageviews aggregation without need to wait for 24 hours of new data.
>>> Sometimes I'm just changing the submission parameters, like number of
>>> executors, memory and cores.
>>>
>>> Many thanks,
>>>
>>> Ricardo
>>>
>>> --
>>> Ricardo Paiva
>>> Big Data / Semântica
>>> *globo.com* 
>>>
>>
>>
>
>
> --
> Ricardo Paiva
> Big Data / Semântica
> *globo.com* 
>



-- 
Ricardo Paiva
Big Data / Semântica
*globo.com* 


Spark based Kafka Producer

2015-09-10 Thread Atul Kulkarni
Hi Folks,

Below is the code I have for a Spark-based Kafka producer, meant to take
advantage of multiple executors reading files in parallel on my cluster, but I
am stuck: the program is not making any progress.

Below is my scrubbed code:

val sparkConf = new SparkConf().setAppName(applicationName)
val ssc = new StreamingContext(sparkConf, Seconds(2))

val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))

val zipFileDStreams = ssc.textFileStream(inputFiles)
zipFileDStreams.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { logLineText =>
      println(logLineText)
      producerObj.value.send(topics, logLineText)
    }
  }
}

ssc.start()
ssc.awaitTermination()

ssc.stop()

The code for KafkaSink is as follows.

class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], Array[Byte]])
  extends Serializable {

  lazy val producer = createProducer()
  val logParser = new LogParser()

  def send(topic: String, value: String): Unit = {
    val logLineBytes =
      Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, logLineBytes))
  }
}

object KafkaSink {
  def apply(config: Properties): KafkaSink = {

    val f = () => {
      val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, null, null)

      sys.addShutdownHook {
        producer.close()
      }
      producer
    }

    new KafkaSink(f)
  }
}

Disclaimer: it is based on the code inspired by
http://allegro.tech/spark-kafka-integration.html.

The job just sits there; I cannot see any job stages being created.
One thing I want to mention: I am trying to read gzipped files from HDFS.
Could it be that the streaming context is not able to read *.gz files?


I am not sure what more details I can provide to help explain my problem.


-- 
Regards,
Atul Kulkarni


Re: Avoiding SQL Injection in Spark SQL

2015-09-10 Thread Ruslan Dautkhanov
Using dataframe API is a good workaround.

Another way would be to use bind variables. I don't think Spark SQL
supports them.
That's what Dinesh probably meant by "was not able to find any API for
preparing the SQL statement safely avoiding injection".

E.g.

val sql_handler = sqlContext.sql("SELECT name FROM people WHERE age >= :var1 AND age <= :var2").parse()

toddlers = sql_handler.execute("var1"->1, "var2"->3)

teenagers = sql_handler.execute(13, 19)


SQL injection would not be possible if Spark SQL supported bind
variables, as parameters would always be treated as values and never as part
of the SQL text. It's also arguably easier for developers, as you don't have
to escape/quote.


PS. Another advantage is that Spark could parse the query and create the plan
once, but execute it multiple times.
http://www.akadia.com/services/ora_bind_variables.html
This point is more relevant for OLTP-like queries, which Spark is probably
not yet good at (e.g. returning a few rows quickly, within a few ms).



-- 
Ruslan Dautkhanov

On Thu, Sep 10, 2015 at 12:07 PM, Michael Armbrust 
wrote:

> Either that or use the DataFrame API, which directly constructs query
> plans and thus doesn't suffer from injection attacks (and runs on the same
> execution engine).
>
> On Thu, Sep 10, 2015 at 12:10 AM, Sean Owen  wrote:
>
>> I don't think this is Spark-specific. Mostly you need to escape /
>> quote user-supplied values as with any SQL engine.
>>
>> On Thu, Sep 10, 2015 at 7:32 AM, V Dineshkumar
>>  wrote:
>> > Hi,
>> >
>> > What is the preferred way of avoiding SQL Injection while using Spark
>> SQL?
>> > In our use case we have to take the parameters directly from the users
>> and
>> > prepare the SQL Statement.I was not able to find any API for preparing
>> the
>> > SQL statement safely avoiding injection.
>> >
>> > Thanks,
>> > Dinesh
>> > Philips India
>>
>>
>


Re: about mr-style merge sort

2015-09-10 Thread Saisai Shao
Hi Qianhao,

I think you could sort the data yourself if you want to achieve the same
result as MR, like rdd.reduceByKey(...).mapPartitions(// sort within each
partition).  Do not call sortByKey again, since it will introduce another
shuffle (that's the reason why it is slower than MR).

The difficulty is that you have to implement the external sort
yourself, since memory may not be enough to hold the whole partition.

Spark's shuffle differs from MR's in that it does not impose any key
ordering, so scenarios like the one you mentioned are not so easy to
address. SPARK-2926 tries to solve scenarios like yours, but it is not
merged yet, so you have to find a workaround at the application level.
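
A minimal sketch of the reduceByKey plus per-partition sort idea follows. The String keys, Long values and the summing reduce function are stand-ins for the real types and aggregation, and the function name is made up. It sorts each partition in memory, so it has exactly the limitation described above and is not an external sort.

```scala
import org.apache.spark.rdd.RDD

// Aggregate per key (one shuffle), then sort each resulting partition locally.
// Assumption: String keys and Long sums stand in for the real types.
// Note: toArray holds a whole partition in memory; a real job with large
// partitions may need an external sort instead.
def aggregateThenSortPartitions(rdd: RDD[(String, Long)]): RDD[(String, Long)] =
  rdd
    .reduceByKey(_ + _)
    .mapPartitions(
      iter => iter.toArray.sortBy(_._1).iterator,
      preservesPartitioning = true) // keeps the partitioner, no second shuffle
```

The result is ordered within each partition only, which matches what a single MR reducer sees.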

Thanks
Jerry



On Fri, Sep 11, 2015 at 10:42 AM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> In mr jobs, the output is sorted only within reducer.. That can be better
> emulated by sorting each partition of rdd rather than total sorting the
> rdd..
> In Rdd.mapPartition you can sort the data in one partition and try...
> On Sep 11, 2015 7:36 AM, "周千昊"  wrote:
>
>> Hi, all
>>  Can anyone give some tips about this issue?
>>
>> 周千昊 wrote on Tue, Sep 8, 2015 at 4:46 PM:
>>
>>> Hi, community
>>>  I have an application which I try to migrate from MR to Spark.
>>>  It will do some calculations from Hive and output to hfile which
>>> will be bulk load to HBase Table, details as follow:
>>>
>>>  Rdd input = getSourceInputFromHive()
>>>  Rdd> mapSideResult =
>>> input.glom().mapPartitions(/*some calculation*/)
>>>  // PS: the result in each partition has already been sorted
>>> according to the lexicographical order during the calculation
>>>  mapSideResult.reduceByKey(/*some
>>> aggregations*/).sortByKey(/**/).map(/*transform Tuple2 to
>>> Tuple2*/).saveAsNewAPIHadoopFile(/*write
>>> to hfile*/)
>>>
>>>   *Here is the problem, as in MR, in the reducer side, the mapper
>>> output has already been sorted, so that it is a merge sort which makes
>>> writing to hfile is sequential and fast.*
>>> *  However in Spark, the output of reduceByKey phase has been
>>> shuffled, so I have to sort the rdd in order to write hfile which makes it
>>> slower 2x running on Spark than on MR.*
>>> *  I am wondering that, if there is anything I can leverage has the
>>> same effect as MR. I happen to see a JIRA
>>> ticket https://issues.apache.org/jira/browse/SPARK-2926
>>> . Is it related to what I
>>> am looking for?*
>>>
>> --
>> Best Regard
>> ZhouQianhao
>>
>


Re: Spark based Kafka Producer

2015-09-10 Thread Atul Kulkarni
I am submitting the job with yarn-cluster mode.

spark-submit --master yarn-cluster ...

On Thu, Sep 10, 2015 at 7:50 PM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> What is the value of spark master conf.. By default it is local, that
> means only one thread can run and that is why your job is stuck.
> Specify it local[*], to make thread pool equal to number of cores...
>
> Raghav
> On Sep 11, 2015 6:06 AM, "Atul Kulkarni"  wrote:
>
>> Hi Folks,
>>
>> Below is the code  have for Spark based Kafka Producer to take advantage
>> of multiple executors reading files in parallel on my cluster but I am
>> stuck at The program not making any progress.
>>
>> Below is my scrubbed code:
>>
>> val sparkConf = new SparkConf().setAppName(applicationName)
>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>>
>> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>>
>> val zipFileDStreams = ssc.textFileStream(inputFiles)
>> zipFileDStreams.foreachRDD {
>>   rdd =>
>> rdd.foreachPartition(
>>   partition => {
>> partition.foreach{
>>   case (logLineText) =>
>> println(logLineText)
>> producerObj.value.send(topics, logLineText)
>> }
>>   }
>> )
>> }
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>> ssc.stop()
>>
>> The code for KafkaSink is as follows.
>>
>> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], 
>> Array[Byte]]) extends Serializable {
>>
>>   lazy val producer = createProducer()
>>   val logParser = new LogParser()
>>
>>   def send(topic: String, value: String): Unit = {
>>
>> val logLineBytes = 
>> Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
>> producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 
>> logLineBytes))
>>   }
>> }
>>
>> object KafkaSink {
>>   def apply(config: Properties): KafkaSink = {
>>
>> val f = () => {
>>   val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, 
>> null, null)
>>
>>   sys.addShutdownHook {
>> producer.close()
>>   }
>>   producer
>> }
>>
>> new KafkaSink(f)
>>   }
>> }
>>
>> Disclaimer: it is based on the code inspired by
>> http://allegro.tech/spark-kafka-integration.html.
>>
>> The job just sits there I cannot see any Job Stages being created.
>> Something I want to mention - I I am trying to read gzipped files from HDFS
>> - could it be that Streaming context is not able to read *.gz files?
>>
>>
>> I am not sure what more details I can provide to help explain my problem.
>>
>>
>> --
>> Regards,
>> Atul Kulkarni
>>
>


-- 
Regards,
Atul Kulkarni


RE: Maintaining Kafka Direct API Offsets

2015-09-10 Thread Samya
Thanks Ameya.

From: ameya [via Apache Spark User List] 
[mailto:ml-node+s1001560n24650...@n3.nabble.com]
Sent: Friday, September 11, 2015 4:12 AM
To: Samya MAITI 
Subject: Re: Maintaining Kafka Direct API Offsets

So I added something like this:


Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        ssc.stop(true, true);
    }
});

And also:

finally {
    if (ssc != null) {
        ssc.stop(true, true);
    }
}



This ensured more graceful shutdown of the streaming app.



On Wed, Sep 9, 2015 at 11:21 PM, Samya [via Apache Spark User List] <[hidden email]> wrote:
Hi Ameya,

Please advise: when you say graceful shutdown, what exactly did you handle?

Thanks.

Thanks,
Sam


Re:Re: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

2015-09-10 Thread Todd
Thanks Michael for the reply.
Below is the sql plan for 1.5 and 1.4. 1.5 is using SortMergeJoin, while 1.4.1 
is using shuffled hash join.

In this case, it seems hash join performs better than sort join.
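
If the join strategy really is the cause, one experiment is to switch sort-merge join off for the 1.5 run and compare timings. This is a sketch under an assumption: it relies on the spark.sql.planner.sortMergeJoin setting as it exists in Spark 1.5, which is not mentioned anywhere in this thread and was changed in later releases. The helper name is hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper: disables sort-merge join for subsequently planned
// queries, then prints the physical plan so the chosen join can be checked.
// Assumption: the configuration key below is the Spark 1.5 name.
def planWithoutSortMergeJoin(sqlContext: SQLContext, query: String): DataFrame = {
  sqlContext.setConf("spark.sql.planner.sortMergeJoin", "false")
  val df = sqlContext.sql(query)
  df.explain()
  df
}
```

If the printed physical plan then shows a hash join and the runtime returns to the 1.4.1 level, that would support the reading above.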


===== 1.5.0 =====
== Parsed Logical Plan ==
'Project 
[unresolvedalias('t1.ss_quantity),unresolvedalias('t1.ss_list_price),unresolvedalias('t1.ss_coupon_amt),unresolvedalias('t1.ss_cdemo_sk),unresolvedalias('t1.ss_item_sk),unresolvedalias('t1.ss_promo_sk),unresolvedalias('t1.ss_sold_date_sk)]
 'Filter (('t1.ss_sold_date_sk >= 2450815) && ('t1.ss_sold_date_sk <= 2451179))
  'Join Inner, Some(('t1.ss_item_sk = 't2.ss_item_sk))
   'UnresolvedRelation [store_sales], Some(t1)
   'UnresolvedRelation [store_sales], Some(t2)

== Analyzed Logical Plan ==
ss_quantity: int, ss_list_price: decimal(7,2), ss_coupon_amt: decimal(7,2), 
ss_cdemo_sk: int, ss_item_sk: int, ss_promo_sk: int, ss_sold_date_sk: int
Project 
[ss_quantity#56,ss_list_price#58,ss_coupon_amt#65,ss_cdemo_sk#50,ss_item_sk#48,ss_promo_sk#54,ss_sold_date_sk#46]
 Filter ((ss_sold_date_sk#46 >= 2450815) && (ss_sold_date_sk#46 <= 2451179))
  Join Inner, Some((ss_item_sk#48 = ss_item_sk#71))
   Subquery t1
Subquery store_sales
 
Relation[ss_sold_date_sk#46,ss_sold_time_sk#47,ss_item_sk#48,ss_customer_sk#49,ss_cdemo_sk#50,ss_hdemo_sk#51,ss_addr_sk#52,ss_store_sk#53,ss_promo_sk#54,ss_ticket_number#55,ss_quantity#56,ss_wholesale_cost#57,ss_list_price#58,ss_sales_price#59,ss_ext_discount_amt#60,ss_ext_sales_price#61,ss_ext_wholesale_cost#62,ss_ext_list_price#63,ss_ext_tax#64,ss_coupon_amt#65,ss_net_paid#66,ss_net_paid_inc_tax#67,ss_net_profit#68]
 
ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales]
   Subquery t2
Subquery store_sales
 
Relation[ss_sold_date_sk#69,ss_sold_time_sk#70,ss_item_sk#71,ss_customer_sk#72,ss_cdemo_sk#73,ss_hdemo_sk#74,ss_addr_sk#75,ss_store_sk#76,ss_promo_sk#77,ss_ticket_number#78,ss_quantity#79,ss_wholesale_cost#80,ss_list_price#81,ss_sales_price#82,ss_ext_discount_amt#83,ss_ext_sales_price#84,ss_ext_wholesale_cost#85,ss_ext_list_price#86,ss_ext_tax#87,ss_coupon_amt#88,ss_net_paid#89,ss_net_paid_inc_tax#90,ss_net_profit#91]
 
ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales]

== Optimized Logical Plan ==
Project 
[ss_quantity#56,ss_list_price#58,ss_coupon_amt#65,ss_cdemo_sk#50,ss_item_sk#48,ss_promo_sk#54,ss_sold_date_sk#46]
 Join Inner, Some((ss_item_sk#48 = ss_item_sk#71))
  Project 
[ss_cdemo_sk#50,ss_promo_sk#54,ss_coupon_amt#65,ss_list_price#58,ss_sold_date_sk#46,ss_quantity#56,ss_item_sk#48]
   Filter ((ss_sold_date_sk#46 >= 2450815) && (ss_sold_date_sk#46 <= 2451179))

Relation[ss_sold_date_sk#46,ss_sold_time_sk#47,ss_item_sk#48,ss_customer_sk#49,ss_cdemo_sk#50,ss_hdemo_sk#51,ss_addr_sk#52,ss_store_sk#53,ss_promo_sk#54,ss_ticket_number#55,ss_quantity#56,ss_wholesale_cost#57,ss_list_price#58,ss_sales_price#59,ss_ext_discount_amt#60,ss_ext_sales_price#61,ss_ext_wholesale_cost#62,ss_ext_list_price#63,ss_ext_tax#64,ss_coupon_amt#65,ss_net_paid#66,ss_net_paid_inc_tax#67,ss_net_profit#68]
 
ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales]
  Project [ss_item_sk#71]
   
Relation[ss_sold_date_sk#69,ss_sold_time_sk#70,ss_item_sk#71,ss_customer_sk#72,ss_cdemo_sk#73,ss_hdemo_sk#74,ss_addr_sk#75,ss_store_sk#76,ss_promo_sk#77,ss_ticket_number#78,ss_quantity#79,ss_wholesale_cost#80,ss_list_price#81,ss_sales_price#82,ss_ext_discount_amt#83,ss_ext_sales_price#84,ss_ext_wholesale_cost#85,ss_ext_list_price#86,ss_ext_tax#87,ss_coupon_amt#88,ss_net_paid#89,ss_net_paid_inc_tax#90,ss_net_profit#91]
 
ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales]

== Physical Plan ==
TungstenProject 
[ss_quantity#56,ss_list_price#58,ss_coupon_amt#65,ss_cdemo_sk#50,ss_item_sk#48,ss_promo_sk#54,ss_sold_date_sk#46]
 SortMergeJoin [ss_item_sk#48], [ss_item_sk#71]
  TungstenSort [ss_item_sk#48 ASC], false, 0
   TungstenExchange hashpartitioning(ss_item_sk#48)
ConvertToUnsafe
 Scan 
ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_cdemo_sk#50,ss_promo_sk#54,ss_coupon_amt#65,ss_list_price#58,ss_sold_date_sk#46,ss_quantity#56,ss_item_sk#48]
  TungstenSort [ss_item_sk#71 ASC], false, 0
   TungstenExchange hashpartitioning(ss_item_sk#71)
ConvertToUnsafe
 Scan 
ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_item_sk#71]

Code Generation: true



Re: about mr-style merge sort

2015-09-10 Thread 周千昊
Hi, all
 Can anyone give some tips about this issue?

On Tue, Sep 8, 2015 at 4:46 PM, 周千昊 wrote:

> Hi, community
>  I have an application which I am trying to migrate from MR to Spark.
>  It does some calculations on data from Hive and writes the output to an
> hfile, which is then bulk loaded into an HBase table. Details as follows:
>
>  Rdd input = getSourceInputFromHive()
>  Rdd> mapSideResult =
> input.glom().mapPartitions(/*some calculation*/)
>  // PS: the result in each partition has already been sorted according
> to the lexicographical order during the calculation
>  mapSideResult.reduceByKey(/*some
> aggregations*/).sortByKey(/**/).map(/*transform Tuple2 to
> Tuple2*/).saveAsNewAPIHadoopFile(/*write
> to hfile*/)
>
>   *Here is the problem: in MR, the mapper output has already been sorted
> when it reaches the reducer side, so the reducer only does a merge sort,
> which makes writing to the hfile sequential and fast.*
> *  In Spark, however, the output of the reduceByKey phase has been
> shuffled, so I have to sort the rdd before writing the hfile, which makes
> the job 2x slower on Spark than on MR.*
> *  I am wondering whether there is anything I can leverage that has the
> same effect as in MR. I happened to see a JIRA
> ticket, https://issues.apache.org/jira/browse/SPARK-2926
> . Is it related to what I
> am looking for?*
>
-- 
Best Regard
ZhouQianhao


Re: about mr-style merge sort

2015-09-10 Thread Raghavendra Pandey
In MR jobs, the output is sorted only within each reducer. That is better
emulated by sorting each partition of the RDD rather than doing a total sort
of the RDD.
You can sort the data of one partition inside RDD.mapPartitions and try that.
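
A minimal sketch of that idea in plain Java, with no Spark dependency: each partition is sorted on its own, which gives per-reducer ordering without a total sort. The class and method names are hypothetical, and the sketch assumes every partition fits in memory; in Spark the same step would run inside mapPartitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration: sort each partition independently instead of
// sorting the whole data set. Assumes every partition fits in memory.
final class PartitionSortSketch {

    // Returns new partitions, each sorted without looking at the others.
    static List<List<String>> sortWithinPartitions(List<List<String>> partitions) {
        List<List<String>> result = new ArrayList<>();
        for (List<String> partition : partitions) {
            List<String> sorted = new ArrayList<>(partition);
            Collections.sort(sorted); // lexicographical order, local to this partition
            result.add(sorted);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = new ArrayList<>();
        partitions.add(Arrays.asList("row3", "row1", "row2"));
        partitions.add(Arrays.asList("row9", "row7"));
        System.out.println(sortWithinPartitions(partitions));
    }
}
```

Note that the partitions stay unordered relative to each other; only the rows inside each one are sorted.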


Re: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

2015-09-10 Thread Jesse F Chen


  Could this be a build issue (i.e., sbt package)?

  If I run the same jar built for 1.4.1 on 1.5, I also see a large
regression in queries (all other things being identical)...

  I am curious: to build for 1.5 (when it isn't released yet), what do I
need to do with the build.sbt file?

  Are there any special parameters I should be using to make sure I load the
latest Hive dependencies?



From:   Michael Armbrust 
To: Todd 
Cc: "user@spark.apache.org" 
Date:   09/10/2015 11:07 AM
Subject:Re: spark 1.5 SQL slows down dramatically by 50%+ compared with
spark 1.4.1 SQL



I've been running TPC-DS SF=1500 daily on Spark 1.4.1 and Spark 1.5 on S3,
so this is surprising.  In my experiments Spark 1.5 is either the same or
faster than 1.4 with only small exceptions.  A few thoughts,

 - 600 partitions is probably way too many for 6G of data.
 - Providing the output of explain for both runs would be helpful whenever
reporting performance changes.

On Thu, Sep 10, 2015 at 1:24 AM, Todd  wrote:
  Hi,

  I am using data generated with spark-sql-perf
  (https://github.com/databricks/spark-sql-perf) to test Spark SQL
  performance (Spark on YARN, with 10 nodes) with the following code (the
  table store_sales has about 90 million records and is 6G in size):

  val outputDir="hdfs://tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales"

  val name="store_sales"
      sqlContext.sql(
    s"""
    |CREATE TEMPORARY TABLE ${name}
    |USING org.apache.spark.sql.parquet
    |OPTIONS (
    |  path '${outputDir}'
    |)
      """.stripMargin)

  val sql="""
   |select
   |  t1.ss_quantity,
   |  t1.ss_list_price,
   |  t1.ss_coupon_amt,
   |  t1.ss_cdemo_sk,
   |  t1.ss_item_sk,
   |  t1.ss_promo_sk,
   |  t1.ss_sold_date_sk
   |from store_sales t1 join store_sales t2 on t1.ss_item_sk = t2.ss_item_sk
   |where
   |  t1.ss_sold_date_sk between 2450815 and 2451179
     """.stripMargin

  val df = sqlContext.sql(sql)
  df.rdd.foreach(row=>Unit)

  With 1.4.1, I can finish the query in 6 minutes, but I need 10+ minutes
  with 1.5.

  The configurations are basically the same, since I copied the configuration
  from 1.4.1 to 1.5:

  sparkVersion    1.4.1        1.5.0
  scaleFactor    30        30
  spark.sql.shuffle.partitions    600        600
  spark.sql.sources.partitionDiscovery.enabled    true        true
  spark.default.parallelism    200        200
  spark.driver.memory    4G    4G        4G
  spark.executor.memory    4G        4G
  spark.executor.instances    10        10
  spark.shuffle.consolidateFiles    true        true
  spark.storage.memoryFraction    0.4        0.4
  spark.executor.cores    3        3

  I am not sure what is going wrong. Any ideas?




RE: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

2015-09-10 Thread Cheng, Hao
It is not a big surprise that the SMJ (sort merge join) is slower than the hash
join, as we do not fully utilize the sorting yet; more details can be found at
https://issues.apache.org/jira/browse/SPARK-2926 .

Anyway, can you disable the sort merge join with
"spark.sql.planner.sortMergeJoin=false" in Spark 1.5, and run the query again?
In our previous testing, sort merge join was about 20% slower. I am not
sure whether anything else slows down the performance.

Hao





Re: Spark based Kafka Producer

2015-09-10 Thread Raghavendra Pandey
What is the value of the Spark master conf? By default it is local, which means
only one thread can run, and that is why your job is stuck.
Set it to local[*] to make the thread pool size equal to the number of cores.

Raghav
On Sep 11, 2015 6:06 AM, "Atul Kulkarni"  wrote:

> Hi Folks,
>
> Below is the code I have for a Spark-based Kafka producer, meant to take
> advantage of multiple executors reading files in parallel on my cluster, but
> I am stuck: the program is not making any progress.
>
> Below is my scrubbed code:
>
> val sparkConf = new SparkConf().setAppName(applicationName)
> val ssc = new StreamingContext(sparkConf, Seconds(2))
>
> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>
> val zipFileDStreams = ssc.textFileStream(inputFiles)
> zipFileDStreams.foreachRDD {
>   rdd =>
>     rdd.foreachPartition(
>       partition => {
>         partition.foreach {
>           case (logLineText) =>
>             println(logLineText)
>             producerObj.value.send(topics, logLineText)
>         }
>       }
>     )
> }
>
> ssc.start()
> ssc.awaitTermination()
>
> ssc.stop()
>
> The code for KafkaSink is as follows.
>
> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], Array[Byte]]) extends Serializable {
>
>   lazy val producer = createProducer()
>   val logParser = new LogParser()
>
>   def send(topic: String, value: String): Unit = {
>     val logLineBytes = Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
>     producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, logLineBytes))
>   }
> }
>
> object KafkaSink {
>   def apply(config: Properties): KafkaSink = {
>     val f = () => {
>       val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, null, null)
>
>       sys.addShutdownHook {
>         producer.close()
>       }
>       producer
>     }
>
>     new KafkaSink(f)
>   }
> }
>
> Disclaimer: it is based on the code inspired by
> http://allegro.tech/spark-kafka-integration.html.
>
> The job just sits there; I cannot see any job stages being created.
> Something I want to mention: I am trying to read gzipped files from HDFS.
> Could it be that the streaming context is not able to read *.gz files?
>
>
> I am not sure what more details I can provide to help explain my problem.
>
>
> --
> Regards,
> Atul Kulkarni
>


RE: reading files on HDFS /s3 in sparkR -failing

2015-09-10 Thread Sun, Rui
Hi, Roni,

For parquetFile(), it is just a warning; you can get the DataFrame
successfully, right? It is a bug that has been fixed in the latest repo:
https://issues.apache.org/jira/browse/SPARK-8952

For S3, it is not related to SparkR. I guess it is related to 
http://stackoverflow.com/questions/28029134/how-can-i-access-s3-s3n-from-a-local-hadoop-2-6-installation
 , https://issues.apache.org/jira/browse/SPARK-7442


From: roni [mailto:roni.epi...@gmail.com]
Sent: Friday, September 11, 2015 3:05 AM
To: user@spark.apache.org
Subject: reading files on HDFS /s3 in sparkR -failing


I am trying this -
 ddf <- parquetFile(sqlContext,  
"hdfs://ec2-52-26-180-130.us-west-2.compute.amazonaws.com:9000/IPF_14_1.parquet")
and I get 
path[1]="hdfs://ec2-52-26-180-130.us-west-2.compute.amazonaws.com:9000/IPF_14_1.parquet":
 No such file or directory

when I read file on s3 , I get -  java.io.IOException: No FileSystem for 
scheme: s3

Thanks in advance.
-Roni






Re: about mr-style merge sort

2015-09-10 Thread 周千昊
Hi, Shao & Pandey
  Thanks for the tips. I will try to work around this.

On Fri, Sep 11, 2015 at 1:23 PM, Saisai Shao wrote:

> Hi Qianhao,
>
> I think you could sort the data yourself if you want to achieve the same
> result as MR, like rdd.reduceByKey(...).mapPartitions(// sort within each
> partition).  Do not call sortByKey again, since it will introduce another
> shuffle (that's the reason why it is slower than MR).
>
> The difficulty is that you have to implement the external sort yourself,
> since memory may not be enough to hold the whole partition.
>
> Spark's shuffle is different from MR's in that it does not have a key
> ordering restriction, so scenarios like the one you mentioned are not so
> easy to address. SPARK-2926 tries to solve scenarios like yours, but it is
> not merged yet, so you have to find a workaround at the application level.
>
> Thanks
> Jerry
>
>
>
> --
Best Regard
ZhouQianhao


How to create combine DAG visualization?

2015-09-10 Thread b.bhavesh
Hi,

How can I create a combined DAG visualization of PySpark code instead of
separate DAGs for jobs and stages?

Thanks 
b.bhavesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-combine-DAG-visualization-tp24653.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to keep history of streaming statistics

2015-09-10 Thread b.bhavesh
Hi Himanshu Mehra,

Thanks for the reply. I am running a Spark standalone cluster. I have already
set the property for logging events for the history server, as you mentioned,
and I have also started the history server.

I am running my code with awaitTermination(), so the job never completes.
However, I can see it under the incomplete jobs category in the history server
UI.

The problem is that the history server does not show the "Streaming" tab
(streaming statistics like Input Rate, Scheduling Delay, Processing Time,
etc.), which is available in the Spark UI while the job is running.

Do I need to configure something for this? Where are the files for these
statistics stored while the job is running?

Thanks,
b.bhavesh 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-history-of-streaming-statistics-tp24635p24651.html



Sprk RDD : want to combine elements that have approx same keys

2015-09-10 Thread prateek arora
Hi

In my scenario:
I have an RDD of key/value pairs. I want to combine elements that have
approximately the same keys, for example:
(144,value)(143,value)(142,value)...(214,value)(213,value)(212,value)(313,value)(314,value)...

I want to combine the elements that have keys 144, 143, 142, ..., meaning keys
within a +-2 range, and likewise for the keys 214, 213, 212, and so on.

How can I do this?

regards
prateek

 







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Sprk-RDD-want-to-combine-elements-that-have-approx-same-keys-tp24644.html



Re: Spark Streaming stop gracefully doesn't return to command line after upgrade to 1.4.0 and beyond

2015-09-10 Thread Tathagata Das
Spark 1.4.0 introduced built-in shutdown hooks that shut down the
StreamingContext and SparkContext (similar to yours). If you are also
adding your own shutdown hook, I wonder what the behavior is going to be.

Try doing a jstack to see where the system is stuck. Alternatively, remove
your shutdown hook and see what happens.

On Thu, Sep 10, 2015 at 3:11 AM, Petr Novak  wrote:

> Hello,
> my Spark streaming v1.3.0 code uses
>
> sys.ShutdownHookThread {
>   ssc.stop(stopSparkContext = true, stopGracefully = true)
> }
>
> so that Ctrl+C on the command line stops it. It used to return to the
> command line after finishing the batch, but it doesn't with v1.4.0-v1.5.0.
> Has the behaviour or the required code changed?
>
> The last messages are:
>
> [2015-09-08 13:02:43,300] INFO Waited for jobs to be processed and
> checkpoints to be written
> (org.apache.spark.streaming.scheduler.JobGenerator)
> [2015-09-08 13:02:43,300] INFO CheckpointWriter executor terminated ?
> true, waited for 0 ms. (org.apache.spark.streaming.CheckpointWriter)
> [2015-09-08 13:02:43,301] INFO Stopped JobGenerator
> (org.apache.spark.streaming.scheduler.JobGenerator)
> [2015-09-08 13:02:43,302] INFO Stopped JobScheduler
> (org.apache.spark.streaming.scheduler.JobScheduler)
> [2015-09-08 13:02:43,303] INFO stopped
> o.s.j.s.ServletContextHandler{/streaming,null}
> (org.spark-project.jetty.server.handler.ContextHandler)
> [2015-09-08 13:02:43,305] INFO stopped
> o.s.j.s.ServletContextHandler{/streaming/batch,null}
> (org.spark-project.jetty.server.handler.ContextHandler)
> [2015-09-08 13:02:43,307] INFO stopped
> o.s.j.s.ServletContextHandler{/static/streaming,null}
> (org.spark-project.jetty.server.handler.ContextHandler)
>
> Thank you for any explanation,
> Petr
>


java.lang.NullPointerException with Twitter API

2015-09-10 Thread Jo Sunad
Hello!

I am trying to customize the Twitter Example TD did by only printing
messages that have a GeoLocation.

I am getting a NullPointerException:

java.lang.NullPointerException
    at Twitter$$anonfun$1.apply(Twitter.scala:64)
    at Twitter$$anonfun$1.apply(Twitter.scala:64)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:101)


Twitter.scala:64 is this line of code:
//every GeoLocation should contain a ","

val filtering = stream.filter(status =>
status.getGeoLocation().toString().contains(","))

Relevant code:

//every GeoLocation should contain a ","
 val filtering = stream.filter(status =>
status.getGeoLocation().toString().contains(","))

//this code works if I do stream.map
val hashTags = filtering.map(status =>
TweetC(classifyTweet(status.getText()), status.getGeoLocation(),
status.getUser().getFollowersCount(),status.getText())).saveToCassandra("demo",
"twitter")

 I'm thinking this might be due to the free public Twitter API not letting
me get access to GeoTagged tweets so val hashtags is always null and hence
the NullPointerException. Has anyone else used the free API and seen
GeoLocations?
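
One possible cause, offered as an assumption rather than a diagnosis: Twitter4J's Status.getGeoLocation() returns null for tweets that carry no location, so calling toString() on it inside the filter would throw at exactly the line in the trace. A minimal plain-Java sketch of a null-safe filter follows; Tweet is a hypothetical stand-in for the Twitter4J status type, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class GeoFilterSketch {

    // Hypothetical stand-in for a tweet; geoLocation is null when absent.
    static final class Tweet {
        final String text;
        final String geoLocation;

        Tweet(String text, String geoLocation) {
            this.text = text;
            this.geoLocation = geoLocation;
        }
    }

    // Keep only tweets that carry a location. The null check comes first,
    // so nothing is ever dereferenced on a tweet without one.
    static List<Tweet> withGeoLocation(List<Tweet> tweets) {
        List<Tweet> kept = new ArrayList<>();
        for (Tweet tweet : tweets) {
            if (tweet.geoLocation != null) {
                kept.add(tweet);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Tweet> tweets = Arrays.asList(
            new Tweet("with location", "37.77,-122.41"),
            new Tweet("without location", null));
        System.out.println(withGeoLocation(tweets).size());
    }
}
```

In the Scala code above, the equivalent check would be stream.filter(status => status.getGeoLocation() != null).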


Re: Re: Failed when starting Spark 1.5.0 standalone cluster

2015-09-10 Thread Adam Hunt
You can add it to conf/spark-env.sh.

$ cat conf/spark-env.sh
#!/usr/bin/env bash
JAVA_HOME=/app/tools/jdk1.7
PATH=$JAVA_HOME/bin:$PATH
MESOS_NATIVE_JAVA_LIBRARY="/usr/lib/libmesos.so"
SPARK_CLASSPATH="/opt/mapr/hadoop/hadoop-0.20.2/lib/amazon-s3.jar"




On Wed, Sep 9, 2015 at 10:25 PM, Netwaver  wrote:

> Thank you, Ted, This does help.
> One more question, If I just want to migrate JDK only for Spark on my
> cluster machines, where can I add the JAVA_HOME environment variable? Does
> conf/spark-env.sh support JAVA_HOME environment variable? Thanks a lot.
>
>
>
>
>
> At 2015-09-10 12:45:43, "Ted Yu" wrote:
>
> See the following announcement:
>
> http://search-hadoop.com/m/q3RTtojAyW1dabFk
>
> On Wed, Sep 9, 2015 at 9:05 PM, Netwaver  wrote:
>
>> Hi Spark experts,
>>  I am trying to migrate my Spark cluster from
>> 1.4.1 to latest 1.5.0 , but meet below issues when run start-all.sh script.
>>
>>   *Exception in thread "main"
>> java.lang.NoClassDefFoundError: org/apache/spark/launcher/Main*
>> *Caused by: java.lang.ClassNotFoundException:
>> org.apache.spark.launcher.Main*
>> *at java.net.URLClassLoader$1.run(Unknown Source)*
>> *at java.security.AccessController.doPrivileged(Native Method)*
>> *at java.net.URLClassLoader.findClass(Unknown Source)*
>> *at java.lang.ClassLoader.loadClass(Unknown Source)*
>> *at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)*
>> *at java.lang.ClassLoader.loadClass(Unknown Source)*
>> *Could not find the main class: org.apache.spark.launcher.Main.  Program
>> will exit.*
>>
>> I could easily migrate Spark cluster from 1.3.1
>> to 1.4.1 on the same machines before, I am wondering if Spark 1.5.0 asks
>> for some special jars in the classpath?
>>I am using JDK 1.6 , don't know if 1.6 is also supported by Spark
>> 1.5.0. Any suggestion will be highly appreciated, thank you all.
>>
>>
>>
>>
>
>
>
>
>


Re: ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()

2015-09-10 Thread Ted Yu
Created https://github.com/apache/spark/pull/8703 to make exception message
more helpful.
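
For anyone finding this thread later, the root cause discussed in the quoted messages (a signed value taken modulo the number of partitions) can be sketched in plain Java. The class and method names are hypothetical, and Math.floorMod is only one way to keep the index in range; it assumes Java 8 or later.

```java
// Hypothetical helper showing why a signed key breaks "key % numPartitions"
// and one way to keep the result inside [0, numPartitions).
final class NonNegativeModSketch {

    // Java's % keeps the sign of the dividend, so a negative key
    // produces a negative partition index.
    static int naivePartition(long key, int numPartitions) {
        return (int) (key % numPartitions);
    }

    // Math.floorMod takes the sign of the divisor, so for a positive
    // numPartitions the index is always in [0, numPartitions).
    static int safePartition(long key, int numPartitions) {
        return (int) Math.floorMod(key, (long) numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(naivePartition(-78L, 40)); // negative index
        System.out.println(safePartition(-78L, 40));  // non-negative index
    }
}
```

A getPartition() implementation that returns the second form cannot hand the shuffle writer a negative array index.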

On Thu, Sep 10, 2015 at 1:24 PM, Ashish Shenoy 
wrote:

> Yup thanks Ted. My getPartition() method had a bug where a signed int was
> being moduloed with the number of partitions. Fixed that.
>
> Thanks,
> Ashish
>
> On Thu, Sep 10, 2015 at 10:44 AM, Ted Yu  wrote:
>
>> Here is snippet of ExternalSorter.scala where ArrayIndexOutOfBoundsException
>> was thrown:
>>
>> while (iterator.hasNext) {
>>   val partitionId = iterator.nextPartition()
>>   iterator.writeNext(partitionWriters(partitionId))
>> }
>> Meaning, partitionId was negative.
>> Execute the following and examine the value of i:
>>
>> int i = -78 % 40;
>>
>> You will see how your getPartition() method should be refined to prevent
>> this exception.
>>
>> On Thu, Sep 10, 2015 at 8:52 AM, Ashish Shenoy 
>> wrote:
>>
>>> I am using spark-1.4.1
>>>
>>> Here's the skeleton code:
>>>
>>> JavaPairRDD rddPair =
>>>   rdd.repartitionAndSortWithinPartitions(
>>>   new CustomPartitioner(), new ExportObjectComparator())
>>> .persist(StorageLevel.MEMORY_AND_DISK_SER());
>>>
>>> ...
>>>
>>> @SuppressWarnings("serial")
>>> private static class CustomPartitioner extends Partitioner {
>>>   int numPartitions;
>>>   @Override
>>>   public int numPartitions() {
>>>     numPartitions = 40;
>>>     return numPartitions;
>>>   }
>>>
>>>   @Override
>>>   public int getPartition(Object o) {
>>>     NewKey newKey = (NewKey) o;
>>>     return (int) newKey.getGsMinusURL() % numPartitions;
>>>   }
>>> }
>>>
>>> ...
>>>
>>> @SuppressWarnings("serial")
>>> private static class ExportObjectComparator
>>>   implements Serializable, Comparator {
>>>   @Override
>>>   public int compare(NewKey o1, NewKey o2) {
>>>     if (o1.hits == o2.hits) {
>>>       return 0;
>>>     } else if (o1.hits > o2.hits) {
>>>       return -1;
>>>     } else {
>>>       return 1;
>>>     }
>>>   }
>>>
>>> }
>>>
>>> ...
>>>
>>>
>>>
>>> Thanks,
>>> Ashish
>>>
>>> On Wed, Sep 9, 2015 at 5:13 PM, Ted Yu  wrote:
>>>
>>>> Which release of Spark are you using ?
>>>>
>>>> Can you show skeleton of your partitioner and comparator ?
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>> On Sep 9, 2015, at 4:45 PM, Ashish Shenoy
>>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am trying to sort a RDD pair
>>>> using repartitionAndSortWithinPartitions() for my key [which is a custom
>>>> class, not a java primitive] using a custom partitioner on that key and a
>>>> custom comparator. However, it fails consistently:
>>>>
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>>> 18 in stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in
>>>> stage 1.0 (TID 202, 172.16.18.25):
>>>> java.lang.ArrayIndexOutOfBoundsException: -78
>>>>     at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
>>>>     at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
>>>>     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Driver stacktrace:
>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>>>     at scala.Option.foreach(Option.scala:236)
>>>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>>>>     at

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-10 Thread Tathagata Das
The metadata checkpointing interval does not really affect performance,
so I didn't expose any way to control that interval. The data checkpointing
interval does affect performance, hence that interval is configurable.



On Thu, Sep 10, 2015 at 5:45 AM, Dmitry Goldenberg  wrote:

> >> The whole point of checkpointing is to recover the *exact* computation
> where it left off.
>
> That makes sense. We were looking at the metadata checkpointing and the
> data checkpointing, and with data checkpointing, you can specify a
> checkpoint duration value. With the metadata checkpointing, there doesn't
> seem to be a way, which may be the intent but it wasn't clear why there's a
> way to override one duration (for data) but not the other (for metadata).
>
> The basic feel was that we'd want to minimize the number of times Spark
> Streaming is doing the checkpointing I/O. In other words, some sort of
> sweet spot value where we do checkpointing frequently enough without
> performing I/O too frequently. Finding that sweet spot would mean
> experimenting with the checkpoint duration millis but that parameter
> doesn't appear to be exposed in case of metadata checkpointing.
>
>
>
> On Wed, Sep 9, 2015 at 10:39 PM, Tathagata Das 
> wrote:
>
>> The whole point of checkpointing is to recover the *exact* computation
>> where it left off.
>> If you want any change in the specification of the computation (which
>> includes any intervals), then you cannot recover from checkpoint as it can
>> be an arbitrarily complex issue to deal with changes in the specs,
>> especially because a lot of specs are tied to each other (e.g. checkpoint
>> interval dictates other things like clean up intervals, etc.)
>>
>> Why do you need to change the checkpointing interval at the time of
>> recovery? Trying to understand your usecase.
>>
>>
>> On Wed, Sep 9, 2015 at 12:03 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> >> when you use getOrCreate, and there exists a valid checkpoint, it
>>> will always return the context from the checkpoint and not call the
>>> factory. Simple way to see whats going on is to print something in the
>>> factory to verify whether it is ever called.
>>>
>>> This is probably OK. Seems to explain why we were getting a sticky batch
>>> duration millis value. Once I blew away all the checkpointing directories
>>> and unplugged the data checkpointing (while keeping the metadata
>>> checkpointing) the batch duration millis was no longer sticky.
>>>
>>> So, there doesn't seem to be a way for metadata checkpointing to
>>> override its checkpoint duration millis, is there?  Is the default there
>>> max(batchdurationmillis, 10seconds)?  Is there a way to override this?
>>> Thanks.
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Sep 9, 2015 at 2:44 PM, Tathagata Das 
>>> wrote:
>>>


 See inline.

 On Tue, Sep 8, 2015 at 9:02 PM, Dmitry Goldenberg <
 dgoldenberg...@gmail.com> wrote:

> What's wrong with creating a checkpointed context??  We WANT
> checkpointing, first of all.  We therefore WANT the checkpointed context.
>
> Second of all, it's not true that we're loading the checkpointed
> context independent of whether params.isCheckpointed() is true.  I'm
> quoting the code again:
>
> // This is NOT loading a checkpointed context if isCheckpointed() is false.
> JavaStreamingContext jssc = params.isCheckpointed()
>     ? createCheckpointedContext(sparkConf, params)
>     : createContext(sparkConf, params);
>
>   private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf,
>       Parameters params) {
>     JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
>       @Override
>       public JavaStreamingContext create() {
>         return createContext(sparkConf, params);
>       }
>     };
>     return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
>
^ When you use getOrCreate and a valid checkpoint exists, it will always
return the context from the checkpoint and not call the factory. A simple
way to see what's going on is to print something in the factory to verify
whether it is ever called.

>   }
>
>   private JavaStreamingContext createContext(SparkConf sparkConf,
>       Parameters params) {
>     // Create context with the specified batch interval, in milliseconds.
>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>         Durations.milliseconds(params.getBatchDurationMillis()));
>     // Set the checkpoint directory, if we're checkpointing
>     if (params.isCheckpointed()) {
>       jssc.checkpoint(params.getCheckpointDir());
>     }
>     ...
> Again, this is *only* calling context.checkpoint() if isCheckpointed()
> is true.  

Re: How to restrict java unit tests from the maven command line

2015-09-10 Thread Sean Owen
-Dtest=none ?

https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-RunningIndividualTests

On Thu, Sep 10, 2015 at 10:39 PM, Stephen Boesch  wrote:
>
> I have invoked mvn test with the -DwildcardSuites option to specify a single
> BinarizerSuite scalatest suite.
>
> The command line is
>
>mvn  -pl mllib  -Pyarn -Phadoop-2.6 -Dhadoop2.7.1 -Dscala-2.11
> -Dmaven.javadoc.skip=true
> -DwildcardSuites=org.apache.spark.ml.feature.BinarizerSuite test
>
> The scala side of affairs is correct: here is the relevant output
>
>
> Results :
>
> Discovery starting.
> Discovery completed in 2 seconds, 614 milliseconds.
> Run starting. Expected test count is: 3
> BinarizerSuite:
> - params
> - Binarize continuous features with default parameter
> - Binarize continuous features with setter
> Run completed in 6 seconds, 311 milliseconds.
> Total number of tests run: 3
>
>
> So we see only the one scala test suite was run- as intended.
>
> But on the java side it seems all of the tests within the mllib project were
> run. Here is a snippet:
>
>
>
> Running org.apache.spark.ml.attribute.JavaAttributeGroupSuite
> Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.156 sec -
> in org.apache.spark.ml.attribute.JavaAttributeGroupSuite
> Running org.apache.spark.ml.attribute.JavaAttributeSuite
> Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.018 sec -
> in org.apache.spark.ml.attribute.JavaAttributeSuite
> Running org.apache.spark.ml.classification.JavaDecisionTreeClassifierSuite
> Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.79 sec -
> in
>
> .. dozens of similar ..
>
> Running org.apache.spark.mllib.tree.JavaDecisionTreeSuite
> Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.287 sec -
> in org.apache.spark.mllib.tree.JavaDecisionTreeSuite
> -- org.jblas INFO Deleting
> /sparks/sparkup/mllib/target/tmp/jblas6038907640270970048/libjblas.dylib
> -- org.jblas INFO Deleting
> /sparks/sparkup/mllib/target/tmp/jblas6038907640270970048/libjblas_arch_flavor.dylib
> -- org.jblas INFO Deleting
> /sparks/sparkup/mllib/target/tmp/jblas6038907640270970048
>
> Results :
>
> Tests run: 106, Failures: 0, Errors: 0, Skipped: 0
>
> So what is the mvn option / setting to disable the java tests?
>
>
> ...




Re: How to restrict java unit tests from the maven command line

2015-09-10 Thread Stephen Boesch
Yes, adding that flag does the trick. Thanks.
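
For reference, a sketch of the combined invocation: it is the command line
from the original question with only -Dtest=none appended. Every other flag
is copied unchanged from the thread and has not been re-verified here.

```shell
# Run only the BinarizerSuite ScalaTest suite. -Dtest=none gives the Java
# (JUnit) test run a pattern that matches no test classes, so none of the
# Java tests in the mllib module are executed.
mvn -pl mllib -Pyarn -Phadoop-2.6 -Dhadoop2.7.1 -Dscala-2.11 \
  -Dmaven.javadoc.skip=true \
  -DwildcardSuites=org.apache.spark.ml.feature.BinarizerSuite \
  -Dtest=none test
```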

2015-09-10 13:47 GMT-07:00 Sean Owen :

> -Dtest=none ?
>
>
> https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-RunningIndividualTests


Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?

2015-09-10 Thread swetha
Hi,

How is the ContextCleaner different from spark.cleaner.ttl? Is
spark.cleaner.ttl still needed when there is a ContextCleaner in the
Streaming job?


Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/broadcast-variable-get-cleaned-by-ContextCleaner-unexpectedly-tp10347p24646.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()

2015-09-10 Thread Ashish Shenoy
Yup thanks Ted. My getPartition() method had a bug where a signed int was
being moduloed with the number of partitions. Fixed that.
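
For anyone hitting the same exception, here is a minimal sketch of the kind
of fix involved, in plain Java with no Spark dependency. The class and method
names (PartitionIndex, nonNegativeMod) are hypothetical and are not taken
from the actual fix; the sketch assumes Java 8 or later for Math.floorMod and
an int key.

```java
// Sketch only: PartitionIndex and nonNegativeMod are hypothetical names,
// not part of Spark or of the code posted in this thread.
final class PartitionIndex {

    // Math.floorMod returns a result with the sign of the divisor, so for a
    // positive numPartitions the result is always in [0, numPartitions).
    static int nonNegativeMod(int key, int numPartitions) {
        return Math.floorMod(key, numPartitions);
    }

    public static void main(String[] args) {
        int key = -78;
        int numPartitions = 40;
        // The % operator keeps the sign of the dividend, so this is negative.
        System.out.println(key % numPartitions);                 // prints -38
        // floorMod maps the same key to a valid partition index.
        System.out.println(nonNegativeMod(key, numPartitions));  // prints 2
    }
}
```

A getPartition() implementation could return such a value instead of the raw
remainder, so that a negative key can never produce a negative partition id.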

Thanks,
Ashish

On Thu, Sep 10, 2015 at 10:44 AM, Ted Yu  wrote:

> Here is snippet of ExternalSorter.scala where ArrayIndexOutOfBoundsException
> was thrown:
>
> while (iterator.hasNext) {
>   val partitionId = iterator.nextPartition()
>   iterator.writeNext(partitionWriters(partitionId))
> }
> Meaning, partitionId was negative.
> Execute the following and examine the value of i:
>
> int i = -78 % 40;
>
> You will see how your getPartition() method should be refined to prevent
> this exception.
>
> On Thu, Sep 10, 2015 at 8:52 AM, Ashish Shenoy 
> wrote:
>
>> I am using spark-1.4.1
>>
>> Here's the skeleton code:
>>
>> JavaPairRDD rddPair =
>>   rdd.repartitionAndSortWithinPartitions(
>>   new CustomPartitioner(), new ExportObjectComparator())
>> .persist(StorageLevel.MEMORY_AND_DISK_SER());
>>
>> ...
>>
>> @SuppressWarnings("serial")
>> private static class CustomPartitioner extends Partitioner {
>>   int numPartitions;
>>   @Override
>>   public int numPartitions() {
>> numPartitions = 40;
>> return numPartitions;
>>   }
>>
>>   @Override
>>   public int getPartition(Object o) {
>> NewKey newKey = (NewKey) o;
>> return (int) newKey.getGsMinusURL() % numPartitions;
>>   }
>> }
>>
>> ...
>>
>> @SuppressWarnings("serial")
>> private static class ExportObjectComparator
>>   implements Serializable, Comparator {
>>   @Override
>>   public int compare(NewKey o1, NewKey o2) {
>> if (o1.hits == o2.hits) {
>>   return 0;
>> } else if (o1.hits > o2.hits) {
>>   return -1;
>> } else {
>>   return 1;
>> }
>>   }
>>
>> }
>>
>> ...
>>
>>
>>
>> Thanks,
>> Ashish
>>
>> On Wed, Sep 9, 2015 at 5:13 PM, Ted Yu  wrote:
>>
>>> Which release of Spark are you using ?
>>>
>>> Can you show skeleton of your partitioner and comparator ?
>>>
>>> Thanks
>>>
>>>
>>>
>>> On Sep 9, 2015, at 4:45 PM, Ashish Shenoy 
>>> wrote:
>>>
>>> Hi,
>>>
>>> I am trying to sort a RDD pair
>>> using repartitionAndSortWithinPartitions() for my key [which is a custom
>>> class, not a java primitive] using a custom partitioner on that key and a
>>> custom comparator. However, it fails consistently:
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 18 in stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in
>>> stage 1.0 (TID 202, 172.16.18.25):
>>> java.lang.ArrayIndexOutOfBoundsException: -78
>>> at
>>> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
>>> at
>>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
>>> at
>>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> Driver stacktrace:
>>> at org.apache.spark.scheduler.DAGScheduler.org
>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>> at scala.Option.foreach(Option.scala:236)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>>> at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
>>> at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
>>> at
>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>
>>> I also persist the RDD using the "memory 

How to restrict java unit tests from the maven command line

2015-09-10 Thread Stephen Boesch
I have invoked mvn test with the -DwildcardSuites option to specify a
single BinarizerSuite scalatest suite.

The command line is

   mvn  -pl mllib  -Pyarn -Phadoop-2.6 -Dhadoop2.7.1 -Dscala-2.11
-Dmaven.javadoc.skip=true
-DwildcardSuites=org.apache.spark.ml.feature.BinarizerSuite test

The scala side of affairs is correct: here is the relevant output


Results :

Discovery starting.
Discovery completed in 2 seconds, 614 milliseconds.
Run starting. Expected test count is: 3
BinarizerSuite:
- params
- Binarize continuous features with default parameter
- Binarize continuous features with setter
Run completed in 6 seconds, 311 milliseconds.
Total number of tests run: 3


So we see that only the one Scala test suite was run, as intended.

But on the Java side it seems all of the tests within the mllib project
were run. Here is a snippet:



Running org.apache.spark.ml.attribute.JavaAttributeGroupSuite
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.156 sec -
in org.apache.spark.ml.attribute.JavaAttributeGroupSuite
Running org.apache.spark.ml.attribute.JavaAttributeSuite
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.018 sec -
in org.apache.spark.ml.attribute.JavaAttributeSuite
Running org.apache.spark.ml.classification.JavaDecisionTreeClassifierSuite
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.79 sec -
in

.. dozens of similar ..

Running org.apache.spark.mllib.tree.JavaDecisionTreeSuite
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.287 sec -
in org.apache.spark.mllib.tree.JavaDecisionTreeSuite
-- org.jblas INFO Deleting
/sparks/sparkup/mllib/target/tmp/jblas6038907640270970048/libjblas.dylib
-- org.jblas INFO Deleting
/sparks/sparkup/mllib/target/tmp/jblas6038907640270970048/libjblas_arch_flavor.dylib
-- org.jblas INFO Deleting
/sparks/sparkup/mllib/target/tmp/jblas6038907640270970048

Results :

Tests run: 106, Failures: 0, Errors: 0, Skipped: 0

So what is the mvn option or setting to disable the Java tests?


...


Re: pyspark driver in cluster rather than gateway/client

2015-09-10 Thread Davies Liu
YARN cluster mode for PySpark has been supported since Spark 1.4:
https://issues.apache.org/jira/browse/SPARK-5162?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22python%20cluster%22

On Thu, Sep 10, 2015 at 6:54 AM, roy  wrote:
> Hi,
>
>   Is there any way to make spark driver to run in side YARN containers
> rather than gateway/client machine.
>
>   At present even with config parameters --master yarn & --deploy-mode
> cluster driver runs on gateway/client machine.
>
> We are on CDH 5.4.1 with YARN and Spark 1.3
>
> any help on this ?
>
> Thanks
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-driver-in-cluster-rather-than-gateway-client-tp24641.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>




Re: Sprk RDD : want to combine elements that have approx same keys

2015-09-10 Thread sethah
If you want each key to be combined only once, you can just map the keys
to a reduced key space. Something like this:

val data = sc.parallelize(Array((0,0.030513227), (1,0.11088216),
(2,0.69165534), (3,0.78524816), (4,0.8516909), (5,0.37751913),
(6,0.05674714), (7,0.27523404), (8,0.40828508), (9,0.9491552)))

data.map { case(k,v) => (k / 3, v)}.reduceByKey(_+_)

That code puts the keys into fixed buckets of three consecutive integers
(0-2, 3-5, 6-8, ...), so keys in the same bucket are within two of each
other, and then sums each bucket. Could you clarify: if you have the keys
[141, 142, 143, 144, 145], do you want groups like [(141, 142, 143),
(144, 145)], or do you need overlapping groups like [(141, 142, 143),
(142, 143, 144), (143, 144, 145), (144, 145)]?
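
To make the bucketing concrete, here is a minimal sketch in plain Java, with
no Spark involved. It stands in for the map followed by reduceByKey: each key
is divided by the bucket width and values that land on the same bucket are
summed. The class and method names (KeyBuckets, sumByBucket) and the value of
1.0 per key are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: KeyBuckets and sumByBucket are hypothetical names.
final class KeyBuckets {

    // Sum the values whose keys fall into the same fixed-width bucket,
    // where the bucket id is key / width (integer division).
    static Map<Integer, Double> sumByBucket(Map<Integer, Double> data, int width) {
        Map<Integer, Double> sums = new TreeMap<>();
        for (Map.Entry<Integer, Double> e : data.entrySet()) {
            sums.merge(e.getKey() / width, e.getValue(), Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Keys 141..145, each with an illustrative value of 1.0.
        Map<Integer, Double> data = new LinkedHashMap<>();
        for (int k = 141; k <= 145; k++) {
            data.put(k, 1.0);
        }
        // 141, 142, 143 share bucket 47; 144 and 145 share bucket 48.
        System.out.println(sumByBucket(data, 3)); // prints {47=3.0, 48=2.0}
    }
}
```

With a width of 3 this produces the non-overlapping grouping, (141, 142, 143)
and (144, 145); the overlapping grouping would need a different approach.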




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Sprk-RDD-want-to-combine-elements-that-have-approx-same-keys-tp24644p24647.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: How to keep history of streaming statistics

2015-09-10 Thread Tathagata Das
Currently there is no built-in support for persisting streaming statistics
for applications that have finished.
However, you can access all the same metrics through the StreamingListener
interface.
You can create your own StreamingListener object with overridden methods,
and the system will call the methods with the metric data. You can then
save the metric data to some storage yourself.

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchStarted}

class MyListener extends StreamingListener {
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    // save the metric data to some storage
  }
  ...
}

ssc.addStreamingListener(new MyListener())

TD


On Wed, Sep 9, 2015 at 10:51 PM, b.bhavesh  wrote:

> Hello,
>
> How can I keep history of streaming statistics for completed applications.
> Where in Spark, the information presented on UI is stored?
>
> Thanks,
> b.bhavesh
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-history-of-streaming-statistics-tp24635.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Maintaining Kafka Direct API Offsets

2015-09-10 Thread Samya
Hi Ameya,

Could you explain: when you say graceful shutdown, what exactly did you
handle?

Thanks,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Maintaining-Kafka-Direct-API-Offsets-tp24246p24636.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Avoiding SQL Injection in Spark SQL

2015-09-10 Thread V Dineshkumar
Hi,

What is the preferred way to avoid SQL injection when using Spark SQL?
In our use case we have to take the parameters directly from users and
build the SQL statement from them. I was not able to find any API for
preparing the SQL statement safely, without the risk of injection.

Thanks,
Dinesh
Philips India