Re: Keep state inside map function

2014-10-28 Thread Koert Kuipers
Doing cleanup in an iterator like that assumes the iterator always gets
fully read, which is not necessarily the case (for example, RDD.take does not).

Instead, I would use mapPartitionsWithContext, in which case you can write a
function of the form:
 f: (TaskContext, Iterator[T]) => Iterator[U]

Now you can register a cleanup with the task context, like this:
context.addTaskCompletionListener(new TaskCompletionListener {
  override def onTaskCompletion(context: TaskContext): Unit = doSomething()  // your cleanup here
})

and after that, proceed with the iterator transformation as usual.
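
For completeness, here is a rough, self-contained sketch of the whole pattern, assuming a
Spark 1.x build where mapPartitionsWithContext is still available (it was a DeveloperApi and
was removed later; in newer versions you can call TaskContext.get() inside a plain
mapPartitions to get the same handle). The Connection class and the input RDD are
hypothetical stand-ins for whatever per-partition resource needs cleaning up:

  import org.apache.spark.{SparkConf, SparkContext, TaskContext}
  import org.apache.spark.util.TaskCompletionListener

  // hypothetical per-partition resource (database connection, file handle, ...)
  class Connection {
    def lookup(x: Int): Int = x * 2
    def close(): Unit = println("connection closed")
  }

  val sc  = new SparkContext(new SparkConf().setAppName("cleanup-example").setMaster("local[2]"))
  val rdd = sc.parallelize(1 to 100, numSlices = 4)

  val result = rdd.mapPartitionsWithContext { (context, iter) =>
    val conn = new Connection()                      // setup, once per partition
    context.addTaskCompletionListener(new TaskCompletionListener {
      // runs when the task completes, even if the iterator was never fully read
      override def onTaskCompletion(context: TaskContext): Unit = conn.close()
    })
    iter.map(conn.lookup)                            // lazy transformation, as usual
  }

  result.take(5)   // cleanup still runs, even though take() does not exhaust every partition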


On Thu, Jul 31, 2014 at 4:35 AM, Sean Owen so...@cloudera.com wrote:

 On Thu, Jul 31, 2014 at 2:11 AM, Tobias Pfeiffer t...@preferred.jp wrote:
  rdd.mapPartitions { partition =>
    // Some setup code here
    val result = partition.map(yourfunction)

    // Some cleanup code here
    result
  }

 Yes, I realized that after I hit send. You definitely have to store
 and return the result from the mapping!


  rdd.mapPartitions { partition =>
    if (!partition.isEmpty) {
      // Some setup code here
      partition.map(item => {
        val output = yourfunction(item)
        if (!partition.hasNext) {
          // Some cleanup code here
        }
        output
      })
    } else {
      // return an empty Iterator of your return type
    }
  }

 Great point, yeah. If you knew the number of values was small, you
 could collect them and process them locally, but this is the right general
 way to do it.
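
 A concrete, compilable version of that pattern might look roughly like the
 following sketch (Connection is a hypothetical stand-in for whatever the
 setup creates, and sc is assumed to be an existing SparkContext; as the
 reply at the top of this thread points out, the cleanup here only runs if
 the partition iterator is fully consumed):

   class Connection {
     def lookup(x: Int): Int = x * 2          // hypothetical per-record work
     def close(): Unit = println("closed")    // hypothetical cleanup
   }

   val processed = sc.parallelize(1 to 10).mapPartitions { partition =>
     if (!partition.isEmpty) {
       val conn = new Connection()            // setup, once per non-empty partition
       partition.map { item =>
         val output = conn.lookup(item)       // stands in for yourfunction(item)
         if (!partition.hasNext) {
           conn.close()                       // cleanup right after the last element
         }
         output
       }
     } else {
       Iterator.empty                         // empty Iterator of the return type
     }
   }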



Re: Keep state inside map function

2014-07-30 Thread Kevin
Thanks to both of you for your input. Looks like I'll play with the
mapPartitions function to start porting MapReduce algorithms to Spark.


On Wed, Jul 30, 2014 at 1:23 PM, Sean Owen so...@cloudera.com wrote:

 Really, the analog of a Mapper is not map(), but mapPartitions(). Instead
 of:

 rdd.map(yourFunction)

 ... you can run setup code before mapping a bunch of records, and
 cleanup code after, like so:

 rdd.mapPartitions { partition =>
    // Some setup code here
    partition.map(yourfunction)
    // Some cleanup code here
 }

 You couldn't share state across Mappers, or across Mappers and Reducers, in
 Hadoop. (At least there was no direct way.) Same here. But you can
 maintain state across many map calls within a single partition.
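
 As a small illustration of that last point, here is a rough sketch of an
 object built once per partition and then reused for every record, much like
 an instance variable on a Mapper (sc is assumed to be an existing
 SparkContext, and the DecimalFormat is just a stand-in for any expensive or
 non-serializable setup):

   val lines = sc.parallelize(Seq("1", "2", "3", "4"), numSlices = 2)
   val parsed = lines.mapPartitions { partition =>
     // built once per partition, then shared by every map call in it,
     // much like a field on a Hadoop Mapper
     val format = new java.text.DecimalFormat("#")
     partition.map(s => format.parse(s).intValue)
   }
   parsed.collect()   // Array(1, 2, 3, 4)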

 On Wed, Jul 30, 2014 at 6:07 PM, Kevin kevin.macksa...@gmail.com wrote:
  Hi,
 
  Is it possible to maintain state inside a Spark map function? With Hadoop
  MapReduce, Mappers and Reducers are classes that can have their own state
  using instance variables. Can this be done with Spark? Are there any
  examples?
 
  Most examples I have seen do a simple operation on the value passed into the
  map function and then pass it along to the reduce function.
 
  Thanks in advance.
 
  -Kevin