Re: [SPARK STREAMING] Questions regarding foreachPartition
Thanks Cody, that's what I thought.

Currently, in the cases where I want global ordering, I call collect() and go through everything in the client. Is there a better way to get globally ordered execution across micro-batches?

I am having some trouble acquiring resources and releasing them after the iterator is consumed in Java. It might have to do with my resource allocator itself. I will investigate further and get back to you.

Thanks
Nipun

On Mon, Nov 16, 2015 at 5:11 PM Cody Koeninger wrote:
> Ordering would be on a per-partition basis, not global ordering.
>
> You typically want to acquire resources inside the foreachPartition
> closure, just before handling the iterator.
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
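One possible alternative to re-sorting everything on the driver, sketched in plain Java with no Spark dependency so it runs on its own. It assumes the driver can obtain each partition's records separately and that each partition is already sorted by timestamp (per-partition ordering, as noted in the quoted reply). The class and method names (MergeByTimestamp, mergeSorted) are hypothetical, and timestamps are modelled as Long. All data still passes through one JVM, so this only addresses ordering within what the driver has fetched.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper: k-way merge of per-partition sorted streams.
class MergeByTimestamp {
    // One heap entry: the current head of a partition plus the iterator it came from.
    private static final class Head {
        final long value;
        final Iterator<Long> rest;

        Head(long value, Iterator<Long> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    // Merges iterators that are each sorted ascending into one ascending list.
    static List<Long> mergeSorted(List<Iterator<Long>> partitions) {
        PriorityQueue<Head> heap =
                new PriorityQueue<>((a, b) -> Long.compare(a.value, b.value));
        for (Iterator<Long> it : partitions) {
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        List<Long> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();
            out.add(smallest.value);
            // Refill the heap from the partition that just gave up its head.
            if (smallest.rest.hasNext()) {
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Iterator<Long>> parts = new ArrayList<>();
        parts.add(Arrays.asList(1L, 4L, 9L).iterator());
        parts.add(Arrays.asList(2L, 3L, 10L).iterator());
        parts.add(Arrays.asList(5L, 6L).iterator());
        System.out.println(mergeSorted(parts));
    }
}
```

The merge costs O(n log k) for n records in k partitions, against O(n log n) for a full re-sort, but it is only correct if every input iterator really is sorted.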
Re: [SPARK STREAMING] Questions regarding foreachPartition
Ordering would be on a per-partition basis, not global ordering.

You typically want to acquire resources inside the foreachPartition closure, just before handling the iterator.

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

On Mon, Nov 16, 2015 at 4:02 PM, Nipun Arora wrote:
> 1. Assuming I have a stream which has timestamp data which is sorted. Will
> the stringIterator in foreachPartition process each line in order?
>
> 2. Assuming I have a static pool of Kafka connections, where should I get
> a connection from a pool to be used to send data to Kafka?
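A minimal sketch of the acquire-inside-the-closure pattern described above, written in plain Java without Spark or Kafka so that it runs standalone. FakeConnection, ConnectionPool, and PartitionHandler are hypothetical stand-ins, not Spark or Kafka classes. In a real job, the body of handle(...) would sit inside the call(Iterator<String>) method passed to foreachPartition, and the static pool would be initialized once per executor JVM.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a Kafka producer or similar connection.
class FakeConnection {
    final List<String> sent = new ArrayList<>();

    void send(String record) {
        sent.add(record);
    }
}

// Hypothetical static pool: static state exists once per JVM.
class ConnectionPool {
    private static final BlockingQueue<FakeConnection> POOL = new ArrayBlockingQueue<>(2);

    static {
        POOL.add(new FakeConnection());
        POOL.add(new FakeConnection());
    }

    // Blocks until a connection is free.
    static FakeConnection borrow() {
        try {
            return POOL.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a connection", e);
        }
    }

    static void giveBack(FakeConnection connection) {
        POOL.offer(connection);
    }

    static int available() {
        return POOL.size();
    }
}

class PartitionHandler {
    // Acquire just before touching the iterator, release in finally.
    // The connection is returned only so the demo can inspect what was sent.
    static FakeConnection handle(Iterator<String> records) {
        FakeConnection connection = ConnectionPool.borrow();
        try {
            while (records.hasNext()) {
                connection.send(records.next()); // records arrive in partition order
            }
        } finally {
            ConnectionPool.giveBack(connection); // runs even if send throws
        }
        return connection;
    }

    public static void main(String[] args) {
        FakeConnection used = handle(Arrays.asList("t1", "t2", "t3").iterator());
        System.out.println(used.sent);
        System.out.println(ConnectionPool.available());
    }
}
```

Releasing in a finally block means the connection goes back to the pool even when processing a record fails, which is one common cause of pools running dry.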
[SPARK STREAMING] Questions regarding foreachPartition
Hi,

I wanted to understand the foreachPartition logic. In the code below, I am assuming the iterator is executing in a distributed fashion.

1. Assuming I have a stream which has timestamp data which is sorted. Will the stringIterator in foreachPartition process each line in order?

2. Assuming I have a static pool of Kafka connections, where should I get a connection from a pool to be used to send data to Kafka?

    addMTSUnmatched.foreachRDD(
        new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
                stringJavaRDD.foreachPartition(
                    new VoidFunction<Iterator<String>>() {
                        @Override
                        public void call(Iterator<String> stringIterator) throws Exception {
                            while (stringIterator.hasNext()) {
                                String str = stringIterator.next();
                                if (OnlineUtils.ESFlag) {
                                    OnlineUtils.printToFile(str, 1, type1_outputFile, OnlineUtils.client);
                                } else {
                                    OnlineUtils.printToFile(str, 1, type1_outputFile);
                                }
                            }
                        }
                    }
                );
                return null;
            }
        }
    );

Thanks
Nipun