Re: Question about Parallel Stages in Spark

2017-06-26 Thread Pralabh Kumar
I think my words were also misunderstood. My point is that the two jobs will
not be submitted together, since they are part of one thread.

val spark =  SparkSession.builder()
  .appName("practice")
  .config("spark.scheduler.mode","FAIR")
  .enableHiveSupport().getOrCreate()
val sc = spark.sparkContext
sc.parallelize(List(1.to(1000))).map(s=>Thread.sleep(1)).collect()
sc.parallelize(List(1.to(1000))).map(s=>Thread.sleep(1)).collect()
Thread.sleep(1000)


I ran this, and the submit times are different for the two jobs.

Please let me know if I am wrong.
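
For illustration, here is a minimal sketch (not part of the original test; it
assumes the same SparkContext sc and FAIR mode as above) that submits the two
jobs from separate threads via Scala Futures, so the scheduler can actually
interleave them:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Each Future body runs on its own thread, so both jobs reach the scheduler at
// roughly the same time instead of one after the other.
val job1 = Future { sc.parallelize(1 to 1000).map(_ => Thread.sleep(1)).collect() }
val job2 = Future { sc.parallelize(1 to 1000).map(_ => Thread.sleep(1)).collect() }

Await.result(job1, Duration.Inf)
Await.result(job2, Duration.Inf)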

On Tue, Jun 27, 2017 at 9:17 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

> My words caused a misunderstanding.
> Step 1: A is submitted to Spark.
> Step 2: B is submitted to Spark.
>
> Spark gets two independent jobs. FAIR is used to schedule A and B.
>
> Jeffrey's code did not cause two submits.
>
>
>
> ---Original---
> *From:* "Pralabh Kumar"
> *Date:* 2017/6/27 12:09:27
> *To:* "萝卜丝炒饭"<1427357...@qq.com>;
> *Cc:* "user";"satishl";"Bryan
> Jeffrey";
> *Subject:* Re: Question about Parallel Stages in Spark
>
> Hi
>
> I don't think Spark will receive two submits. It will execute one submit and
> then move on to the next one. If the application is multithreaded and two
> threads call submit at the same time, then the jobs will run in parallel,
> provided the scheduler is FAIR and task slots are available.
>
> But within one thread, one submit will complete and then the next one will
> start. If there are independent stages in one job, those will run in
> parallel.
>
> I agree with Bryan Jeffrey .
>
>
> Regards
> Pralabh Kumar
>
> On Tue, Jun 27, 2017 at 9:03 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
>
>> I think the spark cluster receives two submits, A and B.
>> The FAIR  is used to schedule A and B.
>> I am not sure about this.
>>
>> ---Original---
>> *From:* "Bryan Jeffrey"
>> *Date:* 2017/6/27 08:55:42
>> *To:* "satishl";
>> *Cc:* "user";
>> *Subject:* Re: Question about Parallel Stages in Spark
>>
>> Hello.
>>
>> The driver is running the individual operations in series, but each
>> operation is parallelized internally.  If you want them run in parallel you
>> need to provide the driver a mechanism to thread the job scheduling out:
>>
>> val rdd1 = sc.parallelize(1 to 10)
>> val rdd2 = sc.parallelize(1 to 20)
>>
>> var thingsToDo: ParArray[(RDD[Int], Int)] = Array(rdd1, 
>> rdd2).zipWithIndex.par
>>
>> thingsToDo.foreach { case(rdd, index) =>
>>   for(i <- (1 to 1))
>> logger.info(s"Index ${index} - ${rdd.sum()}")
>> }
>>
>>
>> This will run both operations in parallel.
>>
>>
>> On Mon, Jun 26, 2017 at 8:10 PM, satishl  wrote:
>>
>>> For the code below, since rdd1 and rdd2 don't depend on each other, I was
>>> expecting the "first" and "second" printlns to be interwoven. However, the
>>> Spark job runs all "first" statements and then all "second" statements, in
>>> serial fashion. I have set spark.scheduler.mode = FAIR. Obviously my
>>> understanding of parallel stages is wrong. What am I missing?
>>>
>>> val rdd1 = sc.parallelize(1 to 100)
>>> val rdd2 = sc.parallelize(1 to 100)
>>>
>>> for (i <- (1 to 100))
>>>   println("first: " + rdd1.sum())
>>> for (i <- (1 to 100))
>>>   println("second" + rdd2.sum())
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.
>>> 1001560.n3.nabble.com/Question-about-Parallel-Stages-in-Spar
>>> k-tp28793.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: Question about Parallel Stages in Spark

2017-06-26 Thread 萝卜丝炒饭
My words caused a misunderstanding.
Step 1: A is submitted to Spark.
Step 2: B is submitted to Spark.


Spark gets two independent jobs. FAIR is used to schedule A and B.


Jeffrey's code did not cause two submits.






 
---Original---
From: "Pralabh Kumar"
Date: 2017/6/27 12:09:27
To: "??"<1427357...@qq.com>;
Cc: "user";"satishl";"Bryan 
Jeffrey";
Subject: Re: Question about Parallel Stages in Spark


Hi 

I don't think Spark will receive two submits. It will execute one submit and
then move on to the next one. If the application is multithreaded and two
threads call submit at the same time, then the jobs will run in parallel,
provided the scheduler is FAIR and task slots are available.


But within one thread, one submit will complete and then the next one will
start. If there are independent stages in one job, those will run in parallel.


I agree with Bryan Jeffrey .




Regards
Pralabh Kumar


On Tue, Jun 27, 2017 at 9:03 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
I think the spark cluster receives two submits, A and B.
The FAIR  is used to schedule A and B.
I am not sure about this.


 
---Original---
From: "Bryan Jeffrey"
Date: 2017/6/27 08:55:42
To: "satishl";
Cc: "user";
Subject: Re: Question about Parallel Stages in Spark


Hello.

The driver is running the individual operations in series, but each operation 
is parallelized internally.  If you want them run in parallel you need to 
provide the driver a mechanism to thread the job scheduling out:


val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(1 to 20)

var thingsToDo: ParArray[(RDD[Int], Int)] = Array(rdd1, rdd2).zipWithIndex.par

thingsToDo.foreach { case(rdd, index) =>
  for(i <- (1 to 1))
logger.info(s"Index ${index} - ${rdd.sum()}")
}


This will run both operations in parallel.




On Mon, Jun 26, 2017 at 8:10 PM, satishl  wrote:
For the code below, since rdd1 and rdd2 don't depend on each other, I was
 expecting the "first" and "second" printlns to be interwoven. However, the
 Spark job runs all "first" statements and then all "second" statements, in
 serial fashion. I have set spark.scheduler.mode = FAIR. Obviously my
 understanding of parallel stages is wrong. What am I missing?
 
 val rdd1 = sc.parallelize(1 to 100)
 val rdd2 = sc.parallelize(1 to 100)
 
 for (i <- (1 to 100))
   println("first: " + rdd1.sum())
 for (i <- (1 to 100))
   println("second" + rdd2.sum())
 
 
 
 --
 View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Question-about-Parallel-Stages-in-Spark-tp28793.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Question about Parallel Stages in Spark

2017-06-26 Thread Pralabh Kumar
Hi

I don't think Spark will receive two submits. It will execute one submit and
then move on to the next one. If the application is multithreaded and two
threads call submit at the same time, then the jobs will run in parallel,
provided the scheduler is FAIR and task slots are available.

But within one thread, one submit will complete and then the next one will
start. If there are independent stages in one job, those will run in parallel.

I agree with Bryan Jeffrey .
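
For illustration, a minimal sketch of the multi-threaded case (the pool names
"pool1"/"pool2" are hypothetical, and an existing SparkContext sc with
spark.scheduler.mode=FAIR is assumed). The scheduler pool is a thread-local
property, so each submitting thread can pick its own FAIR pool:

def runInPool(pool: String, data: Seq[Int]): Thread = {
  val t = new Thread(new Runnable {
    override def run(): Unit = {
      // setLocalProperty is per-thread: it only affects jobs submitted from this thread
      sc.setLocalProperty("spark.scheduler.pool", pool)
      println(s"$pool sum = " + sc.parallelize(data).sum())
    }
  })
  t.start()
  t
}

val t1 = runInPool("pool1", 1 to 1000000)
val t2 = runInPool("pool2", 1 to 1000000)
t1.join()
t2.join()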


Regards
Pralabh Kumar

On Tue, Jun 27, 2017 at 9:03 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

> I think the spark cluster receives two submits, A and B.
> The FAIR  is used to schedule A and B.
> I am not sure about this.
>
> ---Original---
> *From:* "Bryan Jeffrey"
> *Date:* 2017/6/27 08:55:42
> *To:* "satishl";
> *Cc:* "user";
> *Subject:* Re: Question about Parallel Stages in Spark
>
> Hello.
>
> The driver is running the individual operations in series, but each
> operation is parallelized internally.  If you want them run in parallel you
> need to provide the driver a mechanism to thread the job scheduling out:
>
> val rdd1 = sc.parallelize(1 to 10)
> val rdd2 = sc.parallelize(1 to 20)
>
> var thingsToDo: ParArray[(RDD[Int], Int)] = Array(rdd1, rdd2).zipWithIndex.par
>
> thingsToDo.foreach { case(rdd, index) =>
>   for(i <- (1 to 1))
> logger.info(s"Index ${index} - ${rdd.sum()}")
> }
>
>
> This will run both operations in parallel.
>
>
> On Mon, Jun 26, 2017 at 8:10 PM, satishl  wrote:
>
>> For the code below, since rdd1 and rdd2 don't depend on each other, I was
>> expecting the "first" and "second" printlns to be interwoven. However, the
>> Spark job runs all "first" statements and then all "second" statements, in
>> serial fashion. I have set spark.scheduler.mode = FAIR. Obviously my
>> understanding of parallel stages is wrong. What am I missing?
>>
>> val rdd1 = sc.parallelize(1 to 100)
>> val rdd2 = sc.parallelize(1 to 100)
>>
>> for (i <- (1 to 100))
>>   println("first: " + rdd1.sum())
>> for (i <- (1 to 100))
>>   println("second" + rdd2.sum())
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Question-about-Parallel-Stages-in-
>> Spark-tp28793.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Question about Parallel Stages in Spark

2017-06-26 Thread 萝卜丝炒饭
I think the spark cluster receives two submits, A and B.
The FAIR  is used to schedule A and B.
I am not sure about this.


 
---Original---
From: "Bryan Jeffrey"
Date: 2017/6/27 08:55:42
To: "satishl";
Cc: "user";
Subject: Re: Question about Parallel Stages in Spark


Hello.

The driver is running the individual operations in series, but each operation 
is parallelized internally.  If you want them run in parallel you need to 
provide the driver a mechanism to thread the job scheduling out:


val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(1 to 20)

var thingsToDo: ParArray[(RDD[Int], Int)] = Array(rdd1, rdd2).zipWithIndex.par

thingsToDo.foreach { case(rdd, index) =>
  for(i <- (1 to 1))
logger.info(s"Index ${index} - ${rdd.sum()}")
}


This will run both operations in parallel.




On Mon, Jun 26, 2017 at 8:10 PM, satishl  wrote:
For the code below, since rdd1 and rdd2 don't depend on each other, I was
 expecting the "first" and "second" printlns to be interwoven. However, the
 Spark job runs all "first" statements and then all "second" statements, in
 serial fashion. I have set spark.scheduler.mode = FAIR. Obviously my
 understanding of parallel stages is wrong. What am I missing?
 
 val rdd1 = sc.parallelize(1 to 100)
 val rdd2 = sc.parallelize(1 to 100)
 
 for (i <- (1 to 100))
   println("first: " + rdd1.sum())
 for (i <- (1 to 100))
   println("second" + rdd2.sum())
 
 
 
 --
 View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Question-about-Parallel-Stages-in-Spark-tp28793.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: how to mention others in JIRA comment please?

2017-06-26 Thread 萝卜丝炒饭
thank you.


 
---Original---
From: "Ted Yu"
Date: 2017/6/27 10:18:18
To: "??"<1427357...@qq.com>;
Cc: "user";"dev";
Subject: Re: how to mention others in JIRA comment please?


You can find the JIRA handle of the person you want to mention by going to a 
JIRA where that person has commented.


e.g. you want to find the handle for Joseph.
You can go to:
https://issues.apache.org/jira/browse/SPARK-6635



and click on his name in comment:
https://issues.apache.org/jira/secure/ViewProfile.jspa?name=josephkb



The following constitutes a mention for him:
[~josephkb]


FYI


On Mon, Jun 26, 2017 at 6:56 PM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
Hi all,


How do I mention others in a JIRA comment, please?

I added @ before other members' names, but it didn't work.


Could you help me, please?


thanks
Fei Shao

Re: how to mention others in JIRA comment please?

2017-06-26 Thread Ted Yu
You can find the JIRA handle of the person you want to mention by going to
a JIRA where that person has commented.

e.g. you want to find the handle for Joseph.
You can go to:
https://issues.apache.org/jira/browse/SPARK-6635

and click on his name in comment:
https://issues.apache.org/jira/secure/ViewProfile.jspa?name=josephkb

The following constitutes a mention for him:
[~josephkb]

FYI

On Mon, Jun 26, 2017 at 6:56 PM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

> Hi all,
>
> How do I mention others in a JIRA comment, please?
> I added @ before other members' names, but it didn't work.
>
> Could you help me, please?
>
> thanks
> Fei Shao
>


how to mention others in JIRA comment please?

2017-06-26 Thread 萝卜丝炒饭
Hi all,


How do I mention others in a JIRA comment, please?

I added @ before other members' names, but it didn't work.


Could you help me, please?


thanks
Fei Shao

Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-26 Thread 萝卜丝炒饭
Hi Kodali,


I am puzzled by the statement
"Kafka Streaming can indeed do map, reduce, join and window operations".


Do you mean that Kafka has an API like map, or that Kafka does not have such an
API but can still do it?
As far as I remember, Kafka does not have APIs like map and so on.
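
For context, the Kafka Streams DSL (shipped with Kafka since 0.10) does expose
map-style operators. A minimal sketch, calling the Java DSL from Scala; the topic
names, application id and broker address below are placeholders, and the
0.10/0.11-era KStreamBuilder API is assumed:

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, ValueMapper}

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-demo")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)

val builder = new KStreamBuilder()
val source: KStream[String, String] = builder.stream("input-topic")

// mapValues is one of the DSL's stateless record-at-a-time operators
source.mapValues(new ValueMapper[String, String] {
  override def apply(value: String): String = value.toUpperCase
}).to("output-topic")

new KafkaStreams(builder, props).start()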




 
---Original---
From: "kant kodali"
Date: 2017/6/12 06:41:27
To: "Mohammed Guller";
Cc: "user";"yohann 
jardin";"vaquar khan";"vincent 
gromakowski";
Subject: Re: What is the real difference between Kafka streaming and Spark 
Streaming?


Another difference I see is something like Spark SQL, where there are logical
plans, physical plans, code generation and all those optimizations; I don't
see them in Kafka Streaming at this time.

On Sun, Jun 11, 2017 at 2:19 PM, kant kodali  wrote:
I appreciate the responses; however, I see the other side of the argument, and I
actually feel they are now competitors in the streaming space, in some sense.

Kafka Streaming can indeed do map, reduce, join and window operations, and
likewise data can be ingested from many sources into Kafka and the results sent
out to many sinks. Look up "Kafka Connect".

Regarding event-at-a-time vs. micro-batch: I hear arguments from one group of
people saying Spark Streaming is real time, and from another group saying Kafka
Streaming is the true real time. So do we call micro-batch real time, or
event-at-a-time real time?

It is a well-known fact that Spark is more popular with data scientists who want
to run ML algorithms and so on, but I also hear that people can use the H2O
package along with Kafka Streaming. How efficient each of these approaches is,
I have no clue.


The major difference I see is actually the Spark scheduler. I don't think Kafka
Streaming has anything like this; instead it just allows you to run lambda
expressions on a stream and write the output to a specific topic/partition, and
from there one can use Kafka Connect to write it out to any sink. In short, the
optimizations built into the Spark scheduler don't seem to exist in Kafka
Streaming, so if I were to decide which framework to use, an additional question
I would think about is: "Do I want my stream to go through the scheduler, and if
so, why or why not?"


Above all, please correct me if I am wrong :) 








On Sun, Jun 11, 2017 at 12:41 PM, Mohammed Guller  
wrote:
  

  

  
Just to elaborate more on what Vincent wrote: Kafka Streaming provides true
record-at-a-time processing capabilities, whereas Spark Streaming provides
micro-batching capabilities on top of Spark. Depending on your use case, you
may find one better than the other. Both provide stateless and stateful stream
processing capabilities.
 
 
 
A few more things to consider:

If you don't already have a Spark cluster, but have a Kafka cluster, it may be
easier to use Kafka Streaming since you don't need to set up and manage
another cluster.

On the other hand, if you already have a Spark cluster, but don't have a Kafka
cluster (in case you are using some other messaging system), Spark Streaming
is the better option.

If you already know and use Spark, you may find it easier to program with the
Spark Streaming API even if you are using Kafka.

Spark Streaming may give you better throughput. So you have to decide what is
more important for your stream processing application: latency or throughput?

Kafka Streaming is relatively new and less mature than Spark Streaming.
 
 
  
Mohammed
 
 
 

From: vincent gromakowski [mailto:vincent.gromakow...@gmail.com] 
 Sent: Sunday, June 11, 2017 12:09 PM
 To: yohann jardin 
 Cc: kant kodali ; vaquar khan ; 
user 
 Subject: Re: What is the real difference between Kafka streaming and Spark 
Streaming?
 
 
 
 

I think Kafka Streams is good when the processing of each row is independent of
the other rows (row parsing, data cleaning...).


Spark is better when processing groups of rows (group by, ML, window functions...).
 
   
 
  
On Jun 11, 2017 at 8:15 PM, "yohann jardin"  wrote:

Hey,
 
Kafka can also do streaming on its own:  
https://kafka.apache.org/documentation/streams
 I don't know much about it, unfortunately. I can only repeat what I heard at
conferences: that one should give Kafka Streaming a try when the whole pipeline
is using Kafka. I have no pros/cons to argue on this topic.
  
Yohann Jardin
 
  
On 6/11/2017 at 7:08 PM, vaquar khan wrote:
 
 

 Hi Kant,
 
 Kafka is the message broker used with producers and consumers, and Spark
Streaming is used for the real-time processing; Kafka and Spark Streaming work
together, they are not competitors.

Spark Streaming reads data from Kafka and processes it in micro-batches

Re: issue about the windows slice of stream

2017-06-26 Thread 萝卜丝炒饭
Hi Owen,


Could you help me check this issue, please?
Is it a potential bug or not?


thanks
Fei Shao




 
---Original---
From: "??"<1427357...@qq.com>
Date: 2017/6/25 21:44:41
To: "user";"dev";
Subject: Re: issue about the windows slice of stream


Hi all,


Let me add more info about this.
The log showed:
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Time 1498383086000 ms is valid
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Window time = 2000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Slide time = 8000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Zero time = 1498383078000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 
ms, 1498383086000 ms]
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = 
[1498383077000 ms, 1498383078000 ms]
17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 
1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms)
17/06/25 17:31:26 INFO ShuffledDStream: Time 1498383078000 ms is invalid as 
zeroTime is 1498383078000 ms , slideDuration is 1000 ms and difference is 0 ms
17/06/25 17:31:26 DEBUG ShuffledDStream: Time 1498383079000 ms is valid
17/06/25 17:31:26 DEBUG MappedDStream: Time 1498383079000 ms is valid
the slice time is wrong.


For my test code:
lines.countByValueAndWindow( Seconds(2), Seconds(8)).foreachRDD( s => { <===
here the windowDuration is 2 seconds and the slideDuration is 8 seconds.
===key  log begin 
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 
ms, 1498383086000 ms]
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = 
[1498383077000 ms, 1498383078000 ms]
17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 
1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms) <=== here,
the old RDD slices from 1498383077000 to 1498383084000. That is 8 seconds.
Actually it should be 2 seconds.
===key log end
===code in ReducedWindowedDStream.scala begin
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
val reduceF = reduceFunc
val invReduceF = invReduceFunc
val currentTime = validTime
val currentWindow = new Interval(currentTime - windowDuration + 
parent.slideDuration,
currentTime)
val previousWindow = currentWindow - slideDuration
logDebug("Window time = " + windowDuration)
logDebug("Slide time = " + slideDuration)
logDebug("Zero time = " + zeroTime)
logDebug("Current window = " + currentWindow)
logDebug("Previous window = " + previousWindow)
//      _____________________________
//     |  previous window   _________|___________________
//     |___________________|       current window        |  --> Time
//                         |_____________________________|
//
//     |________ _________|          |________ _________|
//              |                             |
//              V                             V
//            old RDDs                      new RDDs
//
// Get the RDDs of the reduced values in "old time steps"
val oldRDDs =
reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - 
parent.slideDuration) <== I think this line should be
"reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime + 
windowDuration - parent.slideDuration)"
logDebug("# old RDDs = " + oldRDDs.size)
// Get the RDDs of the reduced values in "new time steps"
val newRDDs =
reducedStream.slice(previousWindow.endTime + parent.slideDuration, 
currentWindow.endTime) <== and I think this line should be
"reducedStream.slice(previousWindow.endTime + windowDuration - 
parent.slideDuration,
currentWindow.endTime)"
logDebug("# new RDDs = " + newRDDs.size)
===code in ReducedWindowedDStream.scala end



Thanks
Fei Shao
 
---Original---
From: "??"<1427357...@qq.com>
Date: 2017/6/24 14:51:52
To: "user";"dev";
Subject: issue about the windows slice of stream


Hi all,
I found an issue with the window slicing of DStreams.
My code is:


val ssc = new StreamingContext( conf, Seconds(1))


val content = ssc.socketTextStream('ip','port')
content.countByValueAndWindow( Seconds(2),  Seconds(8)).foreach( println())
The key point is that the slide duration is greater than the window duration.
I checked the output. The result from foreach( println()) was wrong.
I found the stream was sliced incorrectly.
Can I open a JIRA please?


thanks
Fei Shao
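
For completeness, a cleaned-up, self-contained version of the reproduction above
(the master, host and port are placeholders; local[2] gives the socket receiver
its own core):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("window-slice-check").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

val content = ssc.socketTextStream("localhost", 9999)
// windowDuration (2s) is smaller than slideDuration (8s), the case reported above
content.countByValueAndWindow(Seconds(2), Seconds(8))
  .foreachRDD(rdd => rdd.collect().foreach(println))

ssc.start()
ssc.awaitTermination()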

Re: Question about Parallel Stages in Spark

2017-06-26 Thread Bryan Jeffrey
Hello.

The driver is running the individual operations in series, but each
operation is parallelized internally.  If you want them run in parallel you
need to provide the driver a mechanism to thread the job scheduling out:

import org.apache.spark.rdd.RDD
import scala.collection.parallel.mutable.ParArray

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(1 to 20)

// .par gives a parallel collection, so the foreach bodies (and the jobs they
// submit) run on multiple threads; logger is assumed to be defined elsewhere.
val thingsToDo: ParArray[(RDD[Int], Int)] = Array(rdd1, rdd2).zipWithIndex.par

thingsToDo.foreach { case (rdd, index) =>
  for (i <- 1 to 1)
    logger.info(s"Index ${index} - ${rdd.sum()}")
}


This will run both operations in parallel.


On Mon, Jun 26, 2017 at 8:10 PM, satishl  wrote:

> For the code below, since rdd1 and rdd2 don't depend on each other, I was
> expecting the "first" and "second" printlns to be interwoven. However, the
> Spark job runs all "first" statements and then all "second" statements, in
> serial fashion. I have set spark.scheduler.mode = FAIR. Obviously my
> understanding of parallel stages is wrong. What am I missing?
>
> val rdd1 = sc.parallelize(1 to 100)
> val rdd2 = sc.parallelize(1 to 100)
>
> for (i <- (1 to 100))
>   println("first: " + rdd1.sum())
> for (i <- (1 to 100))
>   println("second" + rdd2.sum())
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Question-about-Parallel-Stages-in-Spark-tp28793.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming reduceByKeyAndWindow with inverse function seems to iterate over all the keys in the window even though they are not present in the current batch

2017-06-26 Thread 萝卜丝炒饭
Hi SRK,
What are the slideDuration and the parent duration in your code, please?
You can search for "issue about the windows slice of stream" in the mailing list.


Perhaps they are related.
 
---Original---
From: "SRK"
Date: 2017/6/27 03:53:22
To: "user";
Subject: Spark Streaming reduceByKeyAndWindow with inverse function seems 
to iterate over all the keys in the window even though they are not present in
the current batch


Hi,

We have reduceByKeyAndWindow with inverse function feature in our Streaming
job to calculate rolling counts for the past hour and for the past 24 hours.
It seems that the functionality is iterating over all the keys in the window
even though they are not present in the current batch causing the processing
times to be high. My batch size is 1 minute. Is there a way that the
reduceByKeyAndWindow would just iterate over the keys present in the current
batch instead of reducing over all the keys in the Window? Because typically
the updates would happen only for the keys present in the current batch.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reduceByKeyAndWindow-with-inverse-function-seems-to-iterate-over-all-the-keys-in-theh-tp28792.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Question about Parallel Stages in Spark

2017-06-26 Thread satishl
For the code below, since rdd1 and rdd2 don't depend on each other, I was
expecting the "first" and "second" printlns to be interwoven. However, the
Spark job runs all "first" statements and then all "second" statements, in
serial fashion. I have set spark.scheduler.mode = FAIR. Obviously my
understanding of parallel stages is wrong. What am I missing?

val rdd1 = sc.parallelize(1 to 100)
val rdd2 = sc.parallelize(1 to 100)

for (i <- (1 to 100))
  println("first: " + rdd1.sum())
for (i <- (1 to 100))
  println("second" + rdd2.sum())



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Question-about-Parallel-Stages-in-Spark-tp28793.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: ZeroMQ Streaming in Spark2.x

2017-06-26 Thread Aashish Chaudhary
Thanks. I saw it earlier but did not know whether this is the official way of
doing Spark with ZeroMQ. Thanks, I will have a look.
- Aashish


On Mon, Jun 26, 2017 at 3:01 PM Shixiong(Ryan) Zhu 
wrote:

> It's moved to http://bahir.apache.org/
>
> You can find document there.
>
> On Mon, Jun 26, 2017 at 11:58 AM, Aashish Chaudhary <
> aashish.chaudh...@kitware.com> wrote:
>
>> Hi there,
>>
>> I am a beginner when it comes to Spark streaming. I was looking for some
>> examples related to ZeroMQ and Spark and realized that ZeroMQUtils is no
>> longer present in Spark 2.x.
>>
>> I would appreciate if someone can shed some light on the history and what
>> I could do to use ZeroMQ with Spark Streaming in the current version.
>>
>> Thanks,
>>
>>
>


Re: Spark Streaming reduceByKeyAndWindow with inverse function seems to iterate over all the keys in the window even though they are not present in the current batch

2017-06-26 Thread Tathagata Das
Unfortunately the way reduceByKeyAndWindow is implemented, it does iterate
through all the counts. To have something more efficient, you may have to
implement your own windowing logic using mapWithState. Something like

eventDStream.flatMap { event =>
   // find the windows each event maps to, and return tuple (Window, ValueToReduce)
}.mapWithState {
   // for each window, reduce the new value into the partial reduce kept in state
}


Though, if you are going to implement such complex functions, I highly
recommend that you instead start using Structured Streaming, which is far more
optimized than DStreams and gives AT LEAST 10x the throughput. It already
has the performance benefits that you are looking for.
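
To make the suggestion above a bit more concrete, here is a minimal sketch (all
names are hypothetical; eventDStream: DStream[Event] stands in for the real
input): each event is expanded to every sliding window it falls into, and a
running count per (key, window) is kept with mapWithState, so only keys seen in
the current batch are touched:

import org.apache.spark.streaming.{Minutes, State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

case class Event(key: String, ts: Long)

val windowMs = 60 * 60 * 1000L   // 1-hour window
val slideMs  = 60 * 1000L        // recomputed every minute

// start times of all 1-hour windows (aligned to the slide) that contain ts
def windowsFor(ts: Long): Seq[Long] = {
  val last = (ts / slideMs) * slideMs
  (last - windowMs + slideMs) to last by slideMs
}

def rollingCounts(eventDStream: DStream[Event]): DStream[((String, Long), Long)] = {
  val keyedByWindow = eventDStream.flatMap { e =>
    windowsFor(e.ts).map(w => ((e.key, w), 1L))
  }

  val updateCount = (key: (String, Long), newValue: Option[Long], state: State[Long]) => {
    val total = state.getOption.getOrElse(0L) + newValue.getOrElse(0L)
    state.update(total)
    (key, total)
  }

  // Expired windows are dropped via the state timeout rather than an inverse function.
  keyedByWindow.mapWithState(StateSpec.function(updateCount).timeout(Minutes(90)))
}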

On Mon, Jun 26, 2017 at 12:53 PM, SRK  wrote:

> Hi,
>
> We have reduceByKeyAndWindow with inverse function feature in our Streaming
> job to calculate rolling counts for the past hour and for the past 24
> hours.
> It seems that the functionality is iterating over all the keys in the
> window
> even though they are not present in the current batch causing the
> processing
> times to be high. My batch size is 1 minute. Is there a way that the
> reduceByKeyAndWindow would just iterate over the keys present in the
> current
> batch instead of reducing over all the keys in the Window? Because
> typically
> the updates would happen only for the keys present in the current batch.
>
> Thanks!
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-Streaming-reduceByKeyAndWindow-with-
> inverse-function-seems-to-iterate-over-all-the-keys-in-theh-tp28792.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark Streaming reduceByKeyAndWindow with inverse function seems to iterate over all the keys in the window even though they are not present in the current batch

2017-06-26 Thread SRK
Hi,

We have reduceByKeyAndWindow with inverse function feature in our Streaming
job to calculate rolling counts for the past hour and for the past 24 hours.
It seems that the functionality is iterating over all the keys in the window
even though they are not present in the current batch causing the processing
times to be high. My batch size is 1 minute. Is there a way that the
reduceByKeyAndWindow would just iterate over the keys present in the current
batch instead of reducing over all the keys in the Window? Because typically
the updates would happen only for the keys present in the current batch.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reduceByKeyAndWindow-with-inverse-function-seems-to-iterate-over-all-the-keys-in-theh-tp28792.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: ZeroMQ Streaming in Spark2.x

2017-06-26 Thread Shixiong(Ryan) Zhu
It's moved to http://bahir.apache.org/

You can find document there.

On Mon, Jun 26, 2017 at 11:58 AM, Aashish Chaudhary <
aashish.chaudh...@kitware.com> wrote:

> Hi there,
>
> I am a beginner when it comes to Spark streaming. I was looking for some
> examples related to ZeroMQ and Spark and realized that ZeroMQUtils is no
> longer present in Spark 2.x.
>
> I would appreciate if someone can shed some light on the history and what
> I could do to use ZeroMQ with Spark Streaming in the current version.
>
> Thanks,
>
>


ZeroMQ Streaming in Spark2.x

2017-06-26 Thread Aashish Chaudhary
Hi there,

I am a beginner when it comes to Spark streaming. I was looking for some
examples related to ZeroMQ and Spark and realized that ZeroMQUtils is no
longer present in Spark 2.x.

I would appreciate if someone can shed some light on the history and what I
could do to use ZeroMQ with Spark Streaming in the current version.

Thanks,


Re: Exception which using ReduceByKeyAndWindow in Spark Streaming.

2017-06-26 Thread N B
Hi Swetha,

We have dealt with this issue a couple years ago and have solved it. The
key insight here was that adding to a HashSet and removing from a HashSet
are actually not inverse operations of each other.

For example, if you added a key K1 in batch1 and then added that same key K1
again during a later batch, let's say batch9, the inverse function needs to be
able to remove this key *twice* from the HashSet, which is exactly what a
HashSet does not do. Once the key is removed due to batch1 falling off, the
resulting new HashSet has this key missing, and when the time comes to remove
batch9, it will barf with the error that you are experiencing.

The solution is to actually maintain a count of how many times you have
encountered that particular key, and to decrement it in your inverse
function. Once the count reaches 0, your filter function should then remove
that key from consideration.

We achieved it using a HashMap that maintains counts instead of a Set.
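
A minimal sketch of that idea (names are hypothetical; counts are kept in an
immutable Map[String, Int] in place of the HashSet):

type Counts = Map[String, Int]

// reduce: merge the counts of the two windows
def reduceCounts(a: (Long, Counts), b: (Long, Counts)): (Long, Counts) = {
  val merged = b._2.foldLeft(a._2) { case (m, (k, c)) =>
    m.updated(k, m.getOrElse(k, 0) + c)
  }
  (math.max(a._1, b._1), merged)
}

// inverse reduce: subtract the counts of the batch that fell out of the window
def invReduceCounts(acc: (Long, Counts), old: (Long, Counts)): (Long, Counts) = {
  val remaining = old._2.foldLeft(acc._2) { case (m, (k, c)) =>
    val left = m.getOrElse(k, 0) - c
    if (left > 0) m.updated(k, left) else m - k   // drop keys whose count reaches 0
  }
  (acc._1, remaining)
}

// filter: keep only entries that still have a non-empty count map
def filterFunc(kv: (String, (Long, Counts))): Boolean = kv._2._2.nonEmpty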

Hope this helps,
N B


On Thu, Jun 22, 2017 at 4:07 PM, swetha kasireddy  wrote:

> Hi TD,
>
> I am still seeing this issue with any immuatble DataStructure. Any idea
> why this happens? I use scala.collection.immutable.List[String])  and my
> reduce and inverse reduce does the following.
>
> visitorSet1 ++visitorSet2
>
>
>
> visitorSet1.filterNot(visitorSet2.contains(_)
>
>
>
> On Wed, Jun 7, 2017 at 8:43 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> I changed the datastructure to scala.collection.immutable.Set and I still
>> see the same issue. My key is a String.  I do the following in my reduce
>> and invReduce.
>>
>> visitorSet1 ++visitorSet2.toTraversable
>>
>>
>> visitorSet1 --visitorSet2.toTraversable
>>
>> On Tue, Jun 6, 2017 at 8:22 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Yes, and in general any mutable data structure. You have to immutable
>>> data structures whose hashcode and equals is consistent enough for being
>>> put in a set.
>>>
>>> On Jun 6, 2017 4:50 PM, "swetha kasireddy" 
>>> wrote:
>>>
 Are you suggesting against the usage of HashSet?

 On Tue, Jun 6, 2017 at 3:36 PM, Tathagata Das <
 tathagata.das1...@gmail.com> wrote:

> This may be because of HashSet is a mutable data structure, and it
> seems you are actually mutating it in "set1 ++set2". I suggest creating a
> new HashMap in the function (and add both maps into it), rather than
> mutating one of them.
>
> On Tue, Jun 6, 2017 at 11:30 AM, SRK 
> wrote:
>
>> Hi,
>>
>> I see the following error when I use ReduceByKeyAndWindow in my Spark
>> Streaming app. I use reduce, invReduce and filterFunction as shown
>> below.
>> Any idea as to why I get the error?
>>
>>  java.lang.Exception: Neither previous window has value for key, nor
>> new
>> values found. Are you sure your key class hashes consistently?
>>
>>
>>   def reduceWithHashSet: ((Long, HashSet[String]), (Long,
>> HashSet[String]))
>> => (Long, HashSet[String])= {
>> case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2:
>> Long,
>> set2: HashSet[String])) => (Math.max(timeStamp1, timeStamp2), set1
>> ++set2 )
>>
>>   }
>>
>>   def invReduceWithHashSet: ((Long, HashSet[String]), (Long,
>> HashSet[String])) => (Long, HashSet[String])= {
>> case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2:
>> Long,
>> set2: HashSet[String])) => (Math.max(timeStamp1, timeStamp2),
>> set1.diff(set2))
>>   }
>>
>>   def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) =>
>> (Boolean)= {
>> case ((metricName:String, (timeStamp: Long, set:
>> HashSet[String]))) =>
>> set.size>0
>>   }
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Exception-which-using-ReduceByKeyAndWi
>> ndow-in-Spark-Streaming-tp28748.html
>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>

>>
>


Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve

2017-06-26 Thread mckunkel
First Spark project.

I have a Java method that returns a Dataset<Row>. I want to convert this to
a Dataset<StatusChangeDB>, where the object is named StatusChangeDB. I have
created a POJO StatusChangeDB.java and coded it with all the query objects
found in the MySQL table.
I then create an Encoder<StatusChangeDB> and convert the Dataset<Row> to a
Dataset<StatusChangeDB>. However, when I try to .show() the values of the
Dataset I receive the error

Exception in thread "main" org.apache.spark.sql.AnalysisException:
cannot resolve '`hvpinid_quad`' given input columns: [status_change_type,
superLayer, loclayer, sector, locwire];

at
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)

at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:86)

at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:83)

at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:290)

at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:290)

at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)

at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:289)

at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:287)

at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:287)

at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)

at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)

at
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)

at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:287)

at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:287)

at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:287)

at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$10.apply(TreeNode.scala:324)

at
scala.collection.MapLike$MappedValues$$anonfun$iterator$3.apply(MapLike.scala:246)

at
scala.collection.MapLike$MappedValues$$anonfun$iterator$3.apply(MapLike.scala:246)

at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)

at scala.collection.Iterator$class.foreach(Iterator.scala:893)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.IterableLike$$anon$1.foreach(IterableLike.scala:311)

at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)

at
scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:25)

at
scala.collection.TraversableViewLike$class.force(TraversableViewLike.scala:88)

at scala.collection.IterableLike$$anon$1.force(IterableLike.scala:311)

at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:332)

at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)

at
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)

at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:287)

at
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:255)

at
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:255)

at
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:266)

at
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:276)

at
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:285)

at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)

at
org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:285)

at
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:255)

at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:83)

at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:76)

at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)

at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:76)

at
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)

at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:259)

at org.apache.spark.sql.Dataset.(Dataset.scala:209)

at org.apache.spark.sql.Dataset.(Dataset.scala:167)

at 
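
For reference, a minimal Scala sketch of the usual cause (the case class, the
Dataset df and the SparkSession spark are hypothetical; the field names mirror
the columns listed in the error): every field of the target type must resolve to
a column of the Dataset before .as[...] is applied, so a field such as
hvpinid_quad that has no matching column makes the analysis fail.

// Case class mirroring the table; note it declares only fields that exist as columns.
case class StatusChangeDB(status_change_type: String, superLayer: Int,
                          loclayer: Int, sector: Int, locwire: Int)

import spark.implicits._

val typed = df.as[StatusChangeDB]   // succeeds: every field has a matching column
typed.show()

// If the type really needs hvpinid_quad, add or rename a column first, e.g.
// df.withColumn("hvpinid_quad", lit(0)).as[...]   (needs org.apache.spark.sql.functions.lit)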

Re: HDP 2.5 - Python - Spark-On-Hbase

2017-06-26 Thread Weiqing Yang
For SHC documentation, please refer to the README in the SHC GitHub repo, which
is kept up to date.

On Mon, Jun 26, 2017 at 5:46 AM, ayan guha  wrote:

> Thanks all, I have found the correct version of the package. Probably the HDP
> documentation is a little behind.
>
> Best
> Ayan
>
> On Mon, 26 Jun 2017 at 2:16 pm, Mahesh Sawaiker <
> mahesh_sawai...@persistent.com> wrote:
>
>> Ayan,
>>
>> The location of the Logging class changed between Spark 1.6 and Spark 2.0.
>>
>> It looks like you are trying to run 1.6 code on 2.0. I have ported some code
>> like this before; if you have access to the code, you can recompile it by
>> changing the reference to the Logging class to use the slf4j Logger class
>> directly. Most of the code tends to be easily portable.
>>
>>
>>
>> Following is the release note for Spark 2.0
>>
>>
>>
>> *Removals, Behavior Changes and Deprecations*
>>
>> *Removals*
>>
>> The following features have been removed in Spark 2.0:
>>
>>- Bagel
>>- Support for Hadoop 2.1 and earlier
>>- The ability to configure closure serializer
>>- HTTPBroadcast
>>- TTL-based metadata cleaning
>>- *Semi-private class org.apache.spark.Logging. We suggest you use
>>slf4j directly.*
>>- SparkContext.metricsSystem
>>
>> Thanks,
>>
>> Mahesh
>>
>>
>>
>>
>>
>> *From:* ayan guha [mailto:guha.a...@gmail.com]
>> *Sent:* Monday, June 26, 2017 6:26 AM
>> *To:* Weiqing Yang
>> *Cc:* user
>> *Subject:* Re: HDP 2.5 - Python - Spark-On-Hbase
>>
>>
>>
>> Hi
>>
>>
>>
>> I am using following:
>>
>>
>>
>> --packages com.hortonworks:shc:1.0.0-1.6-s_2.10 --repositories
>> http://repo.hortonworks.com/content/groups/public/
>>
>>
>>
>> Is it compatible with Spark 2.X? I would like to use it
>>
>>
>>
>> Best
>>
>> Ayan
>>
>>
>>
>> On Sat, Jun 24, 2017 at 2:09 AM, Weiqing Yang 
>> wrote:
>>
>> Yes.
>>
>> What SHC version you were using?
>>
>> If hitting any issues, you can post them in SHC github issues. There are
>> some threads about this.
>>
>>
>>
>> On Fri, Jun 23, 2017 at 5:46 AM, ayan guha  wrote:
>>
>> Hi
>>
>>
>>
>> Is it possible to use SHC from Hortonworks with pyspark? If so, any
>> working code sample available?
>>
>>
>>
>> Also, I faced an issue while running the samples with Spark 2.0
>>
>>
>>
>> "Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging"
>>
>>
>>
>> Any workaround?
>>
>>
>>
>> Thanks in advance
>>
>>
>>
>> --
>>
>> Best Regards,
>> Ayan Guha
>>
>>
>>
>>
>>
>>
>>
>> --
>>
>> Best Regards,
>> Ayan Guha
> --
> Best Regards,
> Ayan Guha
>


Re: HDP 2.5 - Python - Spark-On-Hbase

2017-06-26 Thread ayan guha
Thanks all, I have found the correct version of the package. Probably the HDP
documentation is a little behind.

Best
Ayan

On Mon, 26 Jun 2017 at 2:16 pm, Mahesh Sawaiker <
mahesh_sawai...@persistent.com> wrote:

> Ayan,
>
> The location of the Logging class changed between Spark 1.6 and Spark 2.0.
>
> It looks like you are trying to run 1.6 code on 2.0. I have ported some code
> like this before; if you have access to the code, you can recompile it by
> changing the reference to the Logging class to use the slf4j Logger class
> directly. Most of the code tends to be easily portable.
>
>
>
> Following is the release note for Spark 2.0
>
>
>
> *Removals, Behavior Changes and Deprecations*
>
> *Removals*
>
> The following features have been removed in Spark 2.0:
>
>- Bagel
>- Support for Hadoop 2.1 and earlier
>- The ability to configure closure serializer
>- HTTPBroadcast
>- TTL-based metadata cleaning
>- *Semi-private class org.apache.spark.Logging. We suggest you use
>slf4j directly.*
>- SparkContext.metricsSystem
>
> Thanks,
>
> Mahesh
>
>
>
>
>
> *From:* ayan guha [mailto:guha.a...@gmail.com]
> *Sent:* Monday, June 26, 2017 6:26 AM
> *To:* Weiqing Yang
> *Cc:* user
> *Subject:* Re: HDP 2.5 - Python - Spark-On-Hbase
>
>
>
> Hi
>
>
>
> I am using following:
>
>
>
> --packages com.hortonworks:shc:1.0.0-1.6-s_2.10 --repositories
> http://repo.hortonworks.com/content/groups/public/
>
>
>
> Is it compatible with Spark 2.X? I would like to use it
>
>
>
> Best
>
> Ayan
>
>
>
> On Sat, Jun 24, 2017 at 2:09 AM, Weiqing Yang 
> wrote:
>
> Yes.
>
> What SHC version you were using?
>
> If hitting any issues, you can post them in SHC github issues. There are
> some threads about this.
>
>
>
> On Fri, Jun 23, 2017 at 5:46 AM, ayan guha  wrote:
>
> Hi
>
>
>
> Is it possible to use SHC from Hortonworks with pyspark? If so, any
> working code sample available?
>
>
>
> Also, I faced an issue while running the samples with Spark 2.0
>
>
>
> "Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging"
>
>
>
> Any workaround?
>
>
>
> Thanks in advance
>
>
>
> --
>
> Best Regards,
> Ayan Guha
>
>
>
>
>
>
>
> --
>
> Best Regards,
> Ayan Guha
>
-- 
Best Regards,
Ayan Guha


Re: Question on Spark code

2017-06-26 Thread Steve Loughran

On 25 Jun 2017, at 20:57, kant kodali 
> wrote:

impressive! I need to learn more about scala.

What I mean by stripping away the conditional check in Java is this.

static final boolean isLogInfoEnabled = false;

public void logMessage(String message) {
    if (isLogInfoEnabled) {
        log.info(message);
    }
}

If you look at the byte code the dead if check will be removed.





Generally it's skipped in Java too now that people have moved to the SLF4J APIs,
which do on-demand string expansion:

LOG.info("network IO failure from {} source to {}", src, dest, ex). That only
builds the final string, calling src.toString() and dest.toString(), when needed;
it handles null values too. So you can skip those guards everywhere. But the
string template is still constructed; it's not free, and there's some merit in
maintaining the guard at debug level, though I don't personally bother.

The spark one takes a closure, so it can do much more. However, you shouldn't 
do anything with side effects, or indeed, anything prone to throwing 
exceptions. Always try to write .toString() methods which are robust against 
null values, that is: valid for the entire life of an instance. Your debuggers 
will appreciate it too.
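
For illustration, a minimal sketch (simplified, and not the actual
org.apache.spark.internal.Logging trait) of the by-name-parameter pattern that
makes closure-style logging cheap when the level is disabled:

import org.slf4j.{Logger, LoggerFactory}

trait SimpleLogging {
  private lazy val log: Logger = LoggerFactory.getLogger(getClass)

  // msg is a by-name parameter: the string is only built if INFO is enabled
  def logInfo(msg: => String): Unit = {
    if (log.isInfoEnabled) log.info(msg)
  }
}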



Fwd: Saving RDD as Kryo (broken in 2.1)

2017-06-26 Thread Alexander Krasheninnikov
Hi, all!
I have code that serializes an RDD with Kryo and saves it as a sequence file. It
works fine in 1.5.1, but after switching to 2.1.1 it does not work.

I am trying to serialize an RDD of Tuple2 (obtained from a PairRDD).

   1. The RDD consists of heterogeneous objects (aggregates like HLL, QTree,
   Min, Max, etc.)
   2. The save is performed within a streaming job
   3. The read is performed outside of streaming (another app)
   4. I suspected that such an error could be due to custom serializers; I
   turned them off, but the errors still exist
   5. I tried disabling references in Kryo (since I saw an error while
   resolving references); I got a StackOverflow and significant performance
   degradation
   6. Implementing Serializable/Externalizable is not a solution,
   unfortunately.

Expected behavior:

saveAsObjectFile/loadObjectFile are symmetric, and it's possible to load
previously saved RDD.

Code of save/load:

object KryoFile {

  val THREAD_LOCAL_CACHE = new ThreadLocal[Kryo]

  /*
   * Used to write as Object file using kryo serialization
   */
  def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
val kryoSerializer = new KryoSerializer(rdd.context.getConf)

rdd.context.setJobDescription("Saving to path " + path)
rdd.mapPartitions(iter => iter.grouped(10)
  .map(_.toArray))
  .map(splitArray => {
//initializes kyro and calls your registrator class
var kryo = THREAD_LOCAL_CACHE.get()
if (null == kryo) {
  kryo = kryoSerializer.newKryo()
  THREAD_LOCAL_CACHE.set(kryo)
}

//convert data to bytes
val bao = new ByteArrayOutputStream()
val output = kryoSerializer.newKryoOutput()
output.setOutputStream(bao)
kryo.writeClassAndObject(output, splitArray)
output.close()
kryo.reset()

// We are ignoring key field of sequence file
val byteWritable = new BytesWritable(bao.toByteArray)
(NullWritable.get(), byteWritable)
  }).saveAsSequenceFile(path)
  }

  /*
   * Method to read from object file which is saved kryo format.
   */
  def loadObjectFile[T](sc: SparkContext, path: String, minPartitions:
Int = 1)(implicit ct: ClassTag[T]) = {
val kryoSerializer = new KryoSerializer(sc.getConf)

sc.sequenceFile(path, classOf[NullWritable],
classOf[BytesWritable], minPartitions)
  .flatMap(x => {

var kryo = THREAD_LOCAL_CACHE.get()
if (null == kryo) {
  kryo = kryoSerializer.newKryo()
  THREAD_LOCAL_CACHE.set(kryo)
}

val input = new Input()
input.setBuffer(x._2.getBytes)
val data = kryo.readClassAndObject(input)
kryo.reset()
val dataObject = data.asInstanceOf[Array[T]]
dataObject
  })

  }
}
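
Regarding point 4 above (custom serializers), one thing worth double-checking
(only a suggestion, not something confirmed in this thread) is that the writing
and the reading application configure Kryo identically, for example by
registering the same classes in both. The class names below are hypothetical
stand-ins for the real aggregate types:

import org.apache.spark.SparkConf

case class MyAggregateA(value: Long)
case class MyAggregateB(values: Array[Double])

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")   // fail fast on unregistered classes
  .registerKryoClasses(Array(classOf[MyAggregateA], classOf[MyAggregateB]))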


When trying to deserialize, I got such errors:
17/06/21 08:19:18 ERROR Executor: Exception in task 14.0 in stage 0.0 (TID
14)
java.lang.ArrayIndexOutOfBoundsException: -2
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(
MapReferenceResolver.java:60)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:706)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
ObjectArraySerializer.read(DefaultArraySerializers.java:396)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
ObjectArraySerializer.read(DefaultArraySerializers.java:307)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$
1.apply(KryoFile.scala:75)
at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$
1.apply(KryoFile.scala:62)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(
SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(
SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


17/06/21 08:19:18 ERROR Executor: Exception in task 12.0 in stage 0.0 (TID
12)
java.lang.ArrayStoreException: java.util.Collections$EmptyMap
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$

Saving RDD as Kryo (broken in 2.1)

2017-06-26 Thread Александр Крашенинников
Hi, all!
I have code that serializes an RDD with Kryo and saves it as a sequence file. It
works fine in 1.5.1, but after switching to 2.1.1 it does not work.

I am trying to serialize an RDD of Tuple2 (obtained from a PairRDD).

   1. The RDD consists of heterogeneous objects (aggregates like HLL, QTree,
   Min, Max, etc.)
   2. The save is performed within a streaming job
   3. The read is performed outside of streaming (another app)
   4. I suspected that such an error could be due to custom serializers; I
   turned them off, but the errors still exist
   5. I tried disabling references in Kryo (since I saw an error while
   resolving references); I got a StackOverflow and significant performance
   degradation
   6. Implementing Serializable/Externalizable is not a solution,
   unfortunately.

Expected behavior:

saveAsObjectFile/loadObjectFile are symmetric, and it's possible to load
previously saved RDD.

Code of save/load:

object KryoFile {

  val THREAD_LOCAL_CACHE = new ThreadLocal[Kryo]

  /*
   * Used to write as Object file using kryo serialization
   */
  def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
val kryoSerializer = new KryoSerializer(rdd.context.getConf)

rdd.context.setJobDescription("Saving to path " + path)
rdd.mapPartitions(iter => iter.grouped(10)
  .map(_.toArray))
  .map(splitArray => {
//initializes kyro and calls your registrator class
var kryo = THREAD_LOCAL_CACHE.get()
if (null == kryo) {
  kryo = kryoSerializer.newKryo()
  THREAD_LOCAL_CACHE.set(kryo)
}

//convert data to bytes
val bao = new ByteArrayOutputStream()
val output = kryoSerializer.newKryoOutput()
output.setOutputStream(bao)
kryo.writeClassAndObject(output, splitArray)
output.close()
kryo.reset()

// We are ignoring key field of sequence file
val byteWritable = new BytesWritable(bao.toByteArray)
(NullWritable.get(), byteWritable)
  }).saveAsSequenceFile(path)
  }

  /*
   * Method to read from object file which is saved kryo format.
   */
  def loadObjectFile[T](sc: SparkContext, path: String, minPartitions:
Int = 1)(implicit ct: ClassTag[T]) = {
val kryoSerializer = new KryoSerializer(sc.getConf)

sc.sequenceFile(path, classOf[NullWritable],
classOf[BytesWritable], minPartitions)
  .flatMap(x => {

var kryo = THREAD_LOCAL_CACHE.get()
if (null == kryo) {
  kryo = kryoSerializer.newKryo()
  THREAD_LOCAL_CACHE.set(kryo)
}

val input = new Input()
input.setBuffer(x._2.getBytes)
val data = kryo.readClassAndObject(input)
kryo.reset()
val dataObject = data.asInstanceOf[Array[T]]
dataObject
  })

  }
}


When trying to deserialize, I got such errors:
17/06/21 08:19:18 ERROR Executor: Exception in task 14.0 in stage 0.0 (TID
14)
java.lang.ArrayIndexOutOfBoundsException: -2
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(
MapReferenceResolver.java:60)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:706)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
ObjectArraySerializer.read(DefaultArraySerializers.java:396)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
ObjectArraySerializer.read(DefaultArraySerializers.java:307)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$
1.apply(KryoFile.scala:75)
at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$
1.apply(KryoFile.scala:62)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(
SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(
SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


17/06/21 08:19:18 ERROR Executor: Exception in task 12.0 in stage 0.0 (TID
12)
java.lang.ArrayStoreException: java.util.Collections$EmptyMap
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$