Re: [SPARK STREAMING] Questions regarding foreachPartition

2015-11-17 Thread Nipun Arora
Thanks Cody, that's what I thought.
Currently, in cases where I need global ordering, I do a collect() call and
go through everything on the client (the driver).
I wonder if there is a better way to get globally ordered execution across
micro-batches?


I am having some trouble acquiring resources and releasing them after the
iterator finishes in Java.
It might have to do with my resource allocator itself. I will investigate
further and get back to you.

Thanks
Nipun


On Mon, Nov 16, 2015 at 5:11 PM Cody Koeninger  wrote:

> Ordering would be on a per-partition basis, not global ordering.
>
> You typically want to acquire resources inside the foreachPartition
> closure, just before handling the iterator.
>
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd


Re: [SPARK STREAMING] Questions regarding foreachPartition

2015-11-16 Thread Cody Koeninger
Ordering would be on a per-partition basis, not global ordering.
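
To make per-partition ordering concrete without a cluster, here is a plain-Java simulation (no Spark dependency). Each inner list stands in for one partition of already-sorted data, and each partition is consumed by its own thread, loosely mirroring executors running one task per partition. PartitionOrderingDemo, run and perPartitionOrdered are hypothetical names for illustration, and the sketch assumes the data inside each partition is already in order (nothing has reshuffled it).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PartitionOrderingDemo {

    /** One element as seen by the sink: which partition emitted it, and its timestamp. */
    public static final class Emitted {
        public final int partition;
        public final long timestamp;

        public Emitted(int partition, long timestamp) {
            this.partition = partition;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "p" + partition + ":" + timestamp;
        }
    }

    /** Runs each partition as an independent concurrent task and logs elements in arrival order. */
    public static List<Emitted> run(List<List<Long>> partitions) {
        List<Emitted> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executors = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        for (int p = 0; p < partitions.size(); p++) {
            final int partitionId = p;
            final List<Long> data = partitions.get(p);
            executors.submit(() -> {
                // A single task walks its partition sequentially, so this
                // partition's elements are logged in their stored order.
                for (Long ts : data) {
                    log.add(new Emitted(partitionId, ts));
                }
            });
        }
        executors.shutdown();
        try {
            if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("partitions did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(log); // copy is taken under the synchronized list's lock
    }

    /** True if every partition's own subsequence within the log is non-decreasing. */
    public static boolean perPartitionOrdered(List<Emitted> log, int numPartitions) {
        long[] last = new long[numPartitions];
        Arrays.fill(last, Long.MIN_VALUE);
        for (Emitted e : log) {
            if (e.timestamp < last[e.partition]) {
                return false;
            }
            last[e.partition] = e.timestamp;
        }
        return true;
    }

    public static void main(String[] args) {
        // Sorted timestamps split into contiguous partitions.
        List<List<Long>> partitions = List.of(
                List.of(1L, 2L, 3L), List.of(4L, 5L, 6L), List.of(7L, 8L, 9L));
        List<Emitted> log = run(partitions);
        // The global interleaving depends on thread scheduling...
        System.out.println(log);
        // ...but each partition's own sequence stays in order.
        System.out.println("per-partition ordered: " + perPartitionOrdered(log, partitions.size()));
    }
}
```

Running main can print a different interleaving on each run; only the per-partition check is stable.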

You typically want to acquire resources inside the foreachPartition
closure, just before handling the iterator.

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
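
A minimal plain-Java sketch of that pattern, under stated assumptions: ConnectionPool and FakeConnection are hypothetical stand-ins (not Kafka or Spark APIs), and processPartition plays the role of the body of the VoidFunction passed to foreachPartition. The connection is borrowed once per partition, on the worker, right before the iterator is consumed, and returned in a finally block so it is released even if processing fails partway through. In real Spark code the pool would typically be a static, lazily initialized singleton in each executor JVM, as the linked guide suggests, rather than something created on the driver and shipped inside the closure.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PartitionResourceDemo {

    /** Hypothetical stand-in for a Kafka producer or similar client; appends to a shared sink. */
    public static final class FakeConnection {
        private final List<String> sink; // not thread-safe; fine for this single-threaded demo

        FakeConnection(List<String> sink) {
            this.sink = sink;
        }

        public void send(String record) {
            sink.add(record);
        }
    }

    /** Hypothetical fixed-size, thread-safe pool of connections. */
    public static final class ConnectionPool {
        private final BlockingQueue<FakeConnection> idle;

        public ConnectionPool(int size, List<String> sink) {
            idle = new ArrayBlockingQueue<>(size);
            for (int i = 0; i < size; i++) {
                idle.add(new FakeConnection(sink));
            }
        }

        /** Blocks until a connection is free. */
        public FakeConnection borrow() {
            try {
                return idle.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a connection", e);
            }
        }

        public void giveBack(FakeConnection conn) {
            idle.add(conn);
        }

        public int available() {
            return idle.size();
        }
    }

    // Stand-in for the foreachPartition closure body: one connection per partition.
    public static void processPartition(Iterator<String> records, ConnectionPool pool) {
        FakeConnection conn = pool.borrow(); // acquired just before the iterator is consumed
        try {
            while (records.hasNext()) {
                conn.send(records.next());
            }
        } finally {
            // Returned even if hasNext(), next() or send() throws midway.
            pool.giveBack(conn);
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        ConnectionPool pool = new ConnectionPool(2, sink);
        // Each inner list plays the role of one partition's iterator.
        List<List<String>> partitions = List.of(List.of("a", "b"), List.of("c"));
        for (List<String> partition : partitions) {
            processPartition(partition.iterator(), pool);
        }
        System.out.println(sink);             // prints [a, b, c]
        System.out.println(pool.available()); // prints 2
    }
}
```

Borrowing per partition rather than per record means the pool is touched twice per partition regardless of how many records the partition holds.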

On Mon, Nov 16, 2015 at 4:02 PM, Nipun Arora 
wrote:

> Hi,
> I wanted to understand the foreachPartition logic. In the code below, I am
> assuming the iterator is executing in a distributed fashion.
>
> 1. Suppose I have a stream of timestamped data that is already sorted. Will
> the stringIterator in foreachPartition process each line in order?
>
> 2. Suppose I have a static pool of Kafka connections. Where should I get
> a connection from the pool to be used to send data to Kafka?


[SPARK STREAMING] Questions regarding foreachPartition

2015-11-16 Thread Nipun Arora
Hi,
I wanted to understand the foreachPartition logic. In the code below, I am
assuming the iterator is executing in a distributed fashion.

1. Suppose I have a stream of timestamped data that is already sorted. Will
the stringIterator in foreachPartition process each line in order?

2. Suppose I have a static pool of Kafka connections. Where should I get a
connection from the pool to be used to send data to Kafka?

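import java.util.Iterator;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

// addMTSUnmatched is assumed to be a JavaDStream<String>;
// OnlineUtils and type1_outputFile are my own helpers.
addMTSUnmatched.foreachRDD(
    new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
            // Runs on the driver, once per micro-batch
            stringJavaRDD.foreachPartition(
                new VoidFunction<Iterator<String>>() {
                    @Override
                    public void call(Iterator<String> stringIterator) throws Exception {
                        // Runs on an executor, once per partition
                        while (stringIterator.hasNext()) {
                            String str = stringIterator.next();
                            if (OnlineUtils.ESFlag) {
                                OnlineUtils.printToFile(str, 1, type1_outputFile, OnlineUtils.client);
                            } else {
                                OnlineUtils.printToFile(str, 1, type1_outputFile);
                            }
                        }
                    }
                });
            return null; // Function<..., Void> must still return a value
        }
    });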



Thanks

Nipun