Yes, you’re right, Sachin. mapWithBcVariable is only syntactic sugar for the
case where the broadcast DataSet contains exactly one element. If your
DataSet contains multiple elements, you can’t use this method. However, we
could define another method, mapWithBcSet, which takes a function such as
f: (element: T, broadcastValues: List[B]) => O.

If you have multiple DataSets that fulfil this condition, you can wrap
them in a tuple, as you suggested.

Cheers,
Till


On Tue, Jun 2, 2015 at 7:10 PM, Sachin Goel <sachingoel0...@gmail.com>
wrote:

> Further, I think we should return just
> broadcastVariable = getRuntimeContext.getBroadcastVariable[B]("broadcastVariable")
> in BroadcastSingleElementMapper.
> The user may wish to have a whole list broadcast, and not just access the
> first element. For example, this would make sense in the k-means algorithm.
>
> Regards
> Sachin Goel
>
> On Tue, Jun 2, 2015 at 9:03 PM, Sachin Goel <sachingoel0...@gmail.com>
> wrote:
>
> > Hi Till,
> > This works only when there is a single variable to be broadcast, doesn't
> > it? What about the case where we need to broadcast two? Is it advisable to
> > create a BroadcastDoubleElementMapper class, or could we just send a
> > tuple of all the variables? Perhaps that is the better idea.
> >
> > Regards
> > Sachin Goel
> >
> > On Tue, Jun 2, 2015 at 8:15 PM, <trohrm...@apache.org> wrote:
> >
> >> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML
> >>
> >>
> >> Project: http://git-wip-us.apache.org/repos/asf/flink/repo
> >> Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5
> >> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5
> >> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5
> >>
> >> Branch: refs/heads/master
> >> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c
> >> Parents: 44dae0c
> >> Author: Till Rohrmann <trohrm...@apache.org>
> >> Authored: Tue Jun 2 14:45:12 2015 +0200
> >> Committer: Till Rohrmann <trohrm...@apache.org>
> >> Committed: Tue Jun 2 15:34:54 2015 +0200
> >>
> >> ----------------------------------------------------------------------
> >>  .../apache/flink/ml/classification/SVM.scala    | 73 ++++++--------------
> >>  .../flink/ml/preprocessing/StandardScaler.scala | 39 +++--------
> >>  2 files changed, 30 insertions(+), 82 deletions(-)
> >> ----------------------------------------------------------------------
> >>
> >>
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> >> ----------------------------------------------------------------------
> >> diff --git
> >>
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> >>
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> >> index e01735f..c69b56a 100644
> >> ---
> >>
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> >> +++
> >>
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
> >> @@ -26,6 +26,7 @@ import scala.util.Random
> >>  import org.apache.flink.api.common.functions.RichMapFunction
> >>  import org.apache.flink.api.scala._
> >>  import org.apache.flink.configuration.Configuration
> >> +import org.apache.flink.ml._
> >>  import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
> >>  import org.apache.flink.ml.common._
> >>  import org.apache.flink.ml.math.Vector
> >> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] {
> >>    * of the algorithm.
> >>    */
> >>  object SVM{
> >> +
> >>    val WEIGHT_VECTOR ="weightVector"
> >>
> >>    // ========================================== Parameters
> >> =========================================
> >> @@ -242,7 +244,13 @@ object SVM{
> >>
> >>          instance.weightsOption match {
> >>            case Some(weights) => {
> >> -            input.map(new
> PredictionMapper[T]).withBroadcastSet(weights,
> >> WEIGHT_VECTOR)
> >> +            input.mapWithBcVariable(weights){
> >> +              (vector, weights) => {
> >> +                val dotProduct = weights dot vector.asBreeze
> >> +
> >> +                LabeledVector(dotProduct, vector)
> >> +              }
> >> +            }
> >>            }
> >>
> >>            case None => {
> >> @@ -254,28 +262,6 @@ object SVM{
> >>      }
> >>    }
> >>
> >> -  /** Mapper to calculate the value of the prediction function. This is
> >> a RichMapFunction, because
> >> -    * we broadcast the weight vector to all mappers.
> >> -    */
> >> -  class PredictionMapper[T <: Vector] extends RichMapFunction[T,
> >> LabeledVector] {
> >> -
> >> -    var weights: BreezeDenseVector[Double] = _
> >> -
> >> -    @throws(classOf[Exception])
> >> -    override def open(configuration: Configuration): Unit = {
> >> -      // get current weights
> >> -      weights = getRuntimeContext.
> >> -
> >> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> >> -    }
> >> -
> >> -    override def map(vector: T): LabeledVector = {
> >> -      // calculate the prediction value (scaled distance from the
> >> separating hyperplane)
> >> -      val dotProduct = weights dot vector.asBreeze
> >> -
> >> -      LabeledVector(dotProduct, vector)
> >> -    }
> >> -  }
> >> -
> >>    /** [[org.apache.flink.ml.pipeline.PredictOperation]] for
> >> [[LabeledVector ]]types. The result type
> >>      * is a [[(Double, Double)]] tuple, corresponding to (truth,
> >> prediction)
> >>      *
> >> @@ -291,7 +277,14 @@ object SVM{
> >>
> >>          instance.weightsOption match {
> >>            case Some(weights) => {
> >> -            input.map(new
> >> LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR)
> >> +            input.mapWithBcVariable(weights){
> >> +              (labeledVector, weights) => {
> >> +                val prediction = weights dot
> >> labeledVector.vector.asBreeze
> >> +                val truth = labeledVector.label
> >> +
> >> +                (truth, prediction)
> >> +              }
> >> +            }
> >>            }
> >>
> >>            case None => {
> >> @@ -303,30 +296,6 @@ object SVM{
> >>      }
> >>    }
> >>
> >> -  /** Mapper to calculate the value of the prediction function. This is
> >> a RichMapFunction, because
> >> -    * we broadcast the weight vector to all mappers.
> >> -    */
> >> -  class LabeledPredictionMapper extends RichMapFunction[LabeledVector,
> >> (Double, Double)] {
> >> -
> >> -    var weights: BreezeDenseVector[Double] = _
> >> -
> >> -    @throws(classOf[Exception])
> >> -    override def open(configuration: Configuration): Unit = {
> >> -      // get current weights
> >> -      weights = getRuntimeContext.
> >> -
> >> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
> >> -    }
> >> -
> >> -    override def map(labeledVector: LabeledVector): (Double, Double) =
> {
> >> -      // calculate the prediction value (scaled distance from the
> >> separating hyperplane)
> >> -      val prediction = weights dot labeledVector.vector.asBreeze
> >> -      val truth = labeledVector.label
> >> -
> >> -      (truth, prediction)
> >> -    }
> >> -  }
> >> -
> >> -
> >>    /** [[FitOperation]] which trains a SVM with soft-margin based on the
> >> given training data set.
> >>      *
> >>      */
> >> @@ -540,17 +509,17 @@ object SVM{
> >>
> >>      // compute projected gradient
> >>      var proj_grad = if(alpha  <= 0.0){
> >> -      math.min(grad, 0)
> >> +      scala.math.min(grad, 0)
> >>      } else if(alpha >= 1.0) {
> >> -      math.max(grad, 0)
> >> +      scala.math.max(grad, 0)
> >>      } else {
> >>        grad
> >>      }
> >>
> >> -    if(math.abs(grad) != 0.0){
> >> +    if(scala.math.abs(grad) != 0.0){
> >>        val qii = x dot x
> >>        val newAlpha = if(qii != 0.0){
> >> -        math.min(math.max((alpha - (grad / qii)), 0.0), 1.0)
> >> +        scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0),
> 1.0)
> >>        } else {
> >>          1.0
> >>        }
> >>
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> >> ----------------------------------------------------------------------
> >> diff --git
> >>
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> >>
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> >> index 2e3ed95..7992b02 100644
> >> ---
> >>
> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> >> +++
> >>
> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
> >> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._
> >>  import org.apache.flink.api.common.typeinfo.TypeInformation
> >>  import org.apache.flink.api.scala._
> >>  import org.apache.flink.configuration.Configuration
> >> +import org.apache.flink.ml._
> >>  import org.apache.flink.ml.common.{LabeledVector, Parameter,
> >> ParameterMap}
> >>  import org.apache.flink.ml.math.Breeze._
> >>  import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
> >> @@ -209,20 +210,9 @@ object StandardScaler {
> >>
> >>          instance.metricsOption match {
> >>            case Some(metrics) => {
> >> -            input.map(new RichMapFunction[T, T]() {
> >> -
> >> -              var broadcastMean: linalg.Vector[Double] = null
> >> -              var broadcastStd: linalg.Vector[Double] = null
> >> -
> >> -              override def open(parameters: Configuration): Unit = {
> >> -                val broadcastedMetrics =
> >> getRuntimeContext().getBroadcastVariable[
> >> -                    (linalg.Vector[Double], linalg.Vector[Double])
> >> -                  ]("broadcastedMetrics").get(0)
> >> -                broadcastMean = broadcastedMetrics._1
> >> -                broadcastStd = broadcastedMetrics._2
> >> -              }
> >> -
> >> -              override def map(vector: T): T = {
> >> +            input.mapWithBcVariable(metrics){
> >> +              (vector, metrics) => {
> >> +                val (broadcastMean, broadcastStd) = metrics
> >>                  var myVector = vector.asBreeze
> >>
> >>                  myVector -= broadcastMean
> >> @@ -230,7 +220,7 @@ object StandardScaler {
> >>                  myVector = (myVector :* std) + mean
> >>                  myVector.fromBreeze
> >>                }
> >> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
> >> +            }
> >>            }
> >>
> >>            case None =>
> >> @@ -251,20 +241,9 @@ object StandardScaler {
> >>
> >>          instance.metricsOption match {
> >>            case Some(metrics) => {
> >> -            input.map(new RichMapFunction[LabeledVector,
> >> LabeledVector]() {
> >> -
> >> -              var broadcastMean: linalg.Vector[Double] = null
> >> -              var broadcastStd: linalg.Vector[Double] = null
> >> -
> >> -              override def open(parameters: Configuration): Unit = {
> >> -                val broadcastedMetrics =
> >> getRuntimeContext().getBroadcastVariable[
> >> -                  (linalg.Vector[Double], linalg.Vector[Double])
> >> -                  ]("broadcastedMetrics").get(0)
> >> -                broadcastMean = broadcastedMetrics._1
> >> -                broadcastStd = broadcastedMetrics._2
> >> -              }
> >> -
> >> -              override def map(labeledVector: LabeledVector):
> >> LabeledVector = {
> >> +            input.mapWithBcVariable(metrics){
> >> +              (labeledVector, metrics) => {
> >> +                val (broadcastMean, broadcastStd) = metrics
> >>                  val LabeledVector(label, vector) = labeledVector
> >>                  var breezeVector = vector.asBreeze
> >>
> >> @@ -273,7 +252,7 @@ object StandardScaler {
> >>                  breezeVector = (breezeVector :* std) + mean
> >>                  LabeledVector(label, breezeVector.fromBreeze[Vector])
> >>                }
> >> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
> >> +            }
> >>            }
> >>
> >>            case None =>
> >>
> >>
> >
>

Reply via email to