[GitHub] flink pull request: Ml branch

2015-04-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28412920
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import java.lang.Iterable
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+import org.apache.flink.util.Collector
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has a user-specified mean and standard deviation.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an input of
+  * [[Vector]].
+  *
+  * @example
+  * {{{
+  *  val trainingDS: DataSet[Vector] = env.fromCollection(data)
+  *
+  *  val transformer = StandardScaler().setMean(10.0).setStd(2.0)
+  *
+  *  transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by default equal to 0
+  * - [[StandardScaler.Std]]: The standard deviation of the transformed data set; by default
+  * equal to 1
+  */
+class StandardScaler extends Transformer[Vector, Vector] with Serializable {
+
+  def setMean(mu: Double): StandardScaler = {
+    parameters.add(Mean, mu)
+    this
+  }
+
+  def setStd(std: Double): StandardScaler = {
+    parameters.add(Std, std)
+    this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+    val resultingParameters = this.parameters ++ parameters
+    val mean = resultingParameters(Mean)
+    val std = resultingParameters(Std)
+
+    val featureMetrics = extractFeatureMetrics(input)
+
+    input.map(new RichMapFunction[Vector, Vector]() {
+
+      var broadcastMeanSet: Vector = null
+      var broadcastStdSet: Vector = null
+
+      override def open(parameters: Configuration): Unit = {
+        val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[(Vector,
+          Vector)]("broadcastedMetrics").get(0)
+        broadcastMeanSet = broadcastedMetrics._1
+        broadcastStdSet = broadcastedMetrics._2
+      }
+
+      override def map(vector: Vector): Vector = {
+        var myVector = vector.asBreeze
+
+        myVector :-= broadcastMeanSet.asBreeze
+        myVector :/= broadcastStdSet.asBreeze
+        myVector = (myVector :* std) :+ mean
+        return myVector.fromBreeze
+      }
+    }).withBroadcastSet(featureMetrics, "broadcastedMetrics")
+  }
+
+  /** Calculates in one pass over the data the features' mean and standard deviation.
+    * For the calculation of the Standard deviation with one pas over the data,
+    * the Youngs & Cramer algorithm was used
+    *
+    * @param dataSet The data set for which we want to calculate mean and variance
+    * @return DataSet of Tuple2<featuresMeanVector,featuresStdVector>
--- End diff --

That's Java notation. We could write: A data set containing a single tuple 
of two vectors ```(meanVector, 

[jira] [Commented] (FLINK-1655) Rename StreamInvokable to StreamOperator

2015-04-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496157#comment-14496157
 ] 

ASF GitHub Bot commented on FLINK-1655:
---

Github user akshaydixi closed the pull request at:

https://github.com/apache/flink/pull/508


 Rename StreamInvokable to StreamOperator
 

 Key: FLINK-1655
 URL: https://issues.apache.org/jira/browse/FLINK-1655
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Gyula Fora
Assignee: Akshay Dixit
Priority: Minor

 The names StreamInvokable and ChainableInvokable are misleading and are 
 easy to confuse with AbstractInvokable.
 These classes (and the variable names that refer to such objects) should be 
 renamed as follows:
 StreamInvokable -> StreamOperator
 ChainableInvokable -> ChainableStreamOperator



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Ml branch

2015-04-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28412522
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+  /** Calculates in one pass over the data the features' mean and standard deviation.
+    * For the calculation of the Standard deviation with one pas over the data,
+    * the Youngs & Cramer algorithm was used
--- End diff --

Maybe we can put the link 
http://www.cs.yale.edu/publications/techreports/tr222.pdf here, too.
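
For readers without the report at hand, here is a minimal sketch of a one-pass mean and standard deviation computation using the Youngs & Cramer update as it is usually stated. It is not the code from the pull request: it works on plain Scala collections for a single feature, and the names `OnePassStats`, `Stats`, `update` and `of` are made up for illustration. Whether the PR computes the population or the sample standard deviation is not visible in the quoted diff, so the sketch assumes the population form.

```scala
// Sketch only: plain Scala, independent of the Flink/Breeze code under review.
object OnePassStats {

  /** Running state for one feature: element count n, running sum T,
    * and running sum of squared deviations from the mean S.
    */
  final case class Stats(n: Long, sum: Double, sqDev: Double) {
    def mean: Double = sum / n
    // Population standard deviation (assumption); use (n - 1) for the sample estimate.
    def std: Double = math.sqrt(sqDev / n)
  }

  /** Youngs & Cramer update:
    *   T_j = T_{j-1} + x_j
    *   S_j = S_{j-1} + (j * x_j - T_j)^2 / (j * (j - 1))
    */
  def update(s: Stats, x: Double): Stats = {
    val n = s.n + 1
    val sum = s.sum + x
    val sqDev =
      if (n == 1) 0.0
      else s.sqDev + math.pow(n * x - sum, 2) / (n.toDouble * (n - 1))
    Stats(n, sum, sqDev)
  }

  /** Single pass over the values of one feature. */
  def of(values: Iterable[Double]): Stats =
    values.foldLeft(Stats(0L, 0.0, 0.0))(update)
}
```

Calling `OnePassStats.of(values)` reads each value exactly once and never stores the raw data, which is the property the doc comment in the diff refers to.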


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the 

[GitHub] flink pull request: [FLINK-1655] Renamed StreamInvokable to Stream...

2015-04-15 Thread akshaydixi
Github user akshaydixi closed the pull request at:

https://github.com/apache/flink/pull/508


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Ml branch

2015-04-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28412479
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+  /** Calculates in one pass over the data the features' mean and standard deviation.
+    * For the calculation of the Standard deviation with one pas over the data,
--- End diff --

typo: pass




[GitHub] flink pull request: Ml branch

2015-04-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28412374
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+    input.map(new RichMapFunction[Vector, Vector]() {
+
+      var broadcastMeanSet: Vector = null
+      var broadcastStdSet: Vector = null
+
+      override def open(parameters: Configuration): Unit = {
+        val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[(Vector,
+          Vector)]("broadcastedMetrics").get(0)
+        broadcastMeanSet = broadcastedMetrics._1
+        broadcastStdSet = broadcastedMetrics._2
+      }
+
+      override def map(vector: Vector): Vector = {
+        var myVector = vector.asBreeze
+
+        myVector :-= broadcastMeanSet.asBreeze
+        myVector :/= broadcastStdSet.asBreeze
--- End diff --

By storing the ```broadcastMeanSet``` and ```broadcastStdSet``` directly as 
```BreezeVector``` we can avoid the repetitive wrapping of ```Vector``` into a 
```BreezeVector``` instance.
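
The idea can be illustrated without Flink or Breeze on the classpath. The following is only a sketch under stated assumptions: `PlainVector`, `Wrapped`, `wrap` and `Scaler` are hypothetical stand-ins for `Vector`, `BreezeVector`, `asBreeze` and the `RichMapFunction`, and the constructor plays the role of `open()`. The counter exists purely to make the number of conversions visible.

```scala
// Sketch only: hypothetical stand-in types, not the Flink or Breeze API.
object ConvertOnce {
  var conversions = 0 // counts wrapper allocations, for illustration only

  /** Stand-in for the library vector that must be wrapped before arithmetic. */
  final case class PlainVector(values: Array[Double])

  /** Stand-in for the arithmetic-capable wrapper. */
  final class Wrapped(val values: Array[Double]) {
    def minus(o: Wrapped): Wrapped =
      new Wrapped(values.zip(o.values).map { case (a, b) => a - b })
    def divide(o: Wrapped): Wrapped =
      new Wrapped(values.zip(o.values).map { case (a, b) => a / b })
  }

  def wrap(v: PlainVector): Wrapped = { conversions += 1; new Wrapped(v.values) }

  /** Wraps mean and std once at construction (the analogue of open()),
    * so map() only has to wrap the incoming record.
    */
  final class Scaler(mean: PlainVector, std: PlainVector) {
    private val meanW = wrap(mean)
    private val stdW = wrap(std)

    def map(v: PlainVector): PlainVector =
      PlainVector(wrap(v).minus(meanW).divide(stdW).values)
  }
}
```

With this layout each record costs one conversion instead of three, which matters because `map` runs once per element of the data set.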




[GitHub] flink pull request: Ml branch

2015-04-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28412288
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+    input.map(new RichMapFunction[Vector, Vector]() {
+
+      var broadcastMeanSet: Vector = null
+      var broadcastStdSet: Vector = null
--- End diff --

Maybe ```broadcastMean``` and ```broadcastStd``` are better names, because 
these variables are not sets.




[GitHub] flink pull request: Ml branch

2015-04-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28412749
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+  /** Calculates in one pass over the data the features' mean and standard deviation.
+    * For the calculation of the Standard deviation with one pas over the data,
+    * the Youngs & Cramer algorithm was used
+    *
+    * @param dataSet The data set for which we want to calculate mean and variance
+    * @return DataSet of Tuple2<featuresMeanVector,featuresStdVector>
+    */
+  private def extractFeatureMetrics(dataSet: DataSet[Vector]) = {
--- End diff --

By adding explicitly the return 

[GitHub] flink pull request: fixed a few typos in internal_general_arch.md

2015-04-15 Thread akshaydixi
GitHub user akshaydixi opened a pull request:

https://github.com/apache/flink/pull/602

fixed a few typos in internal_general_arch.md



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/akshaydixi/flink typo

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/602.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #602


commit b6491c33d3d4dccd8759839f6e9479f97a74b2ce
Author: Akshay Dixit akshayd...@gmail.com
Date:   2015-04-15T13:26:55Z

fixed a few typos in internal_general_arch.md






[GitHub] flink pull request: fixed a few typos in internal_general_arch.md

2015-04-15 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/602#issuecomment-93406145
  
Looks good to merge




[jira] [Commented] (FLINK-992) Create CollectionDataSets by reading (client) local files.

2015-04-15 Thread niraj rai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496378#comment-14496378
 ] 

niraj rai commented on FLINK-992:
-

Hi, if no one is working on this, I can take it.

 Create CollectionDataSets by reading (client) local files.
 --

 Key: FLINK-992
 URL: https://issues.apache.org/jira/browse/FLINK-992
 Project: Flink
  Issue Type: New Feature
  Components: Java API, Python API, Scala API
Reporter: Fabian Hueske
Assignee: Henry Saputra
Priority: Minor
  Labels: starter

 {{CollectionDataSets}} are a nice way to feed data into programs.
 We could add support to read a client-local file at program construction time 
 using a FileInputFormat, put its data into a CollectionDataSet, and ship its 
 data together with the program.
 This would remove the need to upload small files into DFS which are used 
 together with some large input (stored in DFS).
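
 A rough sketch of the client-side half of this idea, for illustration only. It reads the file with java.nio rather than the FileInputFormat the issue proposes, and `LocalFileToCollection` and `readLocalLines` are hypothetical names, not an existing Flink API.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

// Sketch only: readLocalLines is a hypothetical helper, not part of Flink.
object LocalFileToCollection {

  /** Reads a small client-local text file fully into memory, one element per line. */
  def readLocalLines(path: Path): Seq[String] =
    Files.readAllLines(path, StandardCharsets.UTF_8).asScala.toList

  // With Flink on the classpath, the result could be handed to the existing
  // collection source, e.g. env.fromCollection(readLocalLines(path)), so the
  // data ships with the program instead of being uploaded to DFS first.
}
```

 This only makes sense for small files, since the whole content is held in client memory and serialized with the job.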





[jira] [Commented] (FLINK-1869) Make StreamDiscretizers chainable

2015-04-15 Thread Akshay Dixit (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496299#comment-14496299
 ] 

Akshay Dixit commented on FLINK-1869:
-

Hi, I'd like to take a crack at this issue if no one else is working on it.

From what I've managed to understand, making StreamDiscretizer operators 
extend ChainableStreamOperator would enable them to be stacked on top of each 
other and the policy of the resulting stacked discretizer would be a union of 
the two policies. Is that it? 

Also, simply changing the StreamDiscretizer to extend ChainableStreamOperator 
instead of StreamOperator doesn't break any tests. I'd like some pointers on 
what else would need to be changed so that it actually functions in the 
desired way.

 Make StreamDiscretizers chainable
 -

 Key: FLINK-1869
 URL: https://issues.apache.org/jira/browse/FLINK-1869
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Gyula Fora

 Currently the different StreamDiscretizer operators (StreamDiscretizer, 
 GroupedStreamDiscretizer, GroupedActiveDiscretizer) are non-chainable.





[jira] [Commented] (FLINK-992) Create CollectionDataSets by reading (client) local files.

2015-04-15 Thread Henry Saputra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496492#comment-14496492
 ] 

Henry Saputra commented on FLINK-992:
-

Hi [~nrai], yes, you can start working on this one. Thanks!

 Create CollectionDataSets by reading (client) local files.
 --

 Key: FLINK-992
 URL: https://issues.apache.org/jira/browse/FLINK-992
 Project: Flink
  Issue Type: New Feature
  Components: Java API, Python API, Scala API
Reporter: Fabian Hueske
Assignee: Henry Saputra
Priority: Minor
  Labels: starter

 {{CollectionDataSets}} are a nice way to feed data into programs.
 We could add support to read a client-local file at program construction time 
 using a FileInputFormat, put its data into a CollectionDataSet, and ship its 
 data together with the program.
 This would remove the need to upload small files into DFS which are used 
 together with some large input (stored in DFS).
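
 A minimal sketch of the idea above, in plain Java. It assumes the file is 
 small enough to hold in memory, and it swaps in java.nio.file.Files for the 
 FileInputFormat the issue mentions. LocalFileCollection and readLocalLines 
 are hypothetical names, not part of Flink.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical helper, not part of Flink: reads a small client-local file
// into memory while the program is being constructed.
final class LocalFileCollection {

    // Returns every line of the file as an in-memory list.
    static List<String> readLocalLines(String path) {
        try {
            return Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> lines = readLocalLines(args[0]);
        // Assumption: the list would then be passed to the program, for
        // example through ExecutionEnvironment.fromCollection(lines), so
        // that the data ships together with the job.
        System.out.println("Read " + lines.size() + " lines");
    }
}
```

 Because the data travels with the program, this only makes sense for small 
 files; large inputs would still be read from DFS.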





[jira] [Assigned] (FLINK-1818) Provide API to cancel running job

2015-04-15 Thread niraj rai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

niraj rai reassigned FLINK-1818:


Assignee: niraj rai

 Provide API to cancel running job
 -

 Key: FLINK-1818
 URL: https://issues.apache.org/jira/browse/FLINK-1818
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: niraj rai
  Labels: starter

 http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Canceling-a-Cluster-Job-from-Java-td4897.html





[jira] [Assigned] (FLINK-992) Create CollectionDataSets by reading (client) local files.

2015-04-15 Thread niraj rai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

niraj rai reassigned FLINK-992:
---

Assignee: niraj rai  (was: Henry Saputra)

 Create CollectionDataSets by reading (client) local files.
 --

 Key: FLINK-992
 URL: https://issues.apache.org/jira/browse/FLINK-992
 Project: Flink
  Issue Type: New Feature
  Components: Java API, Python API, Scala API
Reporter: Fabian Hueske
Assignee: niraj rai
Priority: Minor
  Labels: starter

 {{CollectionDataSets}} are a nice way to feed data into programs.
 We could add support to read a client-local file at program construction time 
 using a FileInputFormat, put its data into a CollectionDataSet, and ship its 
 data together with the program.
 This would remove the need to upload small files into DFS which are used 
 together with some large input (stored in DFS).





[GitHub] flink pull request: [FLINK-1753] Extend KafkaITCase, reworked Pers...

2015-04-15 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/603

[FLINK-1753] Extend KafkaITCase, reworked PersistentKafkaSource (initially)

Added large messages test and reworked PersistentKafkaSource.

A Flink user I'm talking with offline is affected by this issue, which is why 
I would like to merge this rather soon. (I am aware that one test case is 
failing, so I'll spend more time on this tomorrow morning.)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink kafka_big_records-rebased

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/603.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #603


commit e8bb15a686f17710c709662b4eaf475b8e3de528
Author: Robert Metzger rmetz...@apache.org
Date:   2015-04-13T14:23:58Z

[FLINK-1753] Extend KafkaITCase with large messages test + rework 
PersistentKafkaSource.

commit c3be60f1db82ecee7b0a9cca0f1f637bfd722739
Author: Robert Metzger rmetz...@apache.org
Date:   2015-04-15T20:20:03Z

wip




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1753) Add more tests for Kafka Connectors

2015-04-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496862#comment-14496862
 ] 

ASF GitHub Bot commented on FLINK-1753:
---

GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/603

[FLINK-1753] Extend KafkaITCase, reworked PersistentKafkaSource (initially)

Added large messages test and reworked PersistentKafkaSource.

A Flink user I'm talking with offline is affected by this issue, which is why 
I would like to merge this rather soon. (I am aware that one test case is 
failing, so I'll spend more time on this tomorrow morning.)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink kafka_big_records-rebased

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/603.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #603


commit e8bb15a686f17710c709662b4eaf475b8e3de528
Author: Robert Metzger rmetz...@apache.org
Date:   2015-04-13T14:23:58Z

[FLINK-1753] Extend KafkaITCase with large messages test + rework 
PersistentKafkaSource.

commit c3be60f1db82ecee7b0a9cca0f1f637bfd722739
Author: Robert Metzger rmetz...@apache.org
Date:   2015-04-15T20:20:03Z

wip




 Add more tests for Kafka Connectors
 ---

 Key: FLINK-1753
 URL: https://issues.apache.org/jira/browse/FLINK-1753
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Gábor Hermann

 The current {{KafkaITCase}} is only doing a single test.
 We need to refactor that test so that it brings up a Kafka/Zookeeper server 
 and then performs various tests:
 Tests to include:
 - A topology with non-string types MERGED IN 359b39c3
 - A topology with a custom Kafka Partitioning class MERGED IN 359b39c3
 - A topology testing the regular {{KafkaSource}}. MERGED IN 359b39c3
 - Kafka broker failure MERGED IN cb34e976
 - Flink TaskManager failure
 - Test with large records (up to 30 MB).





[GitHub] flink pull request: Stream graph + internal refactor

2015-04-15 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/594#issuecomment-93635634
  
This is a great patch, but as @mxm mentioned before, it is almost impossible 
to review patches of this size.

If the PR cannot be split into separate PRs, let's try to have smaller 
commits to help. Before merging, we could squash them into a single commit.




[GitHub] flink pull request: [FLINK-1753] Extend KafkaITCase, reworked Pers...

2015-04-15 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/603#issuecomment-93568853
  
The failed test has been resolved.




[jira] [Commented] (FLINK-1753) Add more tests for Kafka Connectors

2015-04-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496993#comment-14496993
 ] 

ASF GitHub Bot commented on FLINK-1753:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/603#issuecomment-93568853
  
The failed test has been resolved.


 Add more tests for Kafka Connectors
 ---

 Key: FLINK-1753
 URL: https://issues.apache.org/jira/browse/FLINK-1753
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Gábor Hermann

 The current {{KafkaITCase}} is only doing a single test.
 We need to refactor that test so that it brings up a Kafka/Zookeeper server 
 and then performs various tests:
 Tests to include:
 - A topology with non-string types MERGED IN 359b39c3
 - A topology with a custom Kafka Partitioning class MERGED IN 359b39c3
 - A topology testing the regular {{KafkaSource}}. MERGED IN 359b39c3
 - Kafka broker failure MERGED IN cb34e976
 - Flink TaskManager failure
 - Test with large records (up to 30 MB).





[GitHub] flink pull request: Stream graph + internal refactor

2015-04-15 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/594#issuecomment-93264219
  
I will merge this later today if there are no objections.




[GitHub] flink pull request: Stream graph + internal refactor

2015-04-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/594




[jira] [Created] (FLINK-1891) Add isEmpty check when the input dir

2015-04-15 Thread Sibao Hong (JIRA)
Sibao Hong created FLINK-1891:
-

 Summary: Add isEmpty check when the input dir
 Key: FLINK-1891
 URL: https://issues.apache.org/jira/browse/FLINK-1891
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: master
Reporter: Sibao Hong
Assignee: Sibao Hong


Add an empty check for the input storageDirectory: if storageDirectory is 
empty, we should use tmp as the base dir.
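
The check described above could look roughly like the following sketch. It 
assumes that "tmp" means the JVM's java.io.tmpdir property and that a null 
or whitespace-only value counts as empty; StorageDirs and resolveBaseDir are 
hypothetical names, not the code in the actual patch.

```java
import java.io.File;

// Hypothetical helper illustrating the empty check; not the actual patch.
final class StorageDirs {

    // Returns the configured directory, or the JVM temp directory when the
    // configured value is null, empty, or only whitespace.
    static File resolveBaseDir(String storageDirectory) {
        if (storageDirectory == null || storageDirectory.trim().isEmpty()) {
            // Assumption: "tmp" refers to the java.io.tmpdir system property.
            return new File(System.getProperty("java.io.tmpdir"));
        }
        return new File(storageDirectory);
    }

    public static void main(String[] args) {
        System.out.println(resolveBaseDir(""));
        System.out.println(resolveBaseDir("/data/blobs"));
    }
}
```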





[jira] [Resolved] (FLINK-1876) Refactor StreamGraph to be node centric

2015-04-15 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora resolved FLINK-1876.
---
Resolution: Done

 Refactor StreamGraph to be node centric
 ---

 Key: FLINK-1876
 URL: https://issues.apache.org/jira/browse/FLINK-1876
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Gyula Fora
Assignee: Gyula Fora

 Currently the StreamGraph, which keeps all the information about the streaming 
 topology, uses a large number of hashmaps to store graph properties. 
 These should be refactored in a node-centric fashion.





[GitHub] flink pull request: [FLINK-1891]Add the input storageDirectory emp...

2015-04-15 Thread matadorhong
GitHub user matadorhong opened a pull request:

https://github.com/apache/flink/pull/601

[FLINK-1891]Add the input storageDirectory empty check

Add an empty check for the input storageDirectory: if storageDirectory is 
empty, we should use tmp as the base dir.
Also fix the code style problem with the blank space between every ) and {.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/matadorhong/flink Fix_bug_dir_empty_check

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/601.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #601


commit 90c5e380e8f2e5970a2aebc57d098f56b4f65135
Author: hongsibao hongsi...@huawei.com
Date:   2015-04-15T10:15:27Z

Add the input storageDirectory empty check






[jira] [Commented] (FLINK-1891) Add isEmpty check when the input dir

2015-04-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14495989#comment-14495989
 ] 

ASF GitHub Bot commented on FLINK-1891:
---

GitHub user matadorhong opened a pull request:

https://github.com/apache/flink/pull/601

[FLINK-1891]Add the input storageDirectory empty check

Add an empty check for the input storageDirectory: if storageDirectory is 
empty, we should use tmp as the base dir.
Also fix the code style problem with the blank space between every ) and {.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/matadorhong/flink Fix_bug_dir_empty_check

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/601.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #601


commit 90c5e380e8f2e5970a2aebc57d098f56b4f65135
Author: hongsibao hongsi...@huawei.com
Date:   2015-04-15T10:15:27Z

Add the input storageDirectory empty check




 Add isEmpty check when the input dir
 

 Key: FLINK-1891
 URL: https://issues.apache.org/jira/browse/FLINK-1891
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: master
Reporter: Sibao Hong
Assignee: Sibao Hong

 Add an empty check for the input storageDirectory: if storageDirectory is 
 empty, we should use tmp as the base dir.





[GitHub] flink pull request: Ml branch

2015-04-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28412028
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import java.lang.Iterable
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+import org.apache.flink.util.Collector
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
--- End diff --

Mean and standard deviation are configurable, right? Default is (0,1)?
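
To make the question concrete, here is a rough sketch of scaling a single 
feature column to a chosen mean and standard deviation, with (0, 1) as the 
natural default. It is written in plain Java rather than against the PR's 
Scala code; ScaleSketch and scale are hypothetical names, and the use of the 
population standard deviation is an assumption, not something confirmed by 
the diff.

```java
import java.util.Arrays;

// Hypothetical illustration of configurable standard scaling; it does not
// reproduce the StandardScaler implementation under review.
final class ScaleSketch {

    // Rescales x so that it has the requested mean and standard deviation.
    // Assumption: population standard deviation (divide by n).
    static double[] scale(double[] x, double targetMean, double targetStd) {
        int n = x.length;
        double mean = 0.0;
        for (double v : x) {
            mean += v;
        }
        mean /= n;

        double variance = 0.0;
        for (double v : x) {
            variance += (v - mean) * (v - mean);
        }
        double std = Math.sqrt(variance / n);

        double[] out = new double[n];
        for (int i = 0; i < n; i++) {
            // A constant feature has std == 0; map it to the target mean.
            out[i] = (std == 0.0)
                    ? targetMean
                    : (x[i] - mean) / std * targetStd + targetMean;
        }
        return out;
    }

    public static void main(String[] args) {
        double[] feature = {1.0, 2.0, 3.0};
        System.out.println(Arrays.toString(scale(feature, 0.0, 1.0)));
        System.out.println(Arrays.toString(scale(feature, 10.0, 2.0)));
    }
}
```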

