Re: spark.streaming.kafka.maxRatePerPartition for direct stream

2015-10-02 Thread Nicolae Marasoiu
Hi,


Set 10ms and spark.streaming.backpressure.enabled=true


This should automatically delay the next batch until the current one is 
processed, or at least create that balance over a few batches/periods between 
the consume/process rate vs ingestion rate.
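For reference, a minimal sketch of what enabling backpressure plus a hard per-partition cap could look like (property names as of Spark 1.5; the cap value is made up):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // let Spark Streaming adapt the ingestion rate to the observed processing rate
  .set("spark.streaming.backpressure.enabled", "true")
  // upper bound on messages per second per Kafka partition, mainly to protect the first batches
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")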


Nicu


From: Cody Koeninger 
Sent: Thursday, October 1, 2015 11:46 PM
To: Sourabh Chandak
Cc: user
Subject: Re: spark.streaming.kafka.maxRatePerPartition for direct stream

That depends on your job, your cluster resources, the number of seconds per 
batch...

You'll need to do some empirical work to figure out how many messages per batch 
a given executor can handle.  Divide that by the number of seconds per batch.
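For example (made-up numbers): if a batch of roughly 30,000 messages per partition completes comfortably within a 60-second batch interval, then 30,000 / 60 = 500, so spark.streaming.kafka.maxRatePerPartition would be set to roughly 500 messages per second per partition.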



On Thu, Oct 1, 2015 at 3:39 PM, Sourabh Chandak 
> wrote:
Hi,

I am writing a spark streaming job using the direct stream method for kafka and 
wanted to handle the case of checkpoint failure when we'll have to reprocess 
the entire data from starting. By default for every new checkpoint it tries to 
load everything from each partition and that takes a lot of time for 
processing. After some searching found out that there exists a config 
spark.streaming.kafka.maxRatePerPartition which can be used to tackle this. My 
question is what will be a suitable range for this config if we have ~12 
million messages in kafka with maximum message size ~10 MB.

Thanks,
Sourabh



Re: spark.streaming.kafka.maxRatePerPartition for direct stream

2015-10-02 Thread Sourabh Chandak
Thanks Cody, will try to do some estimation.

Thanks Nicolae, will try out this config.

Thanks,
Sourabh

On Thu, Oct 1, 2015 at 11:01 PM, Nicolae Marasoiu <
nicolae.maras...@adswizz.com> wrote:

> Hi,
>
>
> Set 10ms and spark.streaming.backpressure.enabled=true
>
>
> This should automatically delay the next batch until the current one is
> processed, or at least create that balance over a few batches/periods
> between the consume/process rate vs ingestion rate.
>
>
> Nicu
>
> --
> *From:* Cody Koeninger 
> *Sent:* Thursday, October 1, 2015 11:46 PM
> *To:* Sourabh Chandak
> *Cc:* user
> *Subject:* Re: spark.streaming.kafka.maxRatePerPartition for direct stream
>
> That depends on your job, your cluster resources, the number of seconds
> per batch...
>
> You'll need to do some empirical work to figure out how many messages per
> batch a given executor can handle.  Divide that by the number of seconds
> per batch.
>
>
>
> On Thu, Oct 1, 2015 at 3:39 PM, Sourabh Chandak 
> wrote:
>
>> Hi,
>>
>> I am writing a spark streaming job using the direct stream method for
>> kafka and wanted to handle the case of checkpoint failure when we'll have
>> to reprocess the entire data from starting. By default for every new
>> checkpoint it tries to load everything from each partition and that takes a
>> lot of time for processing. After some searching found out that there
>> exists a config spark.streaming.kafka.maxRatePerPartition which can be used
>> to tackle this. My question is what will be a suitable range for this
>> config if we have ~12 million messages in kafka with maximum message size
>> ~10 MB.
>>
>> Thanks,
>> Sourabh
>>
>
>


Checkpointing is super slow

2015-10-02 Thread Sourabh Chandak
Hi,

I have a receiverless kafka streaming job which was started yesterday
evening and was running fine till 4 PM today. Suddenly post that writing of
checkpoint has slowed down and it is now not able to catch up with the
incoming data. We are using the DSE stack with Spark 1.2 and Cassandra for
checkpointing. Spark streaming is done using a backported code.

Running nodetool shows that the Read latency of the cfs keyspace is ~8.5 ms.

Can someone please help me resolve this?

Thanks,
Sourabh


Re: How to save DataFrame as a Table in Hbase?

2015-10-02 Thread Nicolae Marasoiu
Hi,

Phoenix, a SQL coprocessor for HBase, has ingestion integration with DataFrames 
in its 4.x versions.
For HBase and RDDs in general there are multiple solutions: the hbase-spark module 
by Cloudera (which will be part of a future HBase release), hbase-rdd by 
Unicredit, and many others.
I am not sure that whether an RDD is a DataFrame or a normal RDD is relevant 
for storage. I think the main advantages of a DataFrame are economical memory usage, 
efficient scans through the data in memory, and efficient processing in general; but when 
mapping to an outside schema, you have the same data to map to a schema specific 
to the external db. For instance, saving granular values in separate columns or 
bundling them together in arrays of concatenated values is a choice that seems 
to be independent of how the RDD is represented on the Spark side (normal RDD or 
DataFrame), and more like a storage tradeoff between space & speed for various use 
cases (data access patterns).
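
As a reference, a minimal sketch of the Phoenix DataFrame ingestion mentioned above (assumes the phoenix-spark module is on the classpath; the table name and ZooKeeper URL are made up):

df.write
  .format("org.apache.phoenix.spark")
  .mode("overwrite")                 // phoenix-spark expects overwrite mode
  .option("table", "MY_TABLE")       // assumed Phoenix table
  .option("zkUrl", "zkhost:2181")    // assumed ZooKeeper quorum
  .save()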

Nicu

From: unk1102 
Sent: Friday, October 2, 2015 1:15 AM
To: user@spark.apache.org
Subject: How to save DataFrame as a Table in Hbase?

Hi, has anybody tried to save a DataFrame in HBase? I have processed data in a
DataFrame which I need to store in HBase so that my web UI can access it
from HBase. Please guide. Thanks in advance.






Re: calling persist would cause java.util.NoSuchElementException: key not found:

2015-10-02 Thread Shixiong Zhu
Do you have the full stack trace? Could you check if it's same as
https://issues.apache.org/jira/browse/SPARK-10422

Best Regards,
Shixiong Zhu

2015-10-01 17:05 GMT+08:00 Eyad Sibai :

> Hi
>
> I am trying to call .persist() on a dataframe but once I execute the next
> line I am getting
> java.util.NoSuchElementException: key not found: ….
>
> I tried to do persist on disk also the same thing.
>
> I am using:
> pyspark with python3
> spark 1.5
>
>
> Thanks!
>
>
> EYAD SIBAI
> Risk Engineer
>
> *iZettle ®*
> ––
>
> Mobile: +46 72 911 60 54 <+46%2072%20911%2060%2054>
> Web: www.izettle.com 
>


Addition of Meetup Group - Sydney, Mebourne Australia

2015-10-02 Thread Andy Huang
Hi,

Could someone please help with adding the following Spark Meetup Groups to
the Meetups section of http://spark.apache.org/community.html

Sydney Spark Meetup Group:
http://www.meetup.com/Sydney-Apache-Spark-User-Group/
Melbourne Spark Meetup Group:
http://www.meetup.com/Melbourne-Apache-Spark-Meetup/

Thanks

-- 
Andy Huang | Managing Consultant | Servian Pty Ltd | t: 02 9376 0700 |
f: 02 9376 0730| m: 0433221979


Fwd: Add row IDs column to data frame

2015-10-02 Thread Josh Levy-Kramer
Hi,

I've created a simple example using the withColumn method but it throws an
error. Try:

val df = List(
  (1,1),
  (1,1),
  (1,2),
  (2,2)
).toDF("col1", "col2")

val index_col = sqlContext.range( df.count() ).col("id")
val df_with_index = df.withColumn("index", index_col)

The error I get is:

org.apache.spark.sql.AnalysisException: resolved attribute(s) id#76L
missing from col1#69,col2#70 in operator !Project [col1#69,col2#70,id#76L
AS index#77L];

Is this the right way to add an ID column, or is this a bug?

Many thanks.
Josh
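
A possible alternative sketch for adding a row ID, assuming going through the RDD API is acceptable (the "index" column name is only an example):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// pair each row with its position, then rebuild the DataFrame with one extra column
val rowsWithIndex = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
val schemaWithIndex = StructType(df.schema.fields :+ StructField("index", LongType, nullable = false))
val dfWithIndex = sqlContext.createDataFrame(rowsWithIndex, schemaWithIndex)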


Re: spark.streaming.kafka.maxRatePerPartition for direct stream

2015-10-02 Thread Cody Koeninger
But turning backpressure on won't stop you from choking on the first batch
if you're doing e.g. some kind of in-memory aggregate that can't handle
that many records at once.

On Fri, Oct 2, 2015 at 1:10 AM, Sourabh Chandak 
wrote:

> Thanks Cody, will try to do some estimation.
>
> Thanks Nicolae, will try out this config.
>
> Thanks,
> Sourabh
>
> On Thu, Oct 1, 2015 at 11:01 PM, Nicolae Marasoiu <
> nicolae.maras...@adswizz.com> wrote:
>
>> Hi,
>>
>>
>> Set 10ms and spark.streaming.backpressure.enabled=true
>>
>>
>> This should automatically delay the next batch until the current one is
>> processed, or at least create that balance over a few batches/periods
>> between the consume/process rate vs ingestion rate.
>>
>>
>> Nicu
>>
>> --
>> *From:* Cody Koeninger 
>> *Sent:* Thursday, October 1, 2015 11:46 PM
>> *To:* Sourabh Chandak
>> *Cc:* user
>> *Subject:* Re: spark.streaming.kafka.maxRatePerPartition for direct
>> stream
>>
>> That depends on your job, your cluster resources, the number of seconds
>> per batch...
>>
>> You'll need to do some empirical work to figure out how many messages per
>> batch a given executor can handle.  Divide that by the number of seconds
>> per batch.
>>
>>
>>
>> On Thu, Oct 1, 2015 at 3:39 PM, Sourabh Chandak 
>> wrote:
>>
>>> Hi,
>>>
>>> I am writing a spark streaming job using the direct stream method for
>>> kafka and wanted to handle the case of checkpoint failure when we'll have
>>> to reprocess the entire data from starting. By default for every new
>>> checkpoint it tries to load everything from each partition and that takes a
>>> lot of time for processing. After some searching found out that there
>>> exists a config spark.streaming.kafka.maxRatePerPartition which can be used
>>> to tackle this. My question is what will be a suitable range for this
>>> config if we have ~12 million messages in kafka with maximum message size
>>> ~10 MB.
>>>
>>> Thanks,
>>> Sourabh
>>>
>>
>>
>


RE: Problem understanding spark word count execution

2015-10-02 Thread java8964
No problem.
On the mapper side, Spark is very similar to MapReduce; but on the reducer 
fetching side, MR uses sort-merge whereas Spark uses a HashMap. So keep in mind 
that you get data automatically sorted on the reducer side in MR, but not in Spark.
Spark's performance comes from its cache ability and from smartly arranging tasks into 
stages. Intermediate data between stages is never stored in HDFS, only on local 
disk; in MR, the intermediate data between one MR job and the next is stored 
in HDFS. Spark also uses threads to run tasks, instead of heavyweight processes as in MR.
Without caching, in my experience, Spark can be about 2x to 5x faster than an MR 
job, depending on the job logic. If the data volume is small, Spark will do 
even better, as a process is far more expensive to start than a thread in that 
case.
I didn't see your Spark script, so my guess is that you are using 
"rdd.collect()", which will transfer the final result to the driver and dump it in 
the console.
Yong
Date: Fri, 2 Oct 2015 00:50:24 -0700
Subject: Re: Problem understanding spark word count execution
From: kar...@bluedata.com
To: java8...@hotmail.com
CC: nicolae.maras...@adswizz.com; user@spark.apache.org

Thanks Yong,
That was the explanation I was looking for. However, I have one doubt: you 
write "Imagine that you have 2 mappers to read the data, then each mapper will 
generate the (word, count) tuple output in segments. Spark always outputs that 
to a local file. (In fact, one file with different segments to represent 
different partitions.)" If this is true, then Spark is very similar to Hadoop 
MapReduce (disk IO between phases); with so many IOs after each stage, how does 
Spark achieve the performance it does compared to MapReduce? Another 
doubt is "The 2000 bytes sent to the driver is the final output aggregated on the 
reducers' end, and merged back to the driver" - which part of our word count 
code takes care of this? And yes, there are only 273 distinct words in the 
text, so that's not a surprise.
Thanks again,
Hope to get a reply.
--Kartik
On Thu, Oct 1, 2015 at 5:49 PM, java8964  wrote:



I am not sure about the original explanation of shuffle write. 
In the word count example, the shuffle is needed, as Spark has to group by the 
word (reduceByKey is more accurate here). Imagine that you have 2 mappers to read 
the data; then each mapper will generate the (word, count) tuple output in 
segments. Spark always outputs that to a local file (in fact, one file with 
different segments representing different partitions).
As you can imagine, the output of these segments will be small, as it only 
contains (word, count of word) tuples. After each mapper generates this 
segmented file for the different partitions, the reducers fetch the 
partitions belonging to them.
In your job summary, if your source is a text file, your data corresponds to 2 
HDFS blocks, or 2x256M. There are 2 tasks concurrently reading these 2 partitions, 
with about 2.5M lines of data per partition being processed.
The output of each partition is a shuffle write of 2.7K of data, which is the size of 
the segment file generated, corresponding to all the unique words and their 
counts for that partition. So the size is reasonable, at least to me.
The interesting number is 273 shuffle write records. I am not 100% sure of its 
meaning. Does it mean that this partition has 273 unique words from these 2.5M 
lines of data? That is kind of low, but I really don't have another explanation of 
its meaning.
If your final output shows hundreds of unique words, then that is it.
The 2000 bytes sent to the driver is the final output aggregated on the reducers' 
end, and merged back to the driver.
Yong

Date: Thu, 1 Oct 2015 13:33:59 -0700
Subject: Re: Problem understanding spark word count execution
From: kar...@bluedata.com
To: nicolae.maras...@adswizz.com
CC: user@spark.apache.org

Hi Nicolae,
Thanks for the reply. To further clarify things -
sc.textFile is reading from HDFS; now shouldn't the file be read in a way such 
that EACH executor works only on the local copy of the file part available? In 
this case it is a ~4.64 GB file and the block size is 256 MB, so approx 19 partitions 
will be created and each task will run on 1 partition (which is what I am 
seeing in the stage logs). Also, I assume it will read the file in a way that 
each executor will have exactly the same amount of data, so there shouldn't be any 
shuffling in reading at least.
During stage 0 (sc.textFile -> flatMap -> Map), for every task this is the 
output I am seeing:

Index 0 | ID 44 | Attempt 0 | SUCCESS | NODE_LOCAL | Executor 1 / 10.35.244.10 | 2015/09/29 13:57:24 | Duration 14 s | GC 0.2 s | Input 256.0 MB (hadoop) / 2595161 | Shuffle Write 2.7 KB / 273
Index 1 | ID 45 | Attempt 0 | SUCCESS | NODE_LOCAL | Executor 2 / 10.35.244.11 | 2015/09/29 13:57:24 | Duration 13 s | GC 0.2 s | Input 256.0 MB (hadoop) / 2595176 | Shuffle Write 2.7 KB / 273

I have the following questions -
1) What exactly is 2.7 KB of shuffle write? 2) is this 2.7 KB of shuffle 

Re: Checkpointing is super slow

2015-10-02 Thread Cody Koeninger
Why are you sure it's checkpointing speed?

Have you compared it against checkpointing to hdfs, s3, or local disk?

On Fri, Oct 2, 2015 at 1:17 AM, Sourabh Chandak 
wrote:

> Hi,
>
> I have a receiverless kafka streaming job which was started yesterday
> evening and was running fine till 4 PM today. Suddenly post that writing of
> checkpoint has slowed down and it is now not able to catch up with the
> incoming data. We are using the DSE stack with Spark 1.2 and Cassandra for
> checkpointing. Spark streaming is done using a backported code.
>
> Running nodetool shows that the Read latency of the cfs keyspace is ~8.5
> ms.
>
> Can someone please help me resolve this?
>
> Thanks,
> Sourabh
>
>


RE: Accumulator of rows?

2015-10-02 Thread Saif.A.Ellafi
Thank you, exactly what I was looking for. I had read about it before but never 
made the association.

Saif

From: Adrian Tanase [mailto:atan...@adobe.com]
Sent: Friday, October 02, 2015 8:24 AM
To: Ellafi, Saif A.; user@spark.apache.org
Subject: Re: Accumulator of rows?

Have you seen window functions?
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

From: "saif.a.ell...@wellsfargo.com"
Date: Thursday, October 1, 2015 at 9:44 PM
To: "user@spark.apache.org"
Subject: Accumulator of rows?

Hi all,

I need to repeat a couple of rows from a dataframe n times each. To do so, I 
plan to create a new DataFrame, but I am unable to find a way to 
accumulate “Rows” somewhere; as this might get huge, I can’t accumulate them into a 
mutable Array, I think.

Thanks,
Saif



Re: automatic start of streaming job on failure on YARN

2015-10-02 Thread Steve Loughran

On 1 Oct 2015, at 16:52, Adrian Tanase 
> wrote:

This happens automatically as long as you submit with cluster mode instead of 
client mode (e.g. ./spark-submit --master yarn-cluster ...).

The property you mention would help right after that, although you will need to 
set it to a large value (e.g. 1000?) - as there is no “infinite” support.


that doesn't catch very broken apps.

There is a way during app submission for the application launcher to specify a 
reset window; a time after which failures are reset

It's launcher-API only, and Spark doesn't (currently) set it:

https://issues.apache.org/jira/browse/YARN-611


it could be done in a hadoop-version neutral way using introspection, otherwise 
you'll have to patch the source for a version of spark that only builds/runs 
against Hadoop 2.6


-adrian

From: Jeetendra Gangele
Date: Thursday, October 1, 2015 at 4:30 PM
To: user
Subject: automatic start of streaming job on failure on YARN


We've a streaming application running on YARN and we would like to ensure that 
it is up and running 24/7.

Is there a way to tell YARN to automatically restart a specific application on 
failure?

There is the property yarn.resourcemanager.am.max-attempts, which is set to 2 by 
default; is setting it to a bigger value the solution? Also, I observed this does 
not seem to work: my application is failing and not restarting automatically.

Mesos has this built-in support; wondering why YARN is lacking here?



Regards

jeetendra



Re: Getting spark application driver ID programmatically

2015-10-02 Thread Igor Berman
if driver id is application id then yes you can do it with
String appId = ctx.sc().applicationId(); //when ctx is java context


On 1 October 2015 at 20:44, Snehal Nagmote  wrote:

> Hi ,
>
> I have use case where we need to automate start/stop of spark streaming
> application.
>
> To stop spark job, we need driver/application id of the job .
>
> For example :
>
> /app/spark-master/bin/spark-class org.apache.spark.deploy.Client kill
> spark://10.65.169.242:7077 $driver_id
>
> I am thinking to get the driver id when we submit the job in verbose mode
> , by parsing the output .
>
> Does spark provide any api where it provides driver id of application .
>
> Is there any better or cleaner way to get driver ID ?
>
>
> Any suggestions would be helpful  ,
>
> Thanks,
> Snehal
>
>


Re: Shuffle Write v/s Shuffle Read

2015-10-02 Thread Zoltán Zvara
Hi,

Shuffle output goes to local disk each time, as far as I know, never to
memory.

On Fri, Oct 2, 2015 at 1:26 PM Adrian Tanase  wrote:

> I’m not sure this is related to memory management – the shuffle is the
> central act of moving data around nodes when the computations need the data
> on another node (E.g. Group by, sort, etc)
>
> Shuffle read and shuffle write should be mirrored on the left/right side
> of a shuffle between 2 stages.
>
> -adrian
>
> From: Kartik Mathur
> Date: Thursday, October 1, 2015 at 10:36 PM
> To: user
> Subject: Shuffle Write v/s Shuffle Read
>
> Hi
>
> I am trying to better understand shuffle in spark .
>
> Based on my understanding thus far ,
>
> *Shuffle Write* : writes stage output for intermediate stage on local
> disk if memory is not sufficient.,
> Example , if each worker has 200 MB memory for intermediate results and
> the results are 300MB then , each executer* will keep 200 MB in memory
> and will write remaining 100 MB on local disk .  *
>
> *Shuffle Read : *Each executer will read from other executer's *memory +
> disk , so total read in above case will be 300(200 from memory and 100 from
> disk)*num of executers ? *
>
> Is my understanding correct ?
>
> Thanks,
> Kartik
>


How to use registered Hive UDF in Spark DataFrame?

2015-10-02 Thread unk1102
Hi, I have registered my Hive UDF using the following code:

hiveContext.udf().register("MyUDF", new UDF1<String, String>() {
    public String call(String o) throws Exception {
        // bla bla
    }
}, DataTypes.StringType);

Now I want to use the above MyUDF in a DataFrame. How do we use it? I know how to
use it in SQL and it works fine:

hiveContext.sql("select MyUDF('test') from myTable");

My hiveContext.sql() query involves group by on multiple columns, so for
scaling purposes I am trying to convert this query into the DataFrame APIs:

dataframe.select("col1","col2","coln").groupBy("col1","col2","coln").count();

Can we do the following: dataframe.select(MyUDF("col1"))? Please guide.






Re: Shuffle Write v/s Shuffle Read

2015-10-02 Thread Adrian Tanase
I’m not sure this is related to memory management – the shuffle is the central 
act of moving data around nodes when the computations need the data on another 
node (E.g. Group by, sort, etc)

Shuffle read and shuffle write should be mirrored on the left/right side of a 
shuffle between 2 stages.

-adrian

From: Kartik Mathur
Date: Thursday, October 1, 2015 at 10:36 PM
To: user
Subject: Shuffle Write v/s Shuffle Read

Hi

I am trying to better understand shuffle in spark .

Based on my understanding thus far ,

Shuffle Write: writes stage output for an intermediate stage to local disk if 
memory is not sufficient.
Example: if each worker has 200 MB of memory for intermediate results and the 
results are 300 MB, then each executor will keep 200 MB in memory and will 
write the remaining 100 MB to local disk.

Shuffle Read: each executor will read from other executors' memory + disk, so 
the total read in the above case will be 300 (200 from memory and 100 from disk) * num of 
executors?

Is my understanding correct ?

Thanks,
Kartik



Compute Real-time Visualizations using spark streaming

2015-10-02 Thread Sureshv
Hi, 

I am new to Spark and I would like to know how to compute (dynamically)
real-time visualizations using Spark Streaming (Kafka).

Use case: We have a real-time analytics dashboard (reports and dashboards);
a user can define a report (visualization) with certain parameters like
refresh period and various metrics (segment variables & profile variables).

We should compute only the visualizations that are in use (that users are
accessing), from the events coming in from Kafka streams, using Spark Streaming.

Solution: One way of doing this is to compute the visualizations for every
incoming message, write the results back into result streams, and have the
application consume the processed data / result streams.

I would like to know if there is a better approach? Please advise.

Thanks,
Suresh






saveAsTextFile creates an empty folder in HDFS

2015-10-02 Thread jarias
Dear list,

I'm experiencing a problem when trying to write any RDD to HDFS. I've tried
with minimal examples, scala programs and pyspark programs both in local and
cluster modes and as standalone applications or shells.

My problem is that when invoking the write command, a task is executed but
it just creates an empty folder in the given HDFS path. I'm lost at this
point because there is no sign of error or warning in the spark logs.

I'm running a seven node cluster managed by cdh5.7, spark 1.3. HDFS is
working properly when using the command tools or running MapReduce jobs.


Thank you for your time, I'm not sure if this is just a rookie mistake or an
overall config problem.

Just a working example:

This sequence produces the following log and creates the empty folder
"test":

scala> val l = Seq.fill(1)(nextInt)
scala> val dist = sc.parallelize(l)
scala> dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/")


15/10/02 10:19:22 INFO FileOutputCommitter: File Output Committer Algorithm
version is 1
15/10/02 10:19:22 INFO SparkContext: Starting job: saveAsTextFile at
<console>:27
15/10/02 10:19:22 INFO DAGScheduler: Got job 3 (saveAsTextFile at
<console>:27) with 2 output partitions (allowLocal=false)
15/10/02 10:19:22 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile at
<console>:27)
15/10/02 10:19:22 INFO DAGScheduler: Parents of final stage: List()
15/10/02 10:19:22 INFO DAGScheduler: Missing parents: List()
15/10/02 10:19:22 INFO DAGScheduler: Submitting Stage 3 (MapPartitionsRDD[7]
at saveAsTextFile at <console>:27), which has no missing parents
15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(137336) called with
curMem=184615, maxMem=278302556
15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3 stored as values in
memory (estimated size 134.1 KB, free 265.1 MB)
15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(47711) called with
curMem=321951, maxMem=278302556
15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes
in memory (estimated size 46.6 KB, free 265.1 MB)
15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
on nodo1.i3a.info:36330 (size: 46.6 KB, free: 265.3 MB)
15/10/02 10:19:22 INFO BlockManagerMaster: Updated info of block
broadcast_3_piece0
15/10/02 10:19:22 INFO SparkContext: Created broadcast 3 from broadcast at
DAGScheduler.scala:839
15/10/02 10:19:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage 3
(MapPartitionsRDD[7] at saveAsTextFile at <console>:27)
15/10/02 10:19:22 INFO YarnScheduler: Adding task set 3.0 with 2 tasks
15/10/02 10:19:22 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID
6, nodo2.i3a.info, PROCESS_LOCAL, 25975 bytes)
15/10/02 10:19:22 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID
7, nodo3.i3a.info, PROCESS_LOCAL, 25963 bytes)
15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
on nodo2.i3a.info:37759 (size: 46.6 KB, free: 530.2 MB)
15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
on nodo3.i3a.info:54798 (size: 46.6 KB, free: 530.2 MB)
15/10/02 10:19:22 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID
6) in 312 ms on nodo2.i3a.info (1/2)
15/10/02 10:19:23 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID
7) in 313 ms on nodo3.i3a.info (2/2)
15/10/02 10:19:23 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have
all completed, from pool 
15/10/02 10:19:23 INFO DAGScheduler: Stage 3 (saveAsTextFile at
<console>:27) finished in 0.334 s
15/10/02 10:19:23 INFO DAGScheduler: Job 3 finished: saveAsTextFile at
<console>:27, took 0.436388 s







Re: Accumulator of rows?

2015-10-02 Thread Adrian Tanase
Have you seen window functions?
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
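
A minimal sketch of the kind of window function that link describes (window functions need Spark 1.4+ and, in 1.4/1.5, a HiveContext; the DataFrame and column names are made up):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

// number the rows within each id, ordered by dt
val w = Window.partitionBy("id").orderBy("dt")
val numbered = df.withColumn("row_num", rowNumber().over(w))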

From: "saif.a.ell...@wellsfargo.com"
Date: Thursday, October 1, 2015 at 9:44 PM
To: "user@spark.apache.org"
Subject: Accumulator of rows?

Hi all,

I need to repeat a couple of rows from a dataframe n times each. To do so, I 
plan to create a new DataFrame, but I am unable to find a way to 
accumulate “Rows” somewhere; as this might get huge, I can’t accumulate them into a 
mutable Array, I think.

Thanks,
Saif



from groupBy return a DataFrame without aggregation?

2015-10-02 Thread Saif.A.Ellafi
Given ID, DATE, I need all sorted dates per ID, what is the easiest way?

I got this but I don't like it:
val l = zz.groupBy("id", "dt").agg($"dt".as("dummy")).sort($"dt".asc)

Saif
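
One possibly simpler form (a sketch: it just deduplicates and sorts the id/date pairs, using the same column names as above):

val sortedDates = zz.select("id", "dt").distinct.orderBy($"id".asc, $"dt".asc)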



Re: HDFS small file generation problem

2015-10-02 Thread nibiau
Hello,
Yes but :
- In the Java API I don't find an API to create an HDFS archive
- As soon as I receive a message (with a messageID) I need to replace the old 
existing file with the new one (the name of the file being the messageID); is that 
possible with an archive?

Tks
Nicolas

- Mail original -
De: "Jörn Franke" 
À: nib...@free.fr, "user" 
Envoyé: Lundi 28 Septembre 2015 23:53:56
Objet: Re: HDFS small file generation problem



Use hadoop archive 



Le dim. 27 sept. 2015 à 15:36, < nib...@free.fr > a écrit : 


Hello, 
I'm still investigating my small file generation problem generated by my Spark 
Streaming jobs. 
Indeed, my Spark Streaming jobs are receiving a lot of small events (avg 10kb), 
and I have to store them inside HDFS in order to treat them by PIG jobs 
on-demand. 
The problem is the fact that I generate a lot of small files in HDFS (several 
millions) and it can be problematic. 
I investigated to use Hbase or Archive file but I don't want to do it finally. 
So, what about this solution : 
- Spark streaming generate on the fly several millions of small files in HDFS 
- Each night I merge them inside a big daily file 
- I launch my PIG jobs on this big file ? 

Other question I have : 
- Is it possible to append a big file (daily) by adding on the fly my event ? 

Tks a lot 
Nicolas 




Re: Spark Streaming over YARN

2015-10-02 Thread Dibyendu Bhattacharya
Hi,

If you need to use Receiver based approach , you can try this one :
https://github.com/dibbhatt/kafka-spark-consumer

This is also part of Spark packages :
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

You just need to specify the number of receivers you want for the desired
parallelism while receiving, and the rest will be taken care of by the
ReceiverLauncher.

This low-level receiver will give better parallelism both on receiving
and on processing the RDD.

The default receiver-based API (KafkaUtils.createStream) uses the Kafka
high-level API, and the Kafka high-level API has serious issues when used in
production.


Regards,
Dibyendu





On Fri, Oct 2, 2015 at 9:22 PM,  wrote:

> From my understanding as soon as I use YARN I don't need to use
> parrallelisme (at least for RDD treatment)
> I don't want to use direct stream as I have to manage the offset
> positionning (in order to be able to start from the last offset treated
> after a spark job failure)
>
>
> - Mail original -
> De: "Cody Koeninger" 
> À: "Nicolas Biau" 
> Cc: "user" 
> Envoyé: Vendredi 2 Octobre 2015 17:43:41
> Objet: Re: Spark Streaming over YARN
>
>
> If you're using the receiver based implementation, and want more
> parallelism, you have to create multiple streams and union them together.
>
>
> Or use the direct stream.
>
>
> On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote:
>
>
> Hello,
> I have a job receiving data from kafka (4 partitions) and persisting data
> inside MongoDB.
> It works fine, but when I deploy it inside YARN cluster (4 nodes with 2
> cores) only on node is receiving all the kafka partitions and only one node
> is processing my RDD treatment (foreach function)
> How can I force YARN to use all the resources nodes and cores to process
> the data (receiver & RDD treatment)
>
> Tks a lot
> Nicolas
>
>
>


Re: HDFS small file generation problem

2015-10-02 Thread Brett Antonides
I had a very similar problem and solved it with Hive and ORC files using
the Spark SQLContext.
* Create a table in Hive stored as an ORC file (I recommend using
partitioning too)
* Use SQLContext.sql to Insert data into the table
* Use SQLContext.sql to periodically run ALTER TABLE...CONCATENATE to merge
your many small files into larger files optimized for your HDFS block size
   * Since the CONCATENATE command operates on files in place it is
transparent to any downstream processing
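
A minimal sketch of those steps through SQLContext.sql (assumes a HiveContext; the table, columns, and partition value are made up):

// 1) create an ORC-backed, partitioned table
sqlContext.sql("CREATE TABLE IF NOT EXISTS events (id STRING, payload STRING) PARTITIONED BY (dt STRING) STORED AS ORC")
// 2) insert each batch of small records into the table
sqlContext.sql("INSERT INTO TABLE events PARTITION (dt = '2015-10-02') SELECT id, payload FROM incoming_batch")
// 3) periodically merge the small files within a partition
sqlContext.sql("ALTER TABLE events PARTITION (dt = '2015-10-02') CONCATENATE")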

Cheers,
Brett


On Fri, Oct 2, 2015 at 3:48 PM,  wrote:

> Hello,
> Yes but :
> - In the Java API I don't find a API to create a HDFS archive
> - As soon as I receive a message (with messageID) I need to replace the
> old existing file by the new one (name of file being the messageID), is it
> possible with archive ?
>
> Tks
> Nicolas
>
> - Mail original -
> De: "Jörn Franke" 
> À: nib...@free.fr, "user" 
> Envoyé: Lundi 28 Septembre 2015 23:53:56
> Objet: Re: HDFS small file generation problem
>
>
>
> Use hadoop archive
>
>
>
> Le dim. 27 sept. 2015 à 15:36, < nib...@free.fr > a écrit :
>
>
> Hello,
> I'm still investigating my small file generation problem generated by my
> Spark Streaming jobs.
> Indeed, my Spark Streaming jobs are receiving a lot of small events (avg
> 10kb), and I have to store them inside HDFS in order to treat them by PIG
> jobs on-demand.
> The problem is the fact that I generate a lot of small files in HDFS
> (several millions) and it can be problematic.
> I investigated to use Hbase or Archive file but I don't want to do it
> finally.
> So, what about this solution :
> - Spark streaming generate on the fly several millions of small files in
> HDFS
> - Each night I merge them inside a big daily file
> - I launch my PIG jobs on this big file ?
>
> Other question I have :
> - Is it possible to append a big file (daily) by adding on the fly my
> event ?
>
> Tks a lot
> Nicolas
>
>
>


Re: Spark Streaming over YARN

2015-10-02 Thread Cody Koeninger
Neither of those statements are true.
You need more receivers if you want more parallelism.
You don't have to manage offset positioning with the direct stream if you
don't want to, as long as you can accept the limitations of Spark
checkpointing.

On Fri, Oct 2, 2015 at 10:52 AM,  wrote:

> From my understanding as soon as I use YARN I don't need to use
> parrallelisme (at least for RDD treatment)
> I don't want to use direct stream as I have to manage the offset
> positionning (in order to be able to start from the last offset treated
> after a spark job failure)
>
>
> - Mail original -
> De: "Cody Koeninger" 
> À: "Nicolas Biau" 
> Cc: "user" 
> Envoyé: Vendredi 2 Octobre 2015 17:43:41
> Objet: Re: Spark Streaming over YARN
>
>
> If you're using the receiver based implementation, and want more
> parallelism, you have to create multiple streams and union them together.
>
>
> Or use the direct stream.
>
>
> On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote:
>
>
> Hello,
> I have a job receiving data from kafka (4 partitions) and persisting data
> inside MongoDB.
> It works fine, but when I deploy it inside YARN cluster (4 nodes with 2
> cores) only on node is receiving all the kafka partitions and only one node
> is processing my RDD treatment (foreach function)
> How can I force YARN to use all the resources nodes and cores to process
> the data (receiver & RDD treatment)
>
> Tks a lot
> Nicolas
>
>
>
>


Spark Streaming over YARN

2015-10-02 Thread nibiau
Hello,
I have a job receiving data from Kafka (4 partitions) and persisting the data 
inside MongoDB.
It works fine, but when I deploy it inside a YARN cluster (4 nodes with 2 cores) 
only one node is receiving all the Kafka partitions and only one node is 
processing my RDD treatment (foreach function).
How can I force YARN to use all the nodes and cores to process the 
data (receiver & RDD treatment)?

Tks a lot
Nicolas




Re: How to use registered Hive UDF in Spark DataFrame?

2015-10-02 Thread Michael Armbrust
import org.apache.spark.sql.functions.*

callUDF("MyUDF", col("col1"), col("col2"))

On Fri, Oct 2, 2015 at 6:25 AM, unk1102  wrote:

> Hi I have registed my hive UDF using the following code:
>
> hiveContext.udf().register("MyUDF",new UDF1(String,String)) {
> public String call(String o) throws Execption {
> //bla bla
> }
> },DataTypes.String);
>
> Now I want to use above MyUDF in DataFrame. How do we use it? I know how to
> use it in a sql and it works fine
>
> hiveContext.sql(select MyUDF("test") from myTable);
>
> My hiveContext.sql() query involves group by on multiple columns so for
> scaling purpose I am trying to convert this query into DataFrame APIs
>
>
> dataframe.select("col1","col2","coln").groupby(""col1","col2","coln").count();
>
> Can we do the follwing dataframe.select(MyUDF("col1"))??? Please guide.
>
>
>
>
>


Re: Spark Streaming over YARN

2015-10-02 Thread Cody Koeninger
If you're using the receiver based implementation, and want more
parallelism, you have to create multiple streams and union them together.

Or use the direct stream.

On Fri, Oct 2, 2015 at 10:40 AM,  wrote:

> Hello,
> I have a job receiving data from kafka (4 partitions) and persisting data
> inside MongoDB.
> It works fine, but when I deploy it inside YARN cluster (4 nodes with 2
> cores) only on node is receiving all the kafka partitions and only one node
> is processing my RDD treatment (foreach function)
> How can I force YARN to use all the resources nodes and cores to process
> the data (receiver & RDD treatment)
>
> Tks a lot
> Nicolas
>
>
>


Re: Spark Streaming over YARN

2015-10-02 Thread nibiau
From my understanding, as soon as I use YARN I don't need to use parallelism 
(at least for the RDD treatment).
I don't want to use the direct stream as I have to manage the offset positioning 
(in order to be able to start from the last offset treated after a Spark job 
failure).


- Mail original -
De: "Cody Koeninger" 
À: "Nicolas Biau" 
Cc: "user" 
Envoyé: Vendredi 2 Octobre 2015 17:43:41
Objet: Re: Spark Streaming over YARN


If you're using the receiver based implementation, and want more parallelism, 
you have to create multiple streams and union them together. 


Or use the direct stream. 


On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote: 


Hello, 
I have a job receiving data from kafka (4 partitions) and persisting data 
inside MongoDB. 
It works fine, but when I deploy it inside YARN cluster (4 nodes with 2 cores) 
only on node is receiving all the kafka partitions and only one node is 
processing my RDD treatment (foreach function) 
How can I force YARN to use all the resources nodes and cores to process the 
data (receiver & RDD treatment) 

Tks a lot 
Nicolas 




Re: SparkSQL: Reading data from hdfs and storing into multiple paths

2015-10-02 Thread Michael Armbrust
Once you convert your data to a dataframe (look at spark-csv), try
df.write.partitionBy("yyyy", "mm").save("...").
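
A slightly fuller sketch of that suggestion (assumes the spark-csv package is available and that the year/month columns are named yyyy and mm; paths are made up):

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("hdfs:///path/to/input.csv")

// write out, partitioned by the year and month columns
df.write.partitionBy("yyyy", "mm").format("parquet").save("hdfs:///path/to/hdfs/")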

On Thu, Oct 1, 2015 at 4:11 PM, haridass saisriram <
haridass.saisri...@gmail.com> wrote:

> Hi,
>
>   I am trying to find a simple example to read a data file on HDFS. The
> file has the following format
> a, b, c, yyyy, mm
> a1,b1,c1,2015,09
> a2,b2,c2,2014,08
>
>
> I would like to read this file and store it in HDFS partitioned by year
> and month. Something like this
> /path/to/hdfs/yyyy/mm
>
> I want to specify the "/path/to/hdfs/" and the /yyyy/mm part should be populated
> automatically based on those columns. Could someone point me in the right
> direction
>
> Thank you,
> Sri Ram
>
>


Re: Spark Streaming over YARN

2015-10-02 Thread nibiau
OK, so if I set for example 4 receivers (the number of nodes), how will the RDD be 
distributed over the nodes/cores?
For example, in my case I have 4 nodes (with 2 cores each).

Tks
Nicolas 


- Mail original -
De: "Dibyendu Bhattacharya" 
À: nib...@free.fr
Cc: "Cody Koeninger" , "user" 
Envoyé: Vendredi 2 Octobre 2015 18:01:59
Objet: Re: Spark Streaming over YARN


Hi, 


If you need to use Receiver based approach , you can try this one : 
https://github.com/dibbhatt/kafka-spark-consumer 


This is also part of Spark packages : 
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer 


You just need to specify the number of Receivers you want for desired 
parallelism while receiving , and rest of the thing will be taken care by 
ReceiverLauncher. 


This Low level Receiver will give better parallelism both on receiving , and on 
processing the RDD. 


Default Receiver based API ( KafkaUtils.createStream) using Kafka High level 
API and Kafka high Level API has serious issue to be used in production . 




Regards, 

Dibyendu 










On Fri, Oct 2, 2015 at 9:22 PM, < nib...@free.fr > wrote: 


>From my understanding as soon as I use YARN I don't need to use parrallelisme 
>(at least for RDD treatment) 
I don't want to use direct stream as I have to manage the offset positionning 
(in order to be able to start from the last offset treated after a spark job 
failure) 


- Mail original - 
De: "Cody Koeninger" < c...@koeninger.org > 
À: "Nicolas Biau" < nib...@free.fr > 
Cc: "user" < user@spark.apache.org > 
Envoyé: Vendredi 2 Octobre 2015 17:43:41 
Objet: Re: Spark Streaming over YARN 




If you're using the receiver based implementation, and want more parallelism, 
you have to create multiple streams and union them together. 


Or use the direct stream. 


On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote: 


Hello, 
I have a job receiving data from kafka (4 partitions) and persisting data 
inside MongoDB. 
It works fine, but when I deploy it inside YARN cluster (4 nodes with 2 cores) 
only on node is receiving all the kafka partitions and only one node is 
processing my RDD treatment (foreach function) 
How can I force YARN to use all the resources nodes and cores to process the 
data (receiver & RDD treatment) 

Tks a lot 
Nicolas 




Re: Spark Streaming over YARN

2015-10-02 Thread Dibyendu Bhattacharya
If your Kafka topic has 4 partitions, and if you specify 4 receivers,
messages from each partition are received by a dedicated receiver. So your
receiving parallelism is defined by the number of partitions of your
topic. Every receiver task will be scheduled evenly among the nodes in your
cluster. There was a JIRA fixed in Spark 1.5 which does even distribution
of receivers.


Now, RDD parallelism (i.e. parallelism while processing your RDD) is
controlled by your block interval and batch interval.

If your block interval is 200 ms, there will be 5 blocks per second. If
your batch interval is 3 seconds, there will be 15 blocks per batch. And every
batch is one RDD, thus your RDD will have 15 partitions, which will be
honored while processing the RDD.
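
A small sketch of the intervals described above (the interval values are from the example; everything else is made up):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("block-interval-example")
  // 200 ms blocks => 5 blocks per second per receiver
  .set("spark.streaming.blockInterval", "200ms")

// 3-second batches => 15 blocks, and thus roughly 15 RDD partitions, per batch
val ssc = new StreamingContext(conf, Seconds(3))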


Regards,
Dibyendu


On Fri, Oct 2, 2015 at 9:40 PM,  wrote:

> Ok so if I set for example 4 receivers (number of nodes), how RDD will be
> distributed over the nodes/core.
> For example in my example I have 4 nodes (with 2 cores)
>
> Tks
> Nicolas
>
>
> - Mail original -
> De: "Dibyendu Bhattacharya" 
> À: nib...@free.fr
> Cc: "Cody Koeninger" , "user" 
> Envoyé: Vendredi 2 Octobre 2015 18:01:59
> Objet: Re: Spark Streaming over YARN
>
>
> Hi,
>
>
> If you need to use Receiver based approach , you can try this one :
> https://github.com/dibbhatt/kafka-spark-consumer
>
>
> This is also part of Spark packages :
> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>
>
> You just need to specify the number of Receivers you want for desired
> parallelism while receiving , and rest of the thing will be taken care by
> ReceiverLauncher.
>
>
> This Low level Receiver will give better parallelism both on receiving ,
> and on processing the RDD.
>
>
> Default Receiver based API ( KafkaUtils.createStream) using Kafka High
> level API and Kafka high Level API has serious issue to be used in
> production .
>
>
>
>
> Regards,
>
> Dibyendu
>
>
>
>
>
>
>
>
>
>
> On Fri, Oct 2, 2015 at 9:22 PM, < nib...@free.fr > wrote:
>
>
> From my understanding as soon as I use YARN I don't need to use
> parrallelisme (at least for RDD treatment)
> I don't want to use direct stream as I have to manage the offset
> positionning (in order to be able to start from the last offset treated
> after a spark job failure)
>
>
> - Mail original -
> De: "Cody Koeninger" < c...@koeninger.org >
> À: "Nicolas Biau" < nib...@free.fr >
> Cc: "user" < user@spark.apache.org >
> Envoyé: Vendredi 2 Octobre 2015 17:43:41
> Objet: Re: Spark Streaming over YARN
>
>
>
>
> If you're using the receiver based implementation, and want more
> parallelism, you have to create multiple streams and union them together.
>
>
> Or use the direct stream.
>
>
> On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote:
>
>
> Hello,
> I have a job receiving data from kafka (4 partitions) and persisting data
> inside MongoDB.
> It works fine, but when I deploy it inside YARN cluster (4 nodes with 2
> cores) only on node is receiving all the kafka partitions and only one node
> is processing my RDD treatment (foreach function)
> How can I force YARN to use all the resources nodes and cores to process
> the data (receiver & RDD treatment)
>
> Tks a lot
> Nicolas
>
>
>
>


Re: HDFS small file generation problem

2015-10-02 Thread nibiau
Ok thanks, but can I also update data instead of insert data ?

- Mail original -
De: "Brett Antonides" 
À: user@spark.apache.org
Envoyé: Vendredi 2 Octobre 2015 18:18:18
Objet: Re: HDFS small file generation problem








I had a very similar problem and solved it with Hive and ORC files using the 
Spark SQLContext. 
* Create a table in Hive stored as an ORC file (I recommend using partitioning 
too) 
* Use SQLContext.sql to Insert data into the table 
* Use SQLContext.sql to periodically run ALTER TABLE...CONCATENATE to merge 
your many small files into larger files optimized for your HDFS block size 
* Since the CONCATENATE command operates on files in place it is transparent to 
any downstream processing 

Cheers, 
Brett 









On Fri, Oct 2, 2015 at 3:48 PM, < nib...@free.fr > wrote: 


Hello, 
Yes but : 
- In the Java API I don't find a API to create a HDFS archive 
- As soon as I receive a message (with messageID) I need to replace the old 
existing file by the new one (name of file being the messageID), is it possible 
with archive ? 

Tks 
Nicolas 

- Mail original - 
De: "Jörn Franke" < jornfra...@gmail.com > 
À: nib...@free.fr , "user" < user@spark.apache.org > 
Envoyé: Lundi 28 Septembre 2015 23:53:56 
Objet: Re: HDFS small file generation problem 





Use hadoop archive 



Le dim. 27 sept. 2015 à 15:36, < nib...@free.fr > a écrit : 


Hello, 
I'm still investigating my small file generation problem generated by my Spark 
Streaming jobs. 
Indeed, my Spark Streaming jobs are receiving a lot of small events (avg 10kb), 
and I have to store them inside HDFS in order to treat them by PIG jobs 
on-demand. 
The problem is the fact that I generate a lot of small files in HDFS (several 
millions) and it can be problematic. 
I investigated to use Hbase or Archive file but I don't want to do it finally. 
So, what about this solution : 
- Spark streaming generate on the fly several millions of small files in HDFS 
- Each night I merge them inside a big daily file 
- I launch my PIG jobs on this big file ? 

Other question I have : 
- Is it possible to append a big file (daily) by adding on the fly my event ? 

Tks a lot 
Nicolas 




are functions deserialized once per task?

2015-10-02 Thread Michael Albert
Greetings!

Is it true that functions, such as those passed to RDD.map(), are deserialized 
once per task? This seems to be the case looking at Executor.scala, but I don't 
really understand the code.

I'm hoping the answer is yes because that makes it easier to write code without 
worrying about thread safety. For example, suppose I have something like this:

class FooToBarTransformer {
  def transform(foo: Foo): Bar = ...
}

Now I want to do something like this:

val rddFoo: RDD[Foo] = ...
val transformer = new FooToBarTransformer
val rddBar = rddFoo.map(foo => transformer.transform(foo))

If the "transformer" object is deserialized once per task, then I do not need 
to worry whether "transform()" is thread safe. If, for example, the 
implementation tried to "optimize" matters by caching the deserialization, so that 
one object was shared by all threads in a single JVM, then presumably one would 
need to worry about the thread safety of transform().

Is my understanding correct? Is this likely to continue to be true in future 
releases? Answers of "yes" would be much appreciated :-).

Thanks!
-Mike



Re: Spark Streaming over YARN

2015-10-02 Thread Cody Koeninger
Direct stream has nothing to do with Zookeeper.

The direct stream can start at the offsets you specify.  If you're not
storing offsets in checkpoints, how and where you store them is up to you.

Have you read / watched the information linked from

https://github.com/koeninger/kafka-exactly-once
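
A rough sketch of the direct stream with caller-managed offsets (Spark 1.3+ createDirectStream API; assumes an existing StreamingContext ssc; the broker, topic, and offset store are made up - where the offsets are persisted is up to you):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
// offsets previously saved in your own store (ZooKeeper, Cassandra, HDFS, ...)
val fromOffsets = Map(TopicAndPartition("mytopic", 0) -> 0L)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd ...
  // then persist each range's (topic, partition, untilOffset) back to your own store
}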


On Fri, Oct 2, 2015 at 11:36 AM,  wrote:

> Sorry, I just said that I NEED to manage offsets, so in case of Kafka
> Direct Stream , how can I handle this ?
> Update Zookeeper manually ? why not but any other solutions ?
>
> - Mail original -
> De: "Cody Koeninger" 
> À: "Nicolas Biau" 
> Cc: "user" 
> Envoyé: Vendredi 2 Octobre 2015 18:29:09
> Objet: Re: Spark Streaming over YARN
>
>
> Neither of those statements are true.
> You need more receivers if you want more parallelism.
> You don't have to manage offset positioning with the direct stream if you
> don't want to, as long as you can accept the limitations of Spark
> checkpointing.
>
>
> On Fri, Oct 2, 2015 at 10:52 AM, < nib...@free.fr > wrote:
>
>
> From my understanding as soon as I use YARN I don't need to use
> parrallelisme (at least for RDD treatment)
> I don't want to use direct stream as I have to manage the offset
> positionning (in order to be able to start from the last offset treated
> after a spark job failure)
>
>
> - Mail original -
> De: "Cody Koeninger" < c...@koeninger.org >
> À: "Nicolas Biau" < nib...@free.fr >
> Cc: "user" < user@spark.apache.org >
> Envoyé: Vendredi 2 Octobre 2015 17:43:41
> Objet: Re: Spark Streaming over YARN
>
>
>
>
> If you're using the receiver based implementation, and want more
> parallelism, you have to create multiple streams and union them together.
>
>
> Or use the direct stream.
>
>
> On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote:
>
>
> Hello,
> I have a job receiving data from kafka (4 partitions) and persisting data
> inside MongoDB.
> It works fine, but when I deploy it inside YARN cluster (4 nodes with 2
> cores) only on node is receiving all the kafka partitions and only one node
> is processing my RDD treatment (foreach function)
> How can I force YARN to use all the resources nodes and cores to process
> the data (receiver & RDD treatment)
>
> Tks a lot
> Nicolas
>
>
>
>
>


Re: Weird Spark Dispatcher Offers?

2015-10-02 Thread Tim Chen
Do you have jobs enqueued? And if none of the jobs matches any offer it
will just decline it.

What's your job resource specifications?

Tim

On Fri, Oct 2, 2015 at 11:34 AM, Alan Braithwaite 
wrote:

> Hey All,
>
> Using spark with mesos and docker.
>
> I'm wondering if anybody's seen the behavior of spark dispatcher where it
> just continually requests resources and immediately declines the offer.
>
> https://gist.github.com/anonymous/41e7c91899b0122b91a7
>
> I'm trying to debug some issues with spark and I'm having trouble figuring
> out if this is part of the problem or if it's safe to ignore it.
>
> Any help or pointers would be appreciated.
>
> Thanks!
> - Alan
>


Weird Spark Dispatcher Offers?

2015-10-02 Thread Alan Braithwaite
Hey All,

Using spark with mesos and docker.

I'm wondering if anybody's seen the behavior of spark dispatcher where it
just continually requests resources and immediately declines the offer.

https://gist.github.com/anonymous/41e7c91899b0122b91a7

I'm trying to debug some issues with spark and I'm having trouble figuring
out if this is part of the problem or if it's safe to ignore it.

Any help or pointers would be appreciated.

Thanks!
- Alan


Re: Spark Streaming over YARN

2015-10-02 Thread nibiau
Sorry, I just said that I NEED to manage offsets; so in the case of the Kafka direct 
stream, how can I handle this? 
Update ZooKeeper manually? Why not, but are there any other solutions?

- Mail original -
De: "Cody Koeninger" 
À: "Nicolas Biau" 
Cc: "user" 
Envoyé: Vendredi 2 Octobre 2015 18:29:09
Objet: Re: Spark Streaming over YARN


Neither of those statements are true. 
You need more receivers if you want more parallelism. 
You don't have to manage offset positioning with the direct stream if you don't 
want to, as long as you can accept the limitations of Spark checkpointing. 


On Fri, Oct 2, 2015 at 10:52 AM, < nib...@free.fr > wrote: 


>From my understanding as soon as I use YARN I don't need to use parrallelisme 
>(at least for RDD treatment) 
I don't want to use direct stream as I have to manage the offset positionning 
(in order to be able to start from the last offset treated after a spark job 
failure) 


- Mail original - 
De: "Cody Koeninger" < c...@koeninger.org > 
À: "Nicolas Biau" < nib...@free.fr > 
Cc: "user" < user@spark.apache.org > 
Envoyé: Vendredi 2 Octobre 2015 17:43:41 
Objet: Re: Spark Streaming over YARN 




If you're using the receiver based implementation, and want more parallelism, 
you have to create multiple streams and union them together. 


Or use the direct stream. 


On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote: 


Hello, 
I have a job receiving data from kafka (4 partitions) and persisting data 
inside MongoDB. 
It works fine, but when I deploy it inside YARN cluster (4 nodes with 2 cores) 
only on node is receiving all the kafka partitions and only one node is 
processing my RDD treatment (foreach function) 
How can I force YARN to use all the resources nodes and cores to process the 
data (receiver & RDD treatment) 

Tks a lot 
Nicolas 




Re: Kafka Direct Stream

2015-10-02 Thread Gerard Maas
Something like this?

I'm making the assumption that your topic name equals your keyspace for
this filtering example.

dstream.foreachRDD { rdd =>
  val topics = rdd.map(_._1).distinct.collect
  topics.foreach { topic =>
    // do not confuse this collect (a partial-function filter/map) with
    // rdd.collect(), which brings data to the driver
    val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
    filteredRdd.saveToCassandra(topic, "table")
  }
}


I'm wondering: would something like this (
https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
purposes?

-kr, Gerard.

On Fri, Oct 2, 2015 at 8:12 PM, varun sharma 
wrote:

> Hi Adrian,
>
> Can you please give an example of how to achieve this:
>
>> *I would also look at filtering by topic and saving as different Dstreams
>> in your code*
>
> I have managed to get DStream[(String, String)] which is (*topic,my_data)*
> tuple. Lets call it kafkaStringStream.
> Now if I do kafkaStringStream.groupByKey() then I would get a
> DStream[(String,Iterable[String])].
> But I want a DStream instead of Iterable in order to apply saveToCassandra
> for storing it.
>
> Please help in how to transform iterable to DStream or any other
> workaround for achieving same.
>
>
> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase  wrote:
>
>> On top of that you could make the topic part of the key (e.g. keyBy in
>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>> operators for the processing.
>>
>> If you have a stable, domain specific list of topics (e.g. 3-5 named
>> topics) and the processing is *really* different, I would also look at
>> filtering by topic and saving as different Dstreams in your code.
>>
>> Either way you need to start with Cody’s tip in order to extract the
>> topic name.
>>
>> -adrian
>>
>> From: Cody Koeninger
>> Date: Thursday, October 1, 2015 at 5:06 PM
>> To: Udit Mehta
>> Cc: user
>> Subject: Re: Kafka Direct Stream
>>
>> You can get the topic for a given partition from the offset range.  You
>> can either filter using that; or just have a single rdd and match on topic
>> when doing mapPartitions or foreachPartition (which I think is a better
>> idea)
>>
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>
>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta  wrote:
>>
>>> Hi,
>>>
>>> I am using spark direct stream to consume from multiple topics in Kafka.
>>> I am able to consume fine but I am stuck at how to separate the data for
>>> each topic since I need to process data differently depending on the topic.
>>> I basically want to split the RDD consisting on N topics into N RDD's
>>> each having 1 topic.
>>>
>>> Any help would be appreciated.
>>>
>>> Thanks in advance,
>>> Udit
>>>
>>
>>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>


Re: automatic start of streaming job on failure on YARN

2015-10-02 Thread Ashish Rangole
Are you running the job in yarn cluster mode?
On Oct 1, 2015 6:30 AM, "Jeetendra Gangele"  wrote:

> We've a streaming application running on yarn and we would like to ensure
> that is up running 24/7.
>
> Is there a way to tell yarn to automatically restart a specific
> application on failure?
>
> There is property yarn.resourcemanager.am.max-attempts which is default
> set to 2 setting it to bigger value is the solution? Also I did observed
> this does not seems to work my application is failing and not starting
> automatically.
>
> Mesos has this build in support wondering why yarn is lacking here?
>
>
>
> Regards
>
> jeetendra
>


RE: from groupBy return a DataFrame without aggregation?

2015-10-02 Thread Diggs, Asoka
I may not be understanding your question - for a given date, you have many ID 
values - is that correct?  Are there additional columns in this dataset that 
you aren't mentioning, or are we simply dealing with id and dt?

What structure do you need the return data to be in?


If you're looking for a return dataframe with columns of id and dt, but you'd 
like it sorted so that for a given dt, the id's are arranged in order, then I 
would suggest something like this (I speak Python, so first example comes from 
the Python API doc):

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.orderBy
df.orderBy(["dt", "id"], ascending=[1, 1]).show()
# this will order the dataframe df by the dt column in ascending order (dates 
increasing), with matched dates ordered in ascending order by the id column.

This may help from the Scala API:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame
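
And a rough Scala equivalent of the Python example above (assuming the DataFrame is
the zz from your mail, with columns "dt" and "id"):

import org.apache.spark.sql.functions.col

val sorted = zz.orderBy(col("dt").asc, col("id").asc)
sorted.show()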


My apologies if I'm heading off in a direction you're not looking for. My tl;dr
version is that you may only need sort - the groupBy is unnecessary.

Asoka

From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
Sent: Friday, October 02, 2015 8:32 AM
To: user@spark.apache.org
Subject: from groupBy return a DataFrame without aggregation?

Given ID, DATE, I need all sorted dates per ID, what is the easiest way?

I got this but I don't like it:
val l = zz.groupBy("id", "dt").agg($"dt".as("dummy")).sort($"dt".asc)

Saif



Re: Kafka Direct Stream

2015-10-02 Thread varun sharma
Hi Adrian,

Can you please give an example of how to achieve this:

> *I would also look at filtering by topic and saving as different Dstreams
> in your code*

I have managed to get DStream[(String, String)] which is (*topic,my_data)*
tuple. Lets call it kafkaStringStream.
Now if I do kafkaStringStream.groupByKey() then I would get a
DStream[(String,Iterable[String])].
But I want a DStream instead of Iterable in order to apply saveToCassandra
for storing it.

Please help in how to transform iterable to DStream or any other workaround
for achieving same.
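
In case it helps, a minimal sketch of Adrian's filtering idea, assuming
kafkaStringStream: DStream[(String, String)] of (topic, payload), a known topic list,
and the spark-cassandra-connector on the classpath (keyspace/table names are made up):

import com.datastax.spark.connector._
import org.apache.spark.streaming.dstream.DStream

val topics = Seq("topicA", "topicB")  // illustrative names
val perTopic: Map[String, DStream[String]] =
  topics.map(t => t -> kafkaStringStream.filter(_._1 == t).map(_._2)).toMap

// Each topic now has its own DStream, so it can be saved independently, e.g.:
perTopic("topicA").foreachRDD(rdd => rdd.saveToCassandra("keyspace_a", "table_a"))  // replace with your row type / column mapping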


On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase  wrote:

> On top of that you could make the topic part of the key (e.g. keyBy in
> .transform or manually emitting a tuple) and use one of the .xxxByKey
> operators for the processing.
>
> If you have a stable, domain specific list of topics (e.g. 3-5 named
> topics) and the processing is *really* different, I would also look at
> filtering by topic and saving as different Dstreams in your code.
>
> Either way you need to start with Cody’s tip in order to extract the topic
> name.
>
> -adrian
>
> From: Cody Koeninger
> Date: Thursday, October 1, 2015 at 5:06 PM
> To: Udit Mehta
> Cc: user
> Subject: Re: Kafka Direct Stream
>
> You can get the topic for a given partition from the offset range.  You
> can either filter using that; or just have a single rdd and match on topic
> when doing mapPartitions or foreachPartition (which I think is a better
> idea)
>
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>
> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta  wrote:
>
>> Hi,
>>
>> I am using spark direct stream to consume from multiple topics in Kafka.
>> I am able to consume fine but I am stuck at how to separate the data for
>> each topic since I need to process data differently depending on the topic.
>> I basically want to split the RDD consisting on N topics into N RDD's
>> each having 1 topic.
>>
>> Any help would be appreciated.
>>
>> Thanks in advance,
>> Udit
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: Checkpointing is super slow

2015-10-02 Thread Sourabh Chandak
I can see the entries processed in the table very fast but after that it
takes a long time for the checkpoint update.

Haven't tried other methods of checkpointing yet, we are using DSE on Azure.

Thanks,
Sourabh

On Fri, Oct 2, 2015 at 6:52 AM, Cody Koeninger  wrote:

> Why are you sure it's checkpointing speed?
>
> Have you compared it against checkpointing to hdfs, s3, or local disk?
>
> On Fri, Oct 2, 2015 at 1:17 AM, Sourabh Chandak 
> wrote:
>
>> Hi,
>>
>> I have a receiverless kafka streaming job which was started yesterday
>> evening and was running fine till 4 PM today. Suddenly post that writing of
>> checkpoint has slowed down and it is now not able to catch up with the
>> incoming data. We are using the DSE stack with Spark 1.2 and Cassandra for
>> checkpointing. Spark streaming is done using a backported code.
>>
>> Running nodetool shows that the Read latency of the cfs keyspace is ~8.5
>> ms.
>>
>> Can someone please help me resolve this?
>>
>> Thanks,
>> Sourabh
>>
>>
>


Re: Kafka Direct Stream

2015-10-02 Thread varun sharma
Hi Nicolae,

Won't creating N KafkaDirectStreams be an overhead for my streaming job
compared to Single DirectStream?
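
For concreteness, a minimal sketch of the per-topic streams Nicolae describes below
(assuming the 0.8 direct API; ssc, kafkaParams and the topic names are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val topicNames = Seq("topicA", "topicB")
val streamsByTopic = topicNames.map { t =>
  // One direct stream per topic; each one tracks only that topic's partitions.
  t -> KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set(t))
}.toMap
// Shared logic can live in ordinary functions applied to each stream.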

On Fri, Oct 2, 2015 at 1:13 AM, Nicolae Marasoiu <
nicolae.maras...@adswizz.com> wrote:

> Hi,
>
>
> If you just need processing per topic, why not generate N different kafka
> direct streams ? when creating a kafka direct stream you have list of
> topics - just give one.
>
>
> Then the reusable part of your computations should be extractable as
> transformations/functions and reused between the streams.
>
>
> Nicu
>
>
>
> --
> *From:* Adrian Tanase 
> *Sent:* Thursday, October 1, 2015 5:47 PM
> *To:* Cody Koeninger; Udit Mehta
> *Cc:* user
> *Subject:* Re: Kafka Direct Stream
>
> On top of that you could make the topic part of the key (e.g. keyBy in
> .transform or manually emitting a tuple) and use one of the .xxxByKey
> operators for the processing.
>
> If you have a stable, domain specific list of topics (e.g. 3-5 named
> topics) and the processing is *really* different, I would also look at
> filtering by topic and saving as different Dstreams in your code.
>
> Either way you need to start with Cody’s tip in order to extract the topic
> name.
>
> -adrian
>
> From: Cody Koeninger
> Date: Thursday, October 1, 2015 at 5:06 PM
> To: Udit Mehta
> Cc: user
> Subject: Re: Kafka Direct Stream
>
> You can get the topic for a given partition from the offset range.  You
> can either filter using that; or just have a single rdd and match on topic
> when doing mapPartitions or foreachPartition (which I think is a better
> idea)
>
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>
> 
> Spark Streaming + Kafka Integration Guide - Spark 1.5.0 ...
> Spark Streaming + Kafka Integration Guide. Apache Kafka is
> publish-subscribe messaging rethought as a distributed, partitioned,
> replicated commit log service.
> Read more...
> 
>
>
>
> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta  wrote:
>
>> Hi,
>>
>> I am using spark direct stream to consume from multiple topics in Kafka.
>> I am able to consume fine but I am stuck at how to separate the data for
>> each topic since I need to process data differently depending on the topic.
>> I basically want to split the RDD consisting on N topics into N RDD's
>> each having 1 topic.
>>
>> Any help would be appreciated.
>>
>> Thanks in advance,
>> Udit
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: Adding the values in a column of a dataframe

2015-10-02 Thread sethah
df.agg(sum("age")).show()




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Adding-the-values-in-a-column-of-a-dataframe-tp24909p24910.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: how to broadcast huge lookup table?

2015-10-02 Thread Saif.A.Ellafi
Hi, thank you

I would prefer to leave writing-to-disk as a last resort. Is it a last resort?

Saif

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Friday, October 02, 2015 3:54 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: how to broadcast huge lookup table?

Have you considered using external storage such as hbase for storing the look 
up table ?

Cheers

On Fri, Oct 2, 2015 at 11:50 AM, 
> wrote:
I tried broadcasting a key-value rdd, but then I cannot perform any rdd-actions 
inside a map/foreach function of another rdd.

any tips? If going into scala collections I end up with huge memory bottlenecks.

Saif




Re: Weird Spark Dispatcher Offers?

2015-10-02 Thread Alan Braithwaite
>
> So if there is no jobs to run the dispatcher will decline all offers by
> default.
>

So would this be a bug in mesos then?  I'm not sure I understand how this
offer is appearing in the first place.  It only shows up in the master logs
when I start the dispatcher.


> Also we list all the jobs enqueued and it's specifications in the Spark
> dispatcher UI, you should see the port in the dispatcher logs itself.


Yes, this job is not listed under that UI.  Hence my confusion.

Thanks,
- Alan

On Fri, Oct 2, 2015 at 11:49 AM, Tim Chen  wrote:

> So if there is no jobs to run the dispatcher will decline all offers by
> default.
>
> Also we list all the jobs enqueued and it's specifications in the Spark
> dispatcher UI, you should see the port in the dispatcher logs itself.
>
> Tim
>
> On Fri, Oct 2, 2015 at 11:46 AM, Alan Braithwaite 
> wrote:
>
>> This happened right after blowing away /var/lib/mesos zk://mesos and
>> zk://spark_mesos_dispatcher and before I've submitted anything new to it so
>> I _shouldn't_ have anything enqueued.  Unless there's state being stored
>> somewhere besides those places that I don't know about.
>>
>> I'm not sure what the resource specifications are for this one because I
>> didn't submit it directly.  If you have a way for me to grab a specific
>> offer configuration, I'd be delighted to provide it.  I just can't seem to
>> figure out how to get that information after digging through the mesos docs
>> :-(
>>
>> Also, I can't read the docker logs because:
>>
>> Oct 02 11:39:59 sparky docker[556]:
>> time="2015-10-02T11:39:59.165474049-07:00" level=error msg="Error streaming
>> logs: invalid character '\\x00' looking for beginning of value"
>>
>> (that's coming from the spark-dispatcher docker).
>>
>> Thanks!
>> - Alan
>>
>> On Fri, Oct 2, 2015 at 11:36 AM, Tim Chen  wrote:
>>
>>> Do you have jobs enqueued? And if none of the jobs matches any offer it
>>> will just decline it.
>>>
>>> What's your job resource specifications?
>>>
>>> Tim
>>>
>>> On Fri, Oct 2, 2015 at 11:34 AM, Alan Braithwaite 
>>> wrote:
>>>
 Hey All,

 Using spark with mesos and docker.

 I'm wondering if anybody's seen the behavior of spark dispatcher where
 it just continually requests resources and immediately declines the offer.

 https://gist.github.com/anonymous/41e7c91899b0122b91a7

 I'm trying to debug some issues with spark and I'm having trouble
 figuring out if this is part of the problem or if it's safe to ignore it.

 Any help or pointers would be appreciated.

 Thanks!
 - Alan

>>>
>>>
>>
>


Re: Weird Spark Dispatcher Offers?

2015-10-02 Thread Alan Braithwaite
This happened right after blowing away /var/lib/mesos zk://mesos and
zk://spark_mesos_dispatcher and before I've submitted anything new to it so
I _shouldn't_ have anything enqueued.  Unless there's state being stored
somewhere besides those places that I don't know about.

I'm not sure what the resource specifications are for this one because I
didn't submit it directly.  If you have a way for me to grab a specific
offer configuration, I'd be delighted to provide it.  I just can't seem to
figure out how to get that information after digging through the mesos docs
:-(

Also, I can't read the docker logs because:

Oct 02 11:39:59 sparky docker[556]:
time="2015-10-02T11:39:59.165474049-07:00" level=error msg="Error streaming
logs: invalid character '\\x00' looking for beginning of value"

(that's coming from the spark-dispatcher docker).

Thanks!
- Alan

On Fri, Oct 2, 2015 at 11:36 AM, Tim Chen  wrote:

> Do you have jobs enqueued? And if none of the jobs matches any offer it
> will just decline it.
>
> What's your job resource specifications?
>
> Tim
>
> On Fri, Oct 2, 2015 at 11:34 AM, Alan Braithwaite 
> wrote:
>
>> Hey All,
>>
>> Using spark with mesos and docker.
>>
>> I'm wondering if anybody's seen the behavior of spark dispatcher where it
>> just continually requests resources and immediately declines the offer.
>>
>> https://gist.github.com/anonymous/41e7c91899b0122b91a7
>>
>> I'm trying to debug some issues with spark and I'm having trouble
>> figuring out if this is part of the problem or if it's safe to ignore it.
>>
>> Any help or pointers would be appreciated.
>>
>> Thanks!
>> - Alan
>>
>
>


Re: Weird Spark Dispatcher Offers?

2015-10-02 Thread Tim Chen
Hi Alan,

The dispatcher is a Mesos framework and all frameworks in Mesos receives
offers from the master. Mesos is different than most schedulers where
we don't issue containers based on requests, but we offer available
resources to all frameworks and they in turn decide if they want to use
these resources.

In the Mesos dispatcher case we just decline offers coming in so it's
available for other frameworks.

Tim

On Fri, Oct 2, 2015 at 11:51 AM, Alan Braithwaite 
wrote:

> So if there is no jobs to run the dispatcher will decline all offers by
>> default.
>>
>
> So would this be a bug in mesos then?  I'm not sure I understand how this
> offer is appearing in the first place.  It only shows up in the master logs
> when I start the dispatcher.
>
>
>> Also we list all the jobs enqueued and it's specifications in the Spark
>> dispatcher UI, you should see the port in the dispatcher logs itself.
>
>
> Yes, this job is not listed under that UI.  Hence my confusion.
>
> Thanks,
> - Alan
>
> On Fri, Oct 2, 2015 at 11:49 AM, Tim Chen  wrote:
>
>> So if there is no jobs to run the dispatcher will decline all offers by
>> default.
>>
>> Also we list all the jobs enqueued and it's specifications in the Spark
>> dispatcher UI, you should see the port in the dispatcher logs itself.
>>
>> Tim
>>
>> On Fri, Oct 2, 2015 at 11:46 AM, Alan Braithwaite 
>> wrote:
>>
>>> This happened right after blowing away /var/lib/mesos zk://mesos and
>>> zk://spark_mesos_dispatcher and before I've submitted anything new to it so
>>> I _shouldn't_ have anything enqueued.  Unless there's state being stored
>>> somewhere besides those places that I don't know about.
>>>
>>> I'm not sure what the resource specifications are for this one because I
>>> didn't submit it directly.  If you have a way for me to grab a specific
>>> offer configuration, I'd be delighted to provide it.  I just can't seem to
>>> figure out how to get that information after digging through the mesos docs
>>> :-(
>>>
>>> Also, I can't read the docker logs because:
>>>
>>> Oct 02 11:39:59 sparky docker[556]:
>>> time="2015-10-02T11:39:59.165474049-07:00" level=error msg="Error streaming
>>> logs: invalid character '\\x00' looking for beginning of value"
>>>
>>> (that's coming from the spark-dispatcher docker).
>>>
>>> Thanks!
>>> - Alan
>>>
>>> On Fri, Oct 2, 2015 at 11:36 AM, Tim Chen  wrote:
>>>
 Do you have jobs enqueued? And if none of the jobs matches any offer it
 will just decline it.

 What's your job resource specifications?

 Tim

 On Fri, Oct 2, 2015 at 11:34 AM, Alan Braithwaite 
 wrote:

> Hey All,
>
> Using spark with mesos and docker.
>
> I'm wondering if anybody's seen the behavior of spark dispatcher where
> it just continually requests resources and immediately declines the offer.
>
> https://gist.github.com/anonymous/41e7c91899b0122b91a7
>
> I'm trying to debug some issues with spark and I'm having trouble
> figuring out if this is part of the problem or if it's safe to ignore it.
>
> Any help or pointers would be appreciated.
>
> Thanks!
> - Alan
>


>>>
>>
>


Re: How to use registered Hive UDF in Spark DataFrame?

2015-10-02 Thread Umesh Kacha
Hi Michael,

Thanks much. How do we give alias name for resultant columns? For e.g. when
using

hiveContext.sql("select MyUDF("test") as mytest from myTable");

how do we do that in DataFrame callUDF

callUDF("MyUDF", col("col1"))???

On Fri, Oct 2, 2015 at 8:23 PM, Michael Armbrust 
wrote:

> import org.apache.spark.sql.functions.*
>
> callUDF("MyUDF", col("col1"), col("col2"))
>
> On Fri, Oct 2, 2015 at 6:25 AM, unk1102  wrote:
>
>> Hi I have registered my hive UDF using the following code:
>>
>> hiveContext.udf().register("MyUDF", new UDF1<String, String>() {
>> public String call(String o) throws Exception {
>> //bla bla
>> }
>> }, DataTypes.StringType);
>>
>> Now I want to use above MyUDF in DataFrame. How do we use it? I know how
>> to
>> use it in a sql and it works fine
>>
>> hiveContext.sql(select MyUDF("test") from myTable);
>>
>> My hiveContext.sql() query involves group by on multiple columns so for
>> scaling purpose I am trying to convert this query into DataFrame APIs
>>
>>
>> dataframe.select("col1","col2","coln").groupby(""col1","col2","coln").count();
>>
>> Can we do the follwing dataframe.select(MyUDF("col1"))??? Please guide.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-registered-Hive-UDF-in-Spark-DataFrame-tp24907.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


how to broadcast huge lookup table?

2015-10-02 Thread Saif.A.Ellafi
I tried broadcasting a key-value rdd, but then I cannot perform any rdd-actions 
inside a map/foreach function of another rdd.

any tips? If going into scala collections I end up with huge memory bottlenecks.
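
For reference, the usual pattern (a sketch, assuming the lookup side fits in driver
and executor memory once collected as a map) is to materialize the RDD and broadcast
that, rather than broadcasting the RDD itself, since RDD actions cannot run inside
another RDD's closures:

// lookupRdd: RDD[(String, Int)], dataRdd: RDD[String] -- placeholder names
val lookup: Map[String, Int] = lookupRdd.collectAsMap().toMap
val lookupBc = sc.broadcast(lookup)

val joined = dataRdd.map { key =>
  (key, lookupBc.value.getOrElse(key, -1))  // plain map lookup inside the closure; no RDD calls
}

If it genuinely does not fit in memory, a regular join or an external store is the
usual fallback.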

Saif



Re: how to broadcast huge lookup table?

2015-10-02 Thread Ted Yu
Have you considered using external storage such as hbase for storing the
look up table ?

Cheers

On Fri, Oct 2, 2015 at 11:50 AM,  wrote:

> I tried broadcasting a key-value rdd, but then I cannot perform any
> rdd-actions inside a map/foreach function of another rdd.
>
> any tips? If going into scala collections I end up with huge memory
> bottlenecks.
>
> Saif
>
>


No plan for broadcastHint

2015-10-02 Thread Swapnil Shinde
Hello
I am trying to do inner join with broadcastHint and getting below exception
-
I tried to increase "sqlContext.conf.autoBroadcastJoinThreshold" but still
no luck.

*Code snippet-*
val dpTargetUvOutput = pweCvfMUVDist.as("a")
  .join(broadcast(sourceAssgined.as("b")), $"a.web_id" === $"b.source_id")
  .selectExpr("b.web_id AS web_id",
              "b.source_id AS source_id",
              "a.gender_age_id AS gender_age_id",
              "a.hh_size AS hh_size",
              "a.M_proj AS M_proj",
              "a.cvf_uv_proj AS cvf_uv_proj")

*Stack trace-*

15/10/02 14:38:45 INFO spark.SparkContext: Created broadcast 19 from
persist at UVModellingMain.scala:76
Exception in thread "main" java.lang.AssertionError: assertion failed: No
plan for BroadcastHint
 InMemoryRelation
[web_id#1128,level_id#1129,program_id#1130,date_day#1131,day_bin#1132,show_time#1133,genre#1134
iff#1135,source_id#1136], true, 1, StorageLevel(true, false, false,
false, 1), (TungstenProject [_1#1119 AS w
128,_2#1120 AS level_id#1129,_3#1121 AS program_id#1130,_4#1122 AS
date_day#1131,_5#1123 AS day_bin#1132,_6#1124
_time#1133,_7#1125 AS genre#1134,_8#1126 AS date_diff#1135,_9#1127 AS
source_id#1136]), None

at scala.Predef$.assert(Predef.scala:179)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:346)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:109)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:346)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:138)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:346)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 

Re: Weird Spark Dispatcher Offers?

2015-10-02 Thread Tim Chen
So if there is no jobs to run the dispatcher will decline all offers by
default.

Also we list all the jobs enqueued and it's specifications in the Spark
dispatcher UI, you should see the port in the dispatcher logs itself.

Tim

On Fri, Oct 2, 2015 at 11:46 AM, Alan Braithwaite 
wrote:

> This happened right after blowing away /var/lib/mesos zk://mesos and
> zk://spark_mesos_dispatcher and before I've submitted anything new to it so
> I _shouldn't_ have anything enqueued.  Unless there's state being stored
> somewhere besides those places that I don't know about.
>
> I'm not sure what the resource specifications are for this one because I
> didn't submit it directly.  If you have a way for me to grab a specific
> offer configuration, I'd be delighted to provide it.  I just can't seem to
> figure out how to get that information after digging through the mesos docs
> :-(
>
> Also, I can't read the docker logs because:
>
> Oct 02 11:39:59 sparky docker[556]:
> time="2015-10-02T11:39:59.165474049-07:00" level=error msg="Error streaming
> logs: invalid character '\\x00' looking for beginning of value"
>
> (that's coming from the spark-dispatcher docker).
>
> Thanks!
> - Alan
>
> On Fri, Oct 2, 2015 at 11:36 AM, Tim Chen  wrote:
>
>> Do you have jobs enqueued? And if none of the jobs matches any offer it
>> will just decline it.
>>
>> What's your job resource specifications?
>>
>> Tim
>>
>> On Fri, Oct 2, 2015 at 11:34 AM, Alan Braithwaite 
>> wrote:
>>
>>> Hey All,
>>>
>>> Using spark with mesos and docker.
>>>
>>> I'm wondering if anybody's seen the behavior of spark dispatcher where
>>> it just continually requests resources and immediately declines the offer.
>>>
>>> https://gist.github.com/anonymous/41e7c91899b0122b91a7
>>>
>>> I'm trying to debug some issues with spark and I'm having trouble
>>> figuring out if this is part of the problem or if it's safe to ignore it.
>>>
>>> Any help or pointers would be appreciated.
>>>
>>> Thanks!
>>> - Alan
>>>
>>
>>
>


Re: Problem understanding spark word count execution

2015-10-02 Thread Kartik Mathur
Thanks Yong,

my script is pretty straight forward -

sc.textFile("/wc/input").flatMap(line => line.split(" ")).map(word =>
(word,1)).reduceByKey(_+_).saveAsTextFile("/wc/out2")  // both paths are HDFS

so if for every shuffle write , it always writes to disk , what is the
meaning of these properties -

spark.shuffle.memoryFraction
spark.shuffle.spill

Thanks,
Kartik




On Fri, Oct 2, 2015 at 6:22 AM, java8964  wrote:

> No problem.
>
> From the mapper side, Spark is very similar as the MapReduce; but on the
> reducer fetching side, MR uses sort merge vs Spark uses HashMap.
>
> So keep this in mind that you can get data automatically sorted on the
> reducer side on MR, but not in Spark.
>
> Spark's performance comes:
>
>- Cache ability and smart arranging the tasks into stages.
>- Intermediate data between stages never stored in HDFS, but in local
>disk. In MR jobs, from one MR job to another one, the intermediate data
>stored in HDFS.
>- Spark uses threads to run tasks, instead of heavy process as MR.
>
>
> Without caching, in my experience, Spark can get about 2x to 5x better
> than MR job, depending on the jog logic. If the data volume is small, Spark
> will be even better, as the processor is way more expensive than the thread
> in this case.
>
> I didn't see your Spark script, so my guess is that you are using
> "rdd.collect()", which will transfer the final result to driver and dump it
> in the console.
>
> Yong
>
> --
> Date: Fri, 2 Oct 2015 00:50:24 -0700
> Subject: Re: Problem understanding spark word count execution
> From: kar...@bluedata.com
> To: java8...@hotmail.com
> CC: nicolae.maras...@adswizz.com; user@spark.apache.org
>
>
> Thanks Yong ,
>
> That was a good explanation I was looking for , however I have one doubt ,
> you write - *"**Image that you have 2 mappers to read the data, then each
> mapper will generate the (word, count) tuple output in segments. Spark
> always output that in local file. (In fact, one file with different
> segments to represent different partitions) "  *if this is true then
> spark is very similar to Hadoop MapReduce (Disk IO bw phases) , with so
> many IOs after each stage how does spark achieves the performance that it
> does as compared to map reduce . Another doubt is  *"*The 2000 bytes sent
> to driver is the final output aggregated on the reducers end, and merged
> back to the driver."* , *which part of our word count code takes care of
> this part ? And yes there are only 273 distinct words in the text so that's
> not a surprise.
>
> Thanks again,
>
> Hope to get a reply.
>
> --Kartik
>
> On Thu, Oct 1, 2015 at 5:49 PM, java8964  wrote:
>
> I am not sure about originally explain of shuffle write.
>
> In the word count example, the shuffle is needed, as Spark has to group by
> the word (ReduceBy is more accurate here). Image that you have 2 mappers to
> read the data, then each mapper will generate the (word, count) tuple
> output in segments. Spark always output that in local file. (In fact, one
> file with different segments to represent different partitions).
>
> As you can image, the output of these segments will be small, as it only
> contains (word, count of word) tuples. After each mapper generates this
> segmented file for different partitions, then the reduce will fetch the
> partitions belonging to itself.
>
> In your job summery, if your source is text file, so your data corresponds
> to 2 HDFS block, or 2x256M. There are 2 tasks concurrent read these 2
> partitions, about 2.5M lines of data of each partition being processed.
>
> The output of each partition is shuffle-writing 2.7K data, which is the
> size of the segment file generated, corresponding to all the unique words
> and their count of this partition. So the size is reasonable, at least for
> me.
>
> The interested number is 273 as shuffle write records. I am not 100% sure
> its meaning. Does it mean that this partition have 273 unique words from
> these 2.5M lines of data? That is kind of low, but I really don't have
> other explaining of its meaning.
>
> If you finally output shows hundreds of unique words, then it is.
>
> The 2000 bytes sent to driver is the final output aggregated on the
> reducers end, and merged back to the driver.
>
> Yong
>
>
> --
> Date: Thu, 1 Oct 2015 13:33:59 -0700
> Subject: Re: Problem understanding spark word count execution
> From: kar...@bluedata.com
> To: nicolae.maras...@adswizz.com
> CC: user@spark.apache.org
>
>
> Hi Nicolae,
> Thanks for the reply. To further clarify things -
>
> sc.textFile is reading from HDFS, now shouldn't the file be read in a way
> such that EACH executer works on only the local copy of file part available
> , in this case its a ~ 4.64 GB file and block size is 256MB, so approx 19
> partitions will be created and each task will run on  1 partition (which is
> what I am seeing in the stages logs) , also i 

Reading JSON in Pyspark throws scala.MatchError

2015-10-02 Thread balajikvijayan
Running Windows 8.1, Python 2.7.x, Scala 2.10.5, Spark 1.4.1.

I'm trying to read in a large quantity of json data in a couple of files and
I receive a scala.MatchError when I do so. Json, Python and stack trace all
shown below.

Json:

{
"dataunit": {
"page_view": {
"nonce": 438058072,
"person": {
"user_id": 5846
},
"page": {
"url": "http://mysite.com/blog;
}
}
},
"pedigree": {
"true_as_of_secs": 1438627992
}
}

Python:

import pyspark
sc = pyspark.SparkContext()
sqlContext = pyspark.SQLContext(sc)
pageviews = sqlContext.read.json("[Path to folder containing file with above
json]")
pageviews.collect()

Stack Trace:
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
in stage 32.0 failed 1 times, most recent failure: Lost task 1.0 in stage
32.0 (TID 133, localhost): scala.MatchError:
(VALUE_STRING,ArrayType(StructType(),true)) (of class scala.Tuple2)
at
org.apache.spark.sql.json.JacksonParser$.convertField(JacksonParser.scala:49)
at
org.apache.spark.sql.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:201)
at
org.apache.spark.sql.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:193)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:116)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:111)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:111)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:111)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:111)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

It seems like this issue has been resolved in Scala per SPARK-3390; any thoughts
on the root cause of this in PySpark?



--

RE: Problem understanding spark word count execution

2015-10-02 Thread java8964
These parameters in fact control the behavior on the reduce side, as in your word
count example.
The partitions will be fetched by the reducer they are assigned to. The reducer
fetches the corresponding partitions from the different mappers' output, and it
processes the data based on your logic while fetching them. This memory area is a
sort-buffer area, and depending on "spark.shuffle.spill" (memory only vs. memory +
disk), Spark will use different implementations (AppendOnlyMap and
ExternalAppendOnlyMap) to handle it.
spark.shuffle.memoryFraction controls what fraction of the Java heap is used as
that sort-buffer area.
You can find more information in this Jira:
https://issues.apache.org/jira/browse/SPARK-2045
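
For reference, a minimal sketch of where these knobs live, assuming Spark 1.x; the
values are only illustrative and need tuning per workload:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("wordcount")
  .set("spark.shuffle.spill", "true")           // allow spilling to disk when the in-memory buffer fills
  .set("spark.shuffle.memoryFraction", "0.2")   // fraction of the heap usable for shuffle aggregation buffers
val sc = new SparkContext(conf)
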
Yong

Date: Fri, 2 Oct 2015 11:55:41 -0700
Subject: Re: Problem understanding spark word count execution
From: kar...@bluedata.com
To: java8...@hotmail.com
CC: nicolae.maras...@adswizz.com; user@spark.apache.org

Thanks Yong,
my script is pretty straight forward - 
sc.textFile("/wc/input").flatMap(line => line.split(" ")).map(word => 
(word,1)).reduceByKey(_+_).saveAsTextFile("/wc/out2") //both paths are HDFS.
so if for every shuffle write , it always writes to disk , what is the meaning 
of these properties -
spark.shuffle.memoryFraction
spark.shuffle.spill

Thanks,Kartik











On Fri, Oct 2, 2015 at 6:22 AM, java8964  wrote:



No problem.
From the mapper side, Spark is very similar as the MapReduce; but on the
reducer fetching side, MR uses sort merge vs Spark uses HashMap.
So keep this in mind that you can get data automatically sorted on the reducer
side on MR, but not in Spark.
Spark's performance comes:
- Cache ability and smart arranging the tasks into stages.
- Intermediate data between stages never stored in HDFS, but in local disk. In MR
  jobs, from one MR job to another one, the intermediate data stored in HDFS.
- Spark uses threads to run tasks, instead of heavy process as MR.
Without caching, in my experience, Spark can get about 2x to 5x better than MR 
job, depending on the jog logic. If the data volume is small, Spark will be 
even better, as the processor is way more expensive than the thread in this 
case.
I didn't see your Spark script, so my guess is that you are using 
"rdd.collect()", which will transfer the final result to driver and dump it in 
the console.
Yong
Date: Fri, 2 Oct 2015 00:50:24 -0700
Subject: Re: Problem understanding spark word count execution
From: kar...@bluedata.com
To: java8...@hotmail.com
CC: nicolae.maras...@adswizz.com; user@spark.apache.org

Thanks Yong , 
That was a good explanation I was looking for , however I have one doubt , you 
write - "Image that you have 2 mappers to read the data, then each mapper will 
generate the (word, count) tuple output in segments. Spark always output that 
in local file. (In fact, one file with different segments to represent 
different partitions) "  if this is true then spark is very similar to Hadoop 
MapReduce (Disk IO bw phases) , with so many IOs after each stage how does 
spark achieves the performance that it does as compared to map reduce . Another 
doubt is  "The 2000 bytes sent to driver is the final output aggregated on the 
reducers end, and merged back to the driver." , which part of our word count 
code takes care of this part ? And yes there are only 273 distinct words in the 
text so that's not a surprise.
Thanks again,
Hope to get a reply.
--Kartik
On Thu, Oct 1, 2015 at 5:49 PM, java8964  wrote:



I am not sure about originally explain of shuffle write. 
In the word count example, the shuffle is needed, as Spark has to group by the 
word (ReduceBy is more accurate here). Image that you have 2 mappers to read 
the data, then each mapper will generate the (word, count) tuple output in 
segments. Spark always output that in local file. (In fact, one file with 
different segments to represent different partitions).
As you can image, the output of these segments will be small, as it only 
contains (word, count of word) tuples. After each mapper generates this 
segmented file for different partitions, then the reduce will fetch the 
partitions belonging to itself.
In your job summery, if your source is text file, so your data corresponds to 2 
HDFS block, or 2x256M. There are 2 tasks concurrent read these 2 partitions, 
about 2.5M lines of data of each partition being processed.
The output of each partition is shuffle-writing 2.7K data, which is the size of 
the segment file generated, corresponding to all the unique words and their 
count of this partition. So the size is reasonable, at least for me.
The interested number is 273 as shuffle write records. I am not 100% sure its 
meaning. Does it mean that this partition have 273 unique words from these 2.5M 
lines of data? That is kind of low, but I really don't have other explaining of 
its meaning.
If you finally output shows hundreds of unique words, then it is.
The 2000 bytes sent to driver 

Re: How to use registered Hive UDF in Spark DataFrame?

2015-10-02 Thread Michael Armbrust
callUDF("MyUDF", col("col1").as("name")

or

callUDF("MyUDF", col("col1").alias("name")

On Fri, Oct 2, 2015 at 3:29 PM, Umesh Kacha  wrote:

> Hi Michael,
>
> Thanks much. How do we give alias name for resultant columns? For e.g.
> when using
>
> hiveContext.sql("select MyUDF("test") as mytest from myTable");
>
> how do we do that in DataFrame callUDF
>
> callUDF("MyUDF", col("col1"))???
>
> On Fri, Oct 2, 2015 at 8:23 PM, Michael Armbrust 
> wrote:
>
>> import org.apache.spark.sql.functions.*
>>
>> callUDF("MyUDF", col("col1"), col("col2"))
>>
>> On Fri, Oct 2, 2015 at 6:25 AM, unk1102  wrote:
>>
>>> Hi I have registered my hive UDF using the following code:
>>>
>>> hiveContext.udf().register("MyUDF", new UDF1<String, String>() {
>>> public String call(String o) throws Exception {
>>> //bla bla
>>> }
>>> }, DataTypes.StringType);
>>>
>>> Now I want to use above MyUDF in DataFrame. How do we use it? I know how
>>> to
>>> use it in a sql and it works fine
>>>
>>> hiveContext.sql(select MyUDF("test") from myTable);
>>>
>>> My hiveContext.sql() query involves group by on multiple columns so for
>>> scaling purpose I am trying to convert this query into DataFrame APIs
>>>
>>>
>>> dataframe.select("col1","col2","coln").groupby(""col1","col2","coln").count();
>>>
>>> Can we do the follwing dataframe.select(MyUDF("col1"))??? Please guide.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-registered-Hive-UDF-in-Spark-DataFrame-tp24907.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: how to get Application ID from Submission ID or Driver ID programmatically

2015-10-02 Thread firemonk9
Have you found how to get the applicationId from submissionId ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-get-Application-ID-from-Submission-ID-or-Driver-ID-programmatically-tp24341p24912.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Checkpointing is super slow

2015-10-02 Thread Sourabh Chandak
Tried using local checkpointing as well, and even that becomes slow after
sometime. Any idea what can be wrong?

Thanks,
Sourabh

On Fri, Oct 2, 2015 at 9:35 AM, Sourabh Chandak 
wrote:

> I can see the entries processed in the table very fast but after that it
> takes a long time for the checkpoint update.
>
> Haven't tried other methods of checkpointing yet, we are using DSE on
> Azure.
>
> Thanks,
> Sourabh
>
> On Fri, Oct 2, 2015 at 6:52 AM, Cody Koeninger  wrote:
>
>> Why are you sure it's checkpointing speed?
>>
>> Have you compared it against checkpointing to hdfs, s3, or local disk?
>>
>> On Fri, Oct 2, 2015 at 1:17 AM, Sourabh Chandak 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a receiverless kafka streaming job which was started yesterday
>>> evening and was running fine till 4 PM today. Suddenly post that writing of
>>> checkpoint has slowed down and it is now not able to catch up with the
>>> incoming data. We are using the DSE stack with Spark 1.2 and Cassandra for
>>> checkpointing. Spark streaming is done using a backported code.
>>>
>>> Running nodetool shows that the Read latency of the cfs keyspace is ~8.5
>>> ms.
>>>
>>> Can someone please help me resolve this?
>>>
>>> Thanks,
>>> Sourabh
>>>
>>>
>>
>


Re: Reading JSON in Pyspark throws scala.MatchError

2015-10-02 Thread Ted Yu
I got the following when parsing your input with master branch (Python
version 2.6.6):

http://pastebin.com/1w8WM3tz

FYI

On Fri, Oct 2, 2015 at 1:42 PM, balajikvijayan 
wrote:

> Running Windows 8.1, Python 2.7.x, Scala 2.10.5, Spark 1.4.1.
>
> I'm trying to read in a large quantity of json data in a couple of files
> and
> I receive a scala.MatchError when I do so. Json, Python and stack trace all
> shown below.
>
> Json:
>
> {
> "dataunit": {
> "page_view": {
> "nonce": 438058072,
> "person": {
> "user_id": 5846
> },
> "page": {
> "url": "http://mysite.com/blog;
> }
> }
> },
> "pedigree": {
> "true_as_of_secs": 1438627992
> }
> }
>
> Python:
>
> import pyspark
> sc = pyspark.SparkContext()
> sqlContext = pyspark.SQLContext(sc)
> pageviews = sqlContext.read.json("[Path to folder containing file with
> above
> json]")
> pageviews.collect()
>
> Stack Trace:
> Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
> in stage 32.0 failed 1 times, most recent failure: Lost task 1.0 in stage
> 32.0 (TID 133, localhost): scala.MatchError:
> (VALUE_STRING,ArrayType(StructType(),true)) (of class scala.Tuple2)
> at
>
> org.apache.spark.sql.json.JacksonParser$.convertField(JacksonParser.scala:49)
> at
>
> org.apache.spark.sql.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:201)
> at
>
> org.apache.spark.sql.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:193)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
>
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:116)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
>
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:111)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to
> (TraversableOnce.scala:273)
> at
>
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:111)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at
>
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:111)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at
>
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:111)
> at
>
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
> at
>
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
> at
>
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> at
>
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236)
> at
>
> 

How does FAIR job scheduler work in Standalone cluster mode?

2015-10-02 Thread Jacek Laskowski
Hi,

The docs in Resource Scheduling [1] says:

> The standalone cluster mode currently only supports a simple FIFO scheduler 
> across applications.

There's however `spark.scheduler.mode` that can be one of `FAIR`,
`FIFO`, `NONE` values.

Is FAIR available for Spark Standalone cluster mode? Is there a page
where it's described in more details? I can't seem to find much about
FAIR and Standalone in Job Scheduling [2].

[1] 
http://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/spark-standalone.html
[2] 
http://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/job-scheduling.html

Pozdrawiam,
Jacek

--
Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
Follow me at https://twitter.com/jaceklaskowski
Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How does FAIR job scheduler work in Standalone cluster mode?

2015-10-02 Thread Marcelo Vanzin
You're mixing app scheduling in the cluster manager (your [1] link)
with job scheduling within an app (your [2] link). They're independent
things.

On Fri, Oct 2, 2015 at 2:22 PM, Jacek Laskowski  wrote:
> Hi,
>
> The docs in Resource Scheduling [1] says:
>
>> The standalone cluster mode currently only supports a simple FIFO scheduler 
>> across applications.
>
> There's however `spark.scheduler.mode` that can be one of `FAIR`,
> `FIFO`, `NONE` values.
>
> Is FAIR available for Spark Standalone cluster mode? Is there a page
> where it's described in more details? I can't seem to find much about
> FAIR and Standalone in Job Scheduling [2].
>
> [1] 
> http://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/spark-standalone.html
> [2] 
> http://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/job-scheduling.html
>
> Pozdrawiam,
> Jacek
>
> --
> Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
> Follow me at https://twitter.com/jaceklaskowski
> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How does FAIR job scheduler work in Standalone cluster mode?

2015-10-02 Thread Marcelo Vanzin
On Fri, Oct 2, 2015 at 5:29 PM, Jacek Laskowski  wrote:
>> The standalone cluster mode currently only supports a simple FIFO scheduler 
>> across applications.
>
> is correct or not? :(

I think so. But, because they're different things, that does not mean
you cannot use a fair scheduler to schedule jobs *inside* your
application.
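
For completeness, a small sketch of enabling fair scheduling within an application
(the pool name and file path are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-within-app")
  .set("spark.scheduler.mode", "FAIR")
  // optionally point at a pool definition file:
  // .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
val sc = new SparkContext(conf)

// Jobs submitted from this thread are assigned to the named pool.
sc.setLocalProperty("spark.scheduler.pool", "reports")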

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How does FAIR job scheduler work in Standalone cluster mode?

2015-10-02 Thread Jacek Laskowski
Hi,

I may indeed mistakenly be mixing different aspect. Thanks for the
answer! Does this answer my initial question, though, as I'm still
unsure whether the sentence:

> The standalone cluster mode currently only supports a simple FIFO scheduler 
> across applications.

is correct or not? :(

Pozdrawiam,
Jacek

--
Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
Follow me at https://twitter.com/jaceklaskowski
Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski


On Fri, Oct 2, 2015 at 8:20 PM, Marcelo Vanzin  wrote:
> You're mixing app scheduling in the cluster manager (your [1] link)
> with job scheduling within an app (your [2] link). They're independent
> things.
>
> On Fri, Oct 2, 2015 at 2:22 PM, Jacek Laskowski  wrote:
>> Hi,
>>
>> The docs in Resource Scheduling [1] says:
>>
>>> The standalone cluster mode currently only supports a simple FIFO scheduler 
>>> across applications.
>>
>> There's however `spark.scheduler.mode` that can be one of `FAIR`,
>> `FIFO`, `NONE` values.
>>
>> Is FAIR available for Spark Standalone cluster mode? Is there a page
>> where it's described in more details? I can't seem to find much about
>> FAIR and Standalone in Job Scheduling [2].
>>
>> [1] 
>> http://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/spark-standalone.html
>> [2] 
>> http://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/job-scheduling.html
>>
>> Pozdrawiam,
>> Jacek
>>
>> --
>> Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
>> Follow me at https://twitter.com/jaceklaskowski
>> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>
>
> --
> Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem understanding spark word count execution

2015-10-02 Thread Kartik Mathur
Thanks Yong ,

That was a good explanation I was looking for , however I have one doubt ,
you write - *"**Image that you have 2 mappers to read the data, then each
mapper will generate the (word, count) tuple output in segments. Spark
always output that in local file. (In fact, one file with different
segments to represent different partitions) "  *if this is true then spark
is very similar to Hadoop MapReduce (Disk IO bw phases) , with so many IOs
after each stage how does spark achieves the performance that it does as
compared to map reduce . Another doubt is  *"*The 2000 bytes sent to driver
is the final output aggregated on the reducers end, and merged back to the
driver."* , *which part of our word count code takes care of this part ?
And yes there are only 273 distinct words in the text so that's not a
surprise.

Thanks again,

Hope to get a reply.

--Kartik

On Thu, Oct 1, 2015 at 5:49 PM, java8964  wrote:

> I am not sure about the original explanation of shuffle write.
>
> In the word count example, the shuffle is needed, as Spark has to group by
> the word (reduceByKey is more accurate here). Imagine that you have 2 mappers to
> read the data, then each mapper will generate the (word, count) tuple
> output in segments. Spark always outputs that to a local file. (In fact, one
> file with different segments to represent different partitions.)
>
> As you can imagine, the output of these segments will be small, as it only
> contains (word, count of word) tuples. After each mapper generates this
> segmented file for the different partitions, each reducer then fetches the
> partitions belonging to it.
>
> In your job summary, if your source is a text file, your data corresponds
> to 2 HDFS blocks, or 2x256 MB. There are 2 tasks concurrently reading these 2
> partitions, with about 2.5M lines of data in each partition being processed.
>
> The output of each partition is a shuffle write of 2.7 KB of data, which is the
> size of the segment file generated, corresponding to all the unique words
> and their counts for this partition. So the size is reasonable, at least to
> me.
>
> The interesting number is the 273 shuffle write records. I am not 100% sure
> of its meaning. Does it mean that this partition has 273 unique words from
> these 2.5M lines of data? That is kind of low, but I really don't have
> another explanation for it.
>
> If your final output shows hundreds of unique words, then it does.
>
> The 2000 bytes sent to the driver are the final output aggregated on the
> reducer end and merged back to the driver.
>
> Yong
>
>
> --
> Date: Thu, 1 Oct 2015 13:33:59 -0700
> Subject: Re: Problem understanding spark word count execution
> From: kar...@bluedata.com
> To: nicolae.maras...@adswizz.com
> CC: user@spark.apache.org
>
>
> Hi Nicolae,
> Thanks for the reply. To further clarify things -
>
> sc.textFile is reading from HDFS. Now, shouldn't the file be read in such a
> way that EACH executor works only on the local copy of the file part available
> to it? In this case it's a ~4.64 GB file and the block size is 256 MB, so approx. 19
> partitions will be created and each task will run on 1 partition (which is
> what I am seeing in the stage logs). I also assume it will read the file in a
> way that gives each executor roughly the same amount of data, so there
> shouldn't be any shuffling during the read, at least.
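
(A quick way to sanity-check that partition math, with a placeholder path:)

import org.apache.spark.{SparkConf, SparkContext}

// A ~4.64 GB file with 256 MB HDFS blocks should show up as roughly 19
// input partitions, i.e. one read task per block.
val sc = new SparkContext(new SparkConf().setAppName("partition-check"))
val lines = sc.textFile("hdfs:///data/big-input.txt")
println(lines.partitions.length)   // expect ~19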
>
> During stage 0 (sc.textFile -> flatMap -> Map), this is the output I am
> seeing for every task:
>
> Index | ID | Attempt | Status  | Locality Level | Executor ID / Host | Launch Time         | Duration | GC Time | Input Size / Records        | Shuffle Write Size / Records
> 0     | 44 | 0       | SUCCESS | NODE_LOCAL     | 1 / 10.35.244.10   | 2015/09/29 13:57:24 | 14 s     | 0.2 s   | 256.0 MB (hadoop) / 2595161 | 2.7 KB / 273
> 1     | 45 | 0       | SUCCESS | NODE_LOCAL     | 2 / 10.35.244.11   | 2015/09/29 13:57:24 | 13 s     | 0.2 s   | 256.0 MB (hadoop) / 2595176 | 2.7 KB / 273
> I have the following questions:
>
> 1) What exactly is the 2.7 KB of shuffle write?
> 2) Is this 2.7 KB of shuffle write local to that executor?
> 3) In the executor logs I am seeing 2000 bytes of results sent to the driver.
> If instead this number were much, much greater than 2000 bytes, such that it
> did not fit in the executor's memory, would the shuffle write increase?
> 4) For word count, 256 MB of data is a substantial amount of text. How come
> the result for this stage is only 2000 bytes? It should send every word with
> its respective count; for a 256 MB input this result should be much bigger.
>
> I hope I am clear this time.
>
> Hope to get a reply,
>
> Thanks
> Kartik
>
>
>
> On Thu, Oct 1, 2015 at 12:38 PM, Nicolae Marasoiu <
> nicolae.maras...@adswizz.com> wrote:
>
> Hi,
>
> So you say "sc.textFile -> flatMap -> Map".
>
> My understanding is like this:
> First, the number of partitions is determined, say p of them. You can give a
> hint on this.
> Then the nodes which will load the p partitions are chosen, that is n nodes
> (where n <= p).
>
> Relatively at the same time or not, the n nodes start opening different
> sections of the file - the physical

Re: Limiting number of cores per job in multi-threaded driver.

2015-10-02 Thread Philip Weaver
You can't really say 8 cores is not much horsepower when you have no idea
what my use case is. That's silly.

On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase  wrote:

> Forgot to mention that you could also restrict the parallelism to 4,
> essentially using only 4 cores at any given time; however, if your job is
> complex, a stage might be broken into more than one task...
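
(For concreteness, a rough sketch of that suggestion; the path and the cap of
4 are placeholders:)

import org.apache.spark.{SparkConf, SparkContext}

// Capping a job's concurrency by capping its partition count. A narrow
// coalesce to 4 partitions means the stages computed from `capped` run at
// most 4 tasks at a time; a later wide transformation can still fan out to
// more partitions, as noted above.
val sc = new SparkContext(new SparkConf().setAppName("capped-job"))
val capped = sc.textFile("hdfs:///data/input").coalesce(4)
val totalChars = capped.map(_.length.toLong).reduce(_ + _)
println(totalChars)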
>
> Sent from my iPhone
>
> On 19 Sep 2015, at 08:30, Adrian Tanase  wrote:
>
> Reading through the docs it seems that with a combination of FAIR
> scheduler and maybe pools you can get pretty far.
>
> However the smallest unit of scheduled work is the task so probably you
> need to think about the parallelism of each transformation.
>
> I'm guessing that by increasing the level of parallelism you get many
> smaller tasks that the scheduler can then run across the many jobs you
> might have - as opposed to fewer, longer tasks...
>
> Lastly, 8 cores is not that much horsepower :)
> You may consider running with beefier machines or a larger cluster, to get
> at least tens of cores.
>
> Hope this helps,
> -adrian
>
> Sent from my iPhone
>
> On 18 Sep 2015, at 18:37, Philip Weaver  wrote:
>
> Here's a specific example of what I want to do. My Spark application is
> running with total-executor-cores=8. A request comes in, it spawns a thread
> to handle that request, and starts a job. That job should use only 4 cores,
> not all 8 of the cores available to the cluster. When the first job is
> scheduled, it should take only 4 cores, not all 8 of the cores that are
> available to the driver.
>
> Is there any way to accomplish this? This is on mesos.
>
> In order to support the use cases described in
> https://spark.apache.org/docs/latest/job-scheduling.html, where a spark
> application runs for a long time and handles requests from multiple users,
> I believe what I'm asking about is a very important feature. One of the
> goals is to get lower latency for each request, but if the first request
> takes all resources and we can't guarantee any free resources for the
> second request, then that defeats the purpose. Does that make sense?
>
> Thanks in advance for any advice you can provide!
>
> - Philip
>
> On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver 
> wrote:
>
>> I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR
>> scheduler, so I can define a long-running application capable of executing
>> multiple simultaneous spark jobs.
>>
>> The kind of jobs that I'm running do not benefit from more than 4 cores,
>> but I want my application to be able to take several times that in order to
>> run multiple jobs at the same time.
>>
>> I suppose my question is more basic: How can I limit the number of cores
>> used to load an RDD or DataFrame? I can immediately repartition or coalesce
>> my RDD or DataFrame to 4 partitions after I load it, but that doesn't stop
>> Spark from using more cores to load it.
>>
>> Does it make sense what I am trying to accomplish, and is there any way
>> to do it?
>>
>> - Philip
>>
>>
>


Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-10-02 Thread satish chandra j
Hi,
I am getting the below error while implementing the custom class code you
gave above:

error: type mismatch; found: String, required: Serializable

Please let me know if I am missing anything here.

Regards,
Satish Chandra
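
For anyone hitting the same thing: the mismatch is likely because the
unqualified Serializable return type means scala.Serializable, which String
does not extend. Product.productElement is actually declared to return Any,
and it is 0-based. A corrected sketch (field names and types are
placeholders; extend the same pattern past 22 fields):

class MyRecord(val val1: String, val val2: String, val val3: Int)
  extends Product with Serializable {

  def canEqual(that: Any): Boolean = that.isInstanceOf[MyRecord]

  def productArity: Int = 3 // replace with the real number of fields, e.g. 26

  def productElement(n: Int): Any = n match {
    case 0 => val1
    case 1 => val2
    case 2 => val3
    // ... add one case per field, up to productArity - 1
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }
}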

On Wed, Sep 23, 2015 at 12:34 PM, Petr Novak  wrote:

> You can implement your own case-class-like class supporting more than 22
> fields. It is something like:
>
> class MyRecord(val val1: String, val val2: String, ... more than 22, in this
> case e.g. 26)
>   extends Product with Serializable {
>
>   def canEqual(that: Any): Boolean = that.isInstanceOf[MyRecord]
>
>   def productArity: Int = 26 // example value, it is amount of arguments
>
>   def productElement(n: Int): Serializable = n match {
> case  1 => val1
> case  2 => val2
> //... cases up to 26
>   }
> }
>
> You can google it for more details.
>
> Petr
>


Re: Checkpointing is super slow

2015-10-02 Thread Tathagata Das
Could you get the log4j INFO/DEBUG level logs that show the error, and if
possible the time taken to write the checkpoints?

On Fri, Oct 2, 2015 at 6:28 PM, Sourabh Chandak 
wrote:

> Offset checkpoints (partition, offset) when using kafka direct streaming
> approach
>
>
> On Friday, October 2, 2015, Tathagata Das  wrote:
>
>> Which checkpointing are you talking about? DStream checkpoints (which
>> saves the DAG of DStreams, that is, only metadata), or RDD checkpointing
>> (which saves the actual intermediate RDD data)
>>
>> TD
>>
>> On Fri, Oct 2, 2015 at 2:56 PM, Sourabh Chandak 
>> wrote:
>>
>>> Tried using local checkpointing as well, and even that becomes slow
>>> after sometime. Any idea what can be wrong?
>>>
>>> Thanks,
>>> Sourabh
>>>
>>> On Fri, Oct 2, 2015 at 9:35 AM, Sourabh Chandak 
>>> wrote:
>>>
 I can see the entries processed in the table very fast but after that
 it takes a long time for the checkpoint update.

 Haven't tried other methods of checkpointing yet, we are using DSE on
 Azure.

 Thanks,
 Sourabh

 On Fri, Oct 2, 2015 at 6:52 AM, Cody Koeninger 
 wrote:

> Why are you sure it's checkpointing speed?
>
> Have you compared it against checkpointing to hdfs, s3, or local disk?
>
> On Fri, Oct 2, 2015 at 1:17 AM, Sourabh Chandak  > wrote:
>
>> Hi,
>>
>> I have a receiverless kafka streaming job which was started yesterday
>> evening and was running fine till 4 PM today. Suddenly post that writing 
>> of
>> checkpoint has slowed down and it is now not able to catch up with the
>> incoming data. We are using the DSE stack with Spark 1.2 and Cassandra 
>> for
>> checkpointing. Spark streaming is done using a backported code.
>>
>> Running nodetool shows that the Read latency of the cfs keyspace is
>> ~8.5 ms.
>>
>> Can someone please help me resolve this?
>>
>> Thanks,
>> Sourabh
>>
>>
>

>>>
>>


Re: Checkpointing is super slow

2015-10-02 Thread Sourabh Chandak
Offset checkpoints (partition, offset), since we are using the Kafka direct
streaming approach.
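
For context, a rough sketch of how those (partition, offset) ranges are
typically pulled out of the direct stream (as in the stock 1.3+ direct API)
before being written to an external store; the broker list, topic name and
the persistence step are placeholders, not our actual code:

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val sc  = new SparkContext(new SparkConf().setAppName("direct-offsets"))
val ssc = new StreamingContext(sc, Seconds(10))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))

stream.foreachRDD { rdd =>
  // Each batch carries the offset ranges it was built from.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    // e.g. persist (r.topic, r.partition, r.untilOffset) to Cassandra here
    println(s"${r.topic} ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
  }
}

ssc.start()
ssc.awaitTermination()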

On Friday, October 2, 2015, Tathagata Das  wrote:

> Which checkpointing are you talking about? DStream checkpoints (which
> saves the DAG of DStreams, that is, only metadata), or RDD checkpointing
> (which saves the actual intermediate RDD data)
>
> TD
>
> On Fri, Oct 2, 2015 at 2:56 PM, Sourabh Chandak  > wrote:
>
>> Tried using local checkpointing as well, and even that becomes slow after
>> sometime. Any idea what can be wrong?
>>
>> Thanks,
>> Sourabh
>>
>> On Fri, Oct 2, 2015 at 9:35 AM, Sourabh Chandak > > wrote:
>>
>>> I can see the entries processed in the table very fast but after that it
>>> takes a long time for the checkpoint update.
>>>
>>> Haven't tried other methods of checkpointing yet, we are using DSE on
>>> Azure.
>>>
>>> Thanks,
>>> Sourabh
>>>
>>> On Fri, Oct 2, 2015 at 6:52 AM, Cody Koeninger >> > wrote:
>>>
 Why are you sure it's checkpointing speed?

 Have you compared it against checkpointing to hdfs, s3, or local disk?

 On Fri, Oct 2, 2015 at 1:17 AM, Sourabh Chandak > wrote:

> Hi,
>
> I have a receiverless kafka streaming job which was started yesterday
> evening and was running fine till 4 PM today. Suddenly post that writing 
> of
> checkpoint has slowed down and it is now not able to catch up with the
> incoming data. We are using the DSE stack with Spark 1.2 and Cassandra for
> checkpointing. Spark streaming is done using a backported code.
>
> Running nodetool shows that the Read latency of the cfs keyspace is
> ~8.5 ms.
>
> Can someone please help me resolve this?
>
> Thanks,
> Sourabh
>
>

>>>
>>
>


Re: Checkpointing is super slow

2015-10-02 Thread Tathagata Das
Which checkpointing are you talking about? DStream checkpointing (which saves
the DAG of DStreams, that is, only metadata), or RDD checkpointing (which
saves the actual intermediate RDD data)?
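
For anyone following along, a tiny sketch contrasting the two (directories
and the dummy RDD are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc  = new SparkContext(new SparkConf().setAppName("checkpoint-kinds"))
val ssc = new StreamingContext(sc, Seconds(10))

// DStream (metadata) checkpointing: the DStream DAG, configuration, offsets.
ssc.checkpoint("hdfs:///checkpoints/streaming-metadata")

// RDD (data) checkpointing: the actual contents of an RDD.
sc.setCheckpointDir("hdfs:///checkpoints/rdd-data")
val rdd = sc.parallelize(1 to 1000)
rdd.checkpoint()   // materialized on the next action
rdd.count()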

TD

On Fri, Oct 2, 2015 at 2:56 PM, Sourabh Chandak 
wrote:

> Tried using local checkpointing as well, and even that becomes slow after
> sometime. Any idea what can be wrong?
>
> Thanks,
> Sourabh
>
> On Fri, Oct 2, 2015 at 9:35 AM, Sourabh Chandak 
> wrote:
>
>> I can see the entries processed in the table very fast but after that it
>> takes a long time for the checkpoint update.
>>
>> Haven't tried other methods of checkpointing yet, we are using DSE on
>> Azure.
>>
>> Thanks,
>> Sourabh
>>
>> On Fri, Oct 2, 2015 at 6:52 AM, Cody Koeninger 
>> wrote:
>>
>>> Why are you sure it's checkpointing speed?
>>>
>>> Have you compared it against checkpointing to hdfs, s3, or local disk?
>>>
>>> On Fri, Oct 2, 2015 at 1:17 AM, Sourabh Chandak 
>>> wrote:
>>>
 Hi,

 I have a receiverless kafka streaming job which was started yesterday
 evening and was running fine till 4 PM today. Suddenly post that writing of
 checkpoint has slowed down and it is now not able to catch up with the
 incoming data. We are using the DSE stack with Spark 1.2 and Cassandra for
 checkpointing. Spark streaming is done using a backported code.

 Running nodetool shows that the Read latency of the cfs keyspace is
 ~8.5 ms.

 Can someone please help me resolve this?

 Thanks,
 Sourabh


>>>
>>
>