[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/579#discussion_r28412920

    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import java.lang.Iterable
    +import breeze.linalg
    +import breeze.linalg.DenseVector
    +import breeze.numerics.sqrt
    +import breeze.numerics.sqrt._
    +import org.apache.flink.api.common.functions._
    +import org.apache.flink.api.scala._
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.Vector
    +import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
    +import org.apache.flink.util.Collector
    +
    +/** Scales observations, so that all features have mean equal to zero
    + * and standard deviation equal to one
    + *
    + * This transformer takes a a Vector of values and maps it into the
    + * scaled Vector that each feature has a user-specified mean and standard deviation.
    + *
    + * This transformer can be prepended to all [[Transformer]] and
    + * [[org.apache.flink.ml.common.Learner]] implementations which expect an input of
    + * [[Vector]].
    + *
    + * @example
    + *          {{{
    + *            val trainingDS: DataSet[Vector] = env.fromCollection(data)
    + *
    + *            val transformer = StandardScaler().setMean(10.0).setStd(2.0)
    + *
    + *            transformer.transform(trainingDS)
    + *          }}}
    + *
    + * =Parameters=
    + *
    + * - [[StandardScaler.Mean]]: The mean value of transformed data set; by default equal to 0
    + * - [[StandardScaler.Std]]: The standard deviation of the transformed data set; by default
    + * equal to 1
    + */
    +class StandardScaler extends Transformer[Vector, Vector] with Serializable {
    +
    +  def setMean(mu: Double): StandardScaler = {
    +    parameters.add(Mean, mu)
    +    this
    +  }
    +
    +  def setStd(std: Double): StandardScaler = {
    +    parameters.add(Std, std)
    +    this
    +  }
    +
    +  override def transform(input: DataSet[Vector], parameters: ParameterMap):
    +  DataSet[Vector] = {
    +    val resultingParameters = this.parameters ++ parameters
    +    val mean = resultingParameters(Mean)
    +    val std = resultingParameters(Std)
    +
    +    val featureMetrics = extractFeatureMetrics(input)
    +
    +    input.map(new RichMapFunction[Vector, Vector]() {
    +
    +      var broadcastMeanSet: Vector = null
    +      var broadcastStdSet: Vector = null
    +
    +      override def open(parameters: Configuration): Unit = {
    +        val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[(Vector,
    +          Vector)]("broadcastedMetrics").get(0)
    +        broadcastMeanSet = broadcastedMetrics._1
    +        broadcastStdSet = broadcastedMetrics._2
    +      }
    +
    +      override def map(vector: Vector): Vector = {
    +        var myVector = vector.asBreeze
    +
    +        myVector :-= broadcastMeanSet.asBreeze
    +        myVector :/= broadcastStdSet.asBreeze
    +        myVector = (myVector :* std) :+ mean
    +        return myVector.fromBreeze
    +      }
    +    }).withBroadcastSet(featureMetrics, "broadcastedMetrics")
    +  }
    +
    +  /** Calculates in one pass over the data the features' mean and standard deviation.
    +    * For the calculation of the Standard deviation with one pas over the data,
    +    * the Youngs Cramer algorithm was used
    +    *
    +    * @param dataSet The data set for which we want to calculate mean and variance
    +    * @return DataSet of Tuple2<featuresMeanVector,featuresStdVector>
    --- End diff --

    That's Java notation. We could write: A data set containing a single tuple of two vectors ```(meanVector,
[jira] [Commented] (FLINK-1655) Rename StreamInvokable to StreamOperator
[ https://issues.apache.org/jira/browse/FLINK-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496157#comment-14496157 ] ASF GitHub Bot commented on FLINK-1655: --- Github user akshaydixi closed the pull request at: https://github.com/apache/flink/pull/508 Rename StreamInvokable to StreamOperator Key: FLINK-1655 URL: https://issues.apache.org/jira/browse/FLINK-1655 Project: Flink Issue Type: Improvement Components: Streaming Reporter: Gyula Fora Assignee: Akshay Dixit Priority: Minor The names of the StreamInvokable and ChainableInvokable are misleading and easy to confuse with the AbstractInvokable. These classes (and the variable names that refer to such objects) should be renamed to: StreamInvokable -> StreamOperator, ChainableInvokable -> ChainableStreamOperator -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/579#discussion_r28412522

    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala ---
    @@ -0,0 +1,178 @@
    ...
    +  /** Calculates in one pass over the data the features' mean and standard deviation.
    +    * For the calculation of the Standard deviation with one pas over the data,
    +    * the Youngs Cramer algorithm was used
    --- End diff --

    Maybe we can put the link http://www.cs.yale.edu/publications/techreports/tr222.pdf here, too.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the
[GitHub] flink pull request: [FLINK-1655] Renamed StreamInvokable to Stream...
Github user akshaydixi closed the pull request at: https://github.com/apache/flink/pull/508
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/579#discussion_r28412479

    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala ---
    @@ -0,0 +1,178 @@
    ...
    +  /** Calculates in one pass over the data the features' mean and standard deviation.
    +    * For the calculation of the Standard deviation with one pas over the data,
    --- End diff --

    typo: pass
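The doc comment under review says mean and standard deviation are computed in one pass using the Youngs-Cramer algorithm. The sketch below shows that update in plain Scala on `Array[Double]`. It is not the code in this pull request and uses neither Flink nor Breeze. As the update is commonly stated, it keeps a running sum T and a sum of squared deviations S per feature. For the j-th observation, T_j = T_{j-1} + x_j and S_j = S_{j-1} + (j x_j - T_j)^2 / (j (j - 1)). Two partial results, for example from two partitions, can be combined with Chan, Golub and LeVeque's pairwise formula. `FeatureStats` and its methods are hypothetical names. The sketch also assumes the population standard deviation sqrt(S / n); the pull request may normalize differently.

```scala
/** Hypothetical per-feature running statistics: observation count n,
  * per-feature sum T and per-feature sum of squared deviations S (m2). */
final case class FeatureStats(n: Long, sum: Array[Double], m2: Array[Double]) {

  /** Youngs-Cramer update for one more observation x:
    * T_j = T_{j-1} + x_j,  S_j = S_{j-1} + (j * x_j - T_j)^2 / (j * (j - 1)). */
  def add(x: Array[Double]): FeatureStats = {
    require(x.length == sum.length, "dimension mismatch")
    val j = n + 1
    val newSum = Array.tabulate(x.length)(i => sum(i) + x(i))
    val newM2 =
      if (j == 1) Array.fill(x.length)(0.0) // a single observation has no spread
      else Array.tabulate(x.length) { i =>
        val d = j * x(i) - newSum(i)
        m2(i) + d * d / (j.toDouble * (j - 1))
      }
    FeatureStats(j, newSum, newM2)
  }

  /** Pairwise combination of two partial results (sizes m and k):
    * S = S_a + S_b + m / (k * (m + k)) * ((k / m) * T_a - T_b)^2. */
  def merge(other: FeatureStats): FeatureStats =
    if (n == 0) other
    else if (other.n == 0) this
    else {
      require(sum.length == other.sum.length, "dimension mismatch")
      val m = n.toDouble
      val k = other.n.toDouble
      val newSum = Array.tabulate(sum.length)(i => sum(i) + other.sum(i))
      val newM2 = Array.tabulate(sum.length) { i =>
        val d = (k / m) * sum(i) - other.sum(i)
        m2(i) + other.m2(i) + m / (k * (m + k)) * d * d
      }
      FeatureStats(n + other.n, newSum, newM2)
    }

  /** (mean vector, standard deviation vector), assuming the population form sqrt(S / n). */
  def meanAndStd: (Array[Double], Array[Double]) = {
    require(n > 0, "no observations")
    (sum.map(_ / n), m2.map(s => math.sqrt(s / n)))
  }
}

object FeatureStats {
  def empty(dim: Int): FeatureStats =
    FeatureStats(0L, Array.fill(dim)(0.0), Array.fill(dim)(0.0))
}
```

In a Flink job, one plausible arrangement (an assumption, not what the pull request does) is to build one `FeatureStats` per partition with `mapPartition`, combine the partial results with `reduce` using `merge`, and finish with `meanAndStd`. That produces the single (mean, std) tuple the reviewer describes.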
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/579#discussion_r28412374

    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala ---
    @@ -0,0 +1,178 @@
    ...
    +      override def map(vector: Vector): Vector = {
    +        var myVector = vector.asBreeze
    +
    +        myVector :-= broadcastMeanSet.asBreeze
    +        myVector :/= broadcastStdSet.asBreeze
    --- End diff --

    By storing the ```broadcastMeanSet``` and ```broadcastStdSet``` directly as ```BreezeVector``` we can avoid the repetitive wrapping of ```Vector``` into a ```BreezeVector``` instance.
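The reviewer's suggestion amounts to converting the broadcast statistics once, when the function is opened, instead of on every `map` call. The plain-Scala sketch below mimics that structure without Flink or Breeze. `ScalingFunction` is a hypothetical stand-in for the anonymous `RichMapFunction`, and arrays play the role of Breeze vectors. Here `open` receives the (mean, std) pair directly rather than reading a broadcast variable. The per-feature transformation is the same as in the diff: x' = (x - mu) / sigma * std + mean.

```scala
/** Hypothetical stand-in for the anonymous RichMapFunction in the diff.
  * Plain arrays play the role of Breeze vectors. */
final class ScalingFunction(targetMean: Double, targetStd: Double) {
  // Converted once in open(), then reused by every map() call.
  private var featureMean: Array[Double] = Array.emptyDoubleArray
  private var featureStd: Array[Double] = Array.emptyDoubleArray

  /** Analogue of RichMapFunction.open: receives the (mean, std) pair
    * directly instead of reading a broadcast variable. */
  def open(stats: (Seq[Double], Seq[Double])): Unit = {
    featureMean = stats._1.toArray
    featureStd = stats._2.toArray
  }

  /** x'_i = (x_i - mu_i) / sigma_i * targetStd + targetMean.
    * A feature with sigma_i == 0 yields NaN or Infinity, as it would in the diff. */
  def map(x: Array[Double]): Array[Double] = {
    require(x.length == featureMean.length, "dimension mismatch")
    Array.tabulate(x.length) { i =>
      (x(i) - featureMean(i)) / featureStd(i) * targetStd + targetMean
    }
  }
}
```

In the actual Flink code, the same idea would presumably mean declaring the two fields with a Breeze vector type and calling `asBreeze` once in `open`. That is the reviewer's proposal; this sketch only illustrates the pattern.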
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/579#discussion_r28412288

    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala ---
    @@ -0,0 +1,178 @@
    ...
    +    input.map(new RichMapFunction[Vector, Vector]() {
    +
    +      var broadcastMeanSet: Vector = null
    +      var broadcastStdSet: Vector = null
    --- End diff --

    Maybe ```broadcastMean``` and ```broadcastStd``` are better names, because these variables are not sets.
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/579#discussion_r28412749

    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala ---
    @@ -0,0 +1,178 @@
    ...
    +  /** Calculates in one pass over the data the features' mean and standard deviation.
    +    * For the calculation of the Standard deviation with one pas over the data,
    +    * the Youngs Cramer algorithm was used
    +    *
    +    * @param dataSet The data set for which we want to calculate mean and variance
    +    * @return DataSet of Tuple2<featuresMeanVector,featuresStdVector>
    +    */
    +  private def extractFeatureMetrics(dataSet: DataSet[Vector]) = {
    --- End diff --

    By explicitly adding the return
[GitHub] flink pull request: fixed a few typos in internal_general_arch.md
GitHub user akshaydixi opened a pull request: https://github.com/apache/flink/pull/602 fixed a few typos in internal_general_arch.md You can merge this pull request into a Git repository by running: $ git pull https://github.com/akshaydixi/flink typo Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/602.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #602 commit b6491c33d3d4dccd8759839f6e9479f97a74b2ce Author: Akshay Dixit akshayd...@gmail.com Date: 2015-04-15T13:26:55Z fixed a few typos in internal_general_arch.md
[GitHub] flink pull request: fixed a few typos in internal_general_arch.md
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/602#issuecomment-93406145 Looks good to merge
[jira] [Commented] (FLINK-992) Create CollectionDataSets by reading (client) local files.
[ https://issues.apache.org/jira/browse/FLINK-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496378#comment-14496378 ] niraj rai commented on FLINK-992: - Hi, if no one is working on this, I can work on it. Create CollectionDataSets by reading (client) local files. -- Key: FLINK-992 URL: https://issues.apache.org/jira/browse/FLINK-992 Project: Flink Issue Type: New Feature Components: Java API, Python API, Scala API Reporter: Fabian Hueske Assignee: Henry Saputra Priority: Minor Labels: starter {{CollectionDataSets}} are a nice way to feed data into programs. We could add support to read a client-local file at program construction time using a FileInputFormat, put its data into a CollectionDataSet, and ship its data together with the program. This would remove the need to upload small files into DFS which are used together with some large input (stored in DFS).
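The issue proposes reading a small client-local file when the program is constructed and shipping its contents with the program as a collection. Below is a sketch of only the client-side half. It assumes a UTF-8 text file with one record per line and uses `scala.io.Source` rather than the `FileInputFormat` the issue mentions. The lines are read eagerly into an in-memory `Vector[String]`, which could then be passed to `ExecutionEnvironment.fromCollection`. That call is shown only in a comment because it needs Flink on the classpath. `LocalFileCollection`, `readLines` and the example path are hypothetical.

```scala
import scala.io.Source

/** Hypothetical helper: reads a client-local text file eagerly, at program
  * construction time, so its contents can travel with the program. */
object LocalFileCollection {
  def readLines(path: String): Vector[String] = {
    val source = Source.fromFile(path, "UTF-8")
    try source.getLines().toVector // one element per line, terminators stripped
    finally source.close()
  }

  // With the Flink Scala API on the classpath, the result could become a DataSet, e.g.:
  //   import org.apache.flink.api.scala._
  //   val env = ExecutionEnvironment.getExecutionEnvironment
  //   val lines: DataSet[String] = env.fromCollection(readLines("/hypothetical/path.txt"))
}
```

Reading eagerly keeps the file access on the client, which is the point of the proposal. It is only sensible for small files, since the whole content is held in memory and shipped with the program.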
[jira] [Commented] (FLINK-1869) Make StreamDiscretizers chainable
[ https://issues.apache.org/jira/browse/FLINK-1869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496299#comment-14496299 ] Akshay Dixit commented on FLINK-1869: - Hi, I'd like to take a crack at this issue if no one else is working on it. From what I've managed to understand, making the StreamDiscretizer operators extend ChainableStreamOperator would enable them to be stacked on top of each other, and the policy of the resulting stacked discretizer would be the union of the two policies. Is that it? Also, simply changing StreamDiscretizer to extend ChainableStreamOperator instead of StreamOperator doesn't break any tests. I'd like some pointers on what else would need to be changed so that it actually functions the way it is intended. Make StreamDiscretizers chainable - Key: FLINK-1869 URL: https://issues.apache.org/jira/browse/FLINK-1869 Project: Flink Issue Type: Improvement Components: Streaming Reporter: Gyula Fora Currently the different StreamDiscretizer operators (StreamDiscretizer, GroupedStreamDiscretizer, GroupedActiveDiscretizer) are non-chainable.
[jira] [Commented] (FLINK-992) Create CollectionDataSets by reading (client) local files.
[ https://issues.apache.org/jira/browse/FLINK-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496492#comment-14496492 ] Henry Saputra commented on FLINK-992: - Hi [~nrai], yes, you can start working on this one. Thanks!
[jira] [Assigned] (FLINK-1818) Provide API to cancel running job
[ https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] niraj rai reassigned FLINK-1818: Assignee: niraj rai Provide API to cancel running job - Key: FLINK-1818 URL: https://issues.apache.org/jira/browse/FLINK-1818 Project: Flink Issue Type: Improvement Components: Java API Affects Versions: 0.9 Reporter: Robert Metzger Assignee: niraj rai Labels: starter http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Canceling-a-Cluster-Job-from-Java-td4897.html
[jira] [Assigned] (FLINK-992) Create CollectionDataSets by reading (client) local files.
[ https://issues.apache.org/jira/browse/FLINK-992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] niraj rai reassigned FLINK-992: --- Assignee: niraj rai (was: Henry Saputra) Create CollectionDataSets by reading (client) local files. -- Key: FLINK-992 URL: https://issues.apache.org/jira/browse/FLINK-992 Project: Flink Issue Type: New Feature Components: Java API, Python API, Scala API Reporter: Fabian Hueske Assignee: niraj rai Priority: Minor Labels: starter {{CollectionDataSets}} are a nice way to feed data into programs. We could add support to read a client-local file at program construction time using a FileInputFormat, put its data into a CollectionDataSet, and ship its data together with the program. This would remove the need to upload small files into DFS which are used together with some large input (stored in DFS).
[GitHub] flink pull request: [FLINK-1753] Extend KafkaITCase, reworked Pers...
GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/603 [FLINK-1753] Extend KafkaITCase, reworked PersistentKafkaSource (initially) Added large messages test and reworked PersistentKafkaSource. A Flink user I'm talking with offline is affected by this issue; that's why I would like to merge this rather soon. (I am aware that one test case is failing, so I'll spend more time on this tomorrow morning.) You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink kafka_big_records-rebased Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/603.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #603 commit e8bb15a686f17710c709662b4eaf475b8e3de528 Author: Robert Metzger rmetz...@apache.org Date: 2015-04-13T14:23:58Z [FLINK-1753] Extend KafkaITCase with large messages test + rework PersistentKafkaSource. commit c3be60f1db82ecee7b0a9cca0f1f637bfd722739 Author: Robert Metzger rmetz...@apache.org Date: 2015-04-15T20:20:03Z wip --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1753) Add more tests for Kafka Connectors
[ https://issues.apache.org/jira/browse/FLINK-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496862#comment-14496862 ] ASF GitHub Bot commented on FLINK-1753: --- GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/603 [FLINK-1753] Extend KafkaITCase, reworked PersistentKafkaSource (initially) Added large messages test and reworked PersistentKafkaSource. A Flink user I'm talking with offline is affected by this issue; that's why I would like to merge this rather soon. (I am aware that one test case is failing, so I'll spend more time on this tomorrow morning.) You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink kafka_big_records-rebased Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/603.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #603 commit e8bb15a686f17710c709662b4eaf475b8e3de528 Author: Robert Metzger rmetz...@apache.org Date: 2015-04-13T14:23:58Z [FLINK-1753] Extend KafkaITCase with large messages test + rework PersistentKafkaSource. commit c3be60f1db82ecee7b0a9cca0f1f637bfd722739 Author: Robert Metzger rmetz...@apache.org Date: 2015-04-15T20:20:03Z wip Add more tests for Kafka Connectors --- Key: FLINK-1753 URL: https://issues.apache.org/jira/browse/FLINK-1753 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Gábor Hermann The current {{KafkaITCase}} only performs a single test. We need to refactor that test so that it brings up a Kafka/Zookeeper server and then performs various tests: Tests to include: - A topology with non-string types MERGED IN 359b39c3 - A topology with a custom Kafka Partitioning class MERGED IN 359b39c3 - A topology testing the regular {{KafkaSource}}.
MERGED IN 359b39c3 - Kafka broker failure MERGED IN cb34e976 - Flink TaskManager failure - Test with large records (up to 30 MB).
[GitHub] flink pull request: Stream graph + internal refactor
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/594#issuecomment-93635634 This is a great patch, but as @mxm mentioned before, it is almost impossible to review patches of this size. If the PR cannot be split into separate PRs, let's try to have smaller commits to help. Before merging, we could squash them into a single commit.
[GitHub] flink pull request: [FLINK-1753] Extend KafkaITCase, reworked Pers...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/603#issuecomment-93568853 The failed test has been resolved.
[jira] [Commented] (FLINK-1753) Add more tests for Kafka Connectors
[ https://issues.apache.org/jira/browse/FLINK-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14496993#comment-14496993 ] ASF GitHub Bot commented on FLINK-1753: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/603#issuecomment-93568853 The failed test has been resolved. Add more tests for Kafka Connectors --- Key: FLINK-1753 URL: https://issues.apache.org/jira/browse/FLINK-1753 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Gábor Hermann The current {{KafkaITCase}} only performs a single test. We need to refactor that test so that it brings up a Kafka/Zookeeper server and then performs various tests: Tests to include: - A topology with non-string types MERGED IN 359b39c3 - A topology with a custom Kafka Partitioning class MERGED IN 359b39c3 - A topology testing the regular {{KafkaSource}}. MERGED IN 359b39c3 - Kafka broker failure MERGED IN cb34e976 - Flink TaskManager failure - Test with large records (up to 30 MB).
[GitHub] flink pull request: Stream graph + internal refactor
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/594#issuecomment-93264219 I will merge this later today if there are no objections.
[GitHub] flink pull request: Stream graph + internal refactor
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/594
[jira] [Created] (FLINK-1891) Add isEmpty check when the input dir
Sibao Hong created FLINK-1891: - Summary: Add isEmpty check when the input dir Key: FLINK-1891 URL: https://issues.apache.org/jira/browse/FLINK-1891 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: master Reporter: Sibao Hong Assignee: Sibao Hong Add an empty check for the input storageDirectory: if the storageDirectory input is empty, we should use tmp as the base dir.
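The proposed check can be sketched as a small Scala helper. This assumes that "empty" covers a null, empty or whitespace-only configuration value and that "tmp" means the JVM's `java.io.tmpdir`; `StorageDirSketch` and `resolveBaseDir` are hypothetical names, not the code from the pull request.

```scala
import java.io.File

object StorageDirSketch {

  /** Returns the base directory to use: the configured storage directory if one is set,
    * otherwise the JVM temp directory. Null, empty and whitespace-only values count as empty. */
  def resolveBaseDir(storageDirectory: String): File =
    Option(storageDirectory).map(_.trim).filter(_.nonEmpty) match {
      case Some(dir) => new File(dir)
      case None      => new File(System.getProperty("java.io.tmpdir"))
    }
}
```

Wrapping the raw value in `Option` handles the null case in the same branch as the empty string, so callers never see a `File` built from an empty path.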
[jira] [Resolved] (FLINK-1876) Refactor StreamGraph to be node centric
[ https://issues.apache.org/jira/browse/FLINK-1876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora resolved FLINK-1876. --- Resolution: Done Refactor StreamGraph to be node centric --- Key: FLINK-1876 URL: https://issues.apache.org/jira/browse/FLINK-1876 Project: Flink Issue Type: Bug Components: Streaming Reporter: Gyula Fora Assignee: Gyula Fora Currently the StreamGraph, which keeps all the information about the streaming topology, uses a large number of hashmaps to store graph properties. These should be refactored in a node-centric fashion.
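A rough Scala sketch of what "node centric" can mean here: instead of one id-keyed hashmap per graph property, each node object owns its properties and its incoming and outgoing edges, and the graph keeps only a single id-to-node index. The class names and fields below are simplified, hypothetical stand-ins and do not reflect the actual refactored StreamGraph code.

```scala
import scala.collection.mutable

object NodeCentricGraphSketch {

  /** A directed edge between two nodes. */
  final case class StreamEdge(source: StreamNode, target: StreamNode)

  /** Node-centric layout: each node owns its own properties and adjacency lists,
    * instead of every property living in a separate id-keyed HashMap on the graph. */
  final class StreamNode(val id: Int, val operatorName: String, var parallelism: Int) {
    val inEdges: mutable.ArrayBuffer[StreamEdge] = mutable.ArrayBuffer.empty[StreamEdge]
    val outEdges: mutable.ArrayBuffer[StreamEdge] = mutable.ArrayBuffer.empty[StreamEdge]
  }

  /** The graph itself only keeps a single id -> node index. */
  final class StreamGraph {
    private val nodes = mutable.LinkedHashMap.empty[Int, StreamNode]

    def addNode(id: Int, operatorName: String, parallelism: Int): StreamNode = {
      require(!nodes.contains(id), s"duplicate node id $id")
      val node = new StreamNode(id, operatorName, parallelism)
      nodes(id) = node
      node
    }

    /** Registers the edge on both endpoints so either side can navigate it. */
    def addEdge(sourceId: Int, targetId: Int): StreamEdge = {
      val edge = StreamEdge(node(sourceId), node(targetId))
      edge.source.outEdges += edge
      edge.target.inEdges += edge
      edge
    }

    /** Looks up a node; throws NoSuchElementException for unknown ids. */
    def node(id: Int): StreamNode = nodes(id)
  }
}
```

With this layout, adding a new per-operator property means adding a field to the node rather than another map that must be kept in sync with all the others.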
[GitHub] flink pull request: [FLINK-1891]Add the input storageDirectory emp...
GitHub user matadorhong opened a pull request: https://github.com/apache/flink/pull/601 [FLINK-1891]Add the input storageDirectory empty check Add an empty check for the input storageDirectory: if the storageDirectory input is empty, we should use tmp as the base dir. Also fix the code style problem of the blank space between every ) and {. You can merge this pull request into a Git repository by running: $ git pull https://github.com/matadorhong/flink Fix_bug_dir_empty_check Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/601.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #601 commit 90c5e380e8f2e5970a2aebc57d098f56b4f65135 Author: hongsibao hongsi...@huawei.com Date: 2015-04-15T10:15:27Z Add the input storageDirectory empty check
[jira] [Commented] (FLINK-1891) Add isEmpty check when the input dir
[ https://issues.apache.org/jira/browse/FLINK-1891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14495989#comment-14495989 ] ASF GitHub Bot commented on FLINK-1891: --- GitHub user matadorhong opened a pull request: https://github.com/apache/flink/pull/601 [FLINK-1891]Add the input storageDirectory empty check Add an empty check for the input storageDirectory: if the storageDirectory input is empty, we should use tmp as the base dir. Also fix the code style problem of the blank space between every ) and {. You can merge this pull request into a Git repository by running: $ git pull https://github.com/matadorhong/flink Fix_bug_dir_empty_check Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/601.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #601 commit 90c5e380e8f2e5970a2aebc57d098f56b4f65135 Author: hongsibao hongsi...@huawei.com Date: 2015-04-15T10:15:27Z Add the input storageDirectory empty check Add isEmpty check when the input dir Key: FLINK-1891 URL: https://issues.apache.org/jira/browse/FLINK-1891 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: master Reporter: Sibao Hong Assignee: Sibao Hong Add an empty check for the input storageDirectory: if the storageDirectory input is empty, we should use tmp as the base dir.
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28412028 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import java.lang.Iterable +import breeze.linalg +import breeze.linalg.DenseVector +import breeze.numerics.sqrt +import breeze.numerics.sqrt._ +import org.apache.flink.api.common.functions._ +import org.apache.flink.api.scala._ +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std} +import org.apache.flink.util.Collector + +/** Scales observations, so that all features have mean equal to zero + * and standard deviation equal to one --- End diff -- Mean and standard deviation are configurable, right? Default is (0,1)?
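For reference, the scaling in the quoted StandardScaler maps each feature x to (x - μ) / σ · std + mean, where μ and σ are that feature's mean and standard deviation, so the defaults mean = 0 and std = 1 give classic standardization. The plain-Scala sketch below uses arrays instead of Flink DataSets and Breeze vectors. It assumes the population standard deviation, computed in one pass from running sums of x and x², and it maps constant features (σ = 0) to `mean` rather than dividing by zero; `StandardScalerSketch` and its methods are hypothetical names.

```scala
object StandardScalerSketch {

  /** Per-feature mean and population standard deviation, computed in a single pass
    * over the data from running sums of x and x^2. */
  def featureMetrics(data: Seq[Array[Double]]): (Array[Double], Array[Double]) = {
    require(data.nonEmpty, "need at least one observation")
    val dim = data.head.length
    val sum = new Array[Double](dim)
    val sumSq = new Array[Double](dim)
    for (x <- data; i <- 0 until dim) {
      sum(i) += x(i)
      sumSq(i) += x(i) * x(i)
    }
    val n = data.size.toDouble
    val mu = sum.map(_ / n)
    // Clamp at 0 to guard against tiny negative values from floating-point cancellation.
    val sigma = Array.tabulate(dim)(i => math.sqrt(math.max(sumSq(i) / n - mu(i) * mu(i), 0.0)))
    (mu, sigma)
  }

  /** Maps every feature to the target mean/std: x' = (x - mu) / sigma * std + mean.
    * With the defaults (0.0, 1.0) this is plain standardization. */
  def scale(data: Seq[Array[Double]], mean: Double = 0.0, std: Double = 1.0): Seq[Array[Double]] = {
    val (mu, sigma) = featureMetrics(data)
    data.map { x =>
      Array.tabulate(x.length) { i =>
        if (sigma(i) == 0.0) mean // constant feature: avoid dividing by zero
        else (x(i) - mu(i)) / sigma(i) * std + mean
      }
    }
  }
}
```

For example, scaling the observations (1.0, 10.0) and (3.0, 10.0) with the defaults yields (-1.0, 0.0) and (1.0, 0.0). The sum-of-squares formula is convenient for a single pass but can lose precision when values are large relative to their spread; Welford's online algorithm is a more robust alternative.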