[GitHub] flink issue #2459: [FLINK-4561] replace all the scala version as a `scala.bi...
Github user chiwanpark commented on the issue: https://github.com/apache/flink/pull/2459 Hi @shijinkui, thanks for opening this pull request. Unfortunately, this pull request causes a problem with the Maven Shade plugin (https://issues.apache.org/jira/browse/MSHADE-200). Because of that problem, the Flink community decided to use a literal version string instead of a property, and added a shell script (tools/change-scala-version.sh) to switch the Scala version. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r76580310 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala --- @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.{createTypeInformation, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.SparseVector +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID +import org.apache.flink.util.Collector + +import scala.collection.JavaConversions._ + +/** + * Distributed Matrix represented as blocks. + * A BlockMapper instance is used to track blocks of the matrix. + * Every block in a BlockMatrix has an associated ID that also + * identifies its position in the BlockMatrix. 
+ * @param data + * @param blockMapper + */ +class BlockMatrix( +val data: DataSet[(BlockID, Block)], +blockMapper: BlockMapper +) +extends DistributedMatrix { + + val numCols = blockMapper.numCols + val numRows = blockMapper.numRows + + val getBlockCols = blockMapper.numBlockCols + val getBlockRows = blockMapper.numBlockRows + + val getRowsPerBlock = blockMapper.rowsPerBlock + val getColsPerBlock = blockMapper.colsPerBlock + + val getNumBlocks = blockMapper.numBlocks + + /** +* Compares the format of two block matrices +* @return +*/ + def hasSameFormat(other: BlockMatrix): Boolean = +this.numRows == other.numRows && this.numCols == other.numCols && +this.getRowsPerBlock == other.getRowsPerBlock && +this.getColsPerBlock == other.getColsPerBlock + + /** +* Perform an operation on pairs of block. Pairs are formed taking +* matching blocks from the two matrices that are placed in the same position. +* A function is then applied to the pair to return a new block. +* These blocks are then composed in a new block matrix. +*/ + def blockPairOperation( + fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = { +require(hasSameFormat(other)) + +/*Full outer join on blocks. The full outer join is required because of +the sparse nature of the matrix. +Matching blocks may be missing and a block of zeros is used instead.*/ +val processedBlocks = + this.data.fullOuterJoin(other.data).where(0).equalTo(0) { +(left: (BlockID, Block), right: (BlockID, Block)) => + { + +val (id1, block1) = Option(left) match { + case Some((id, block)) => (id, block) + case None => +(right._1, Block.zero(right._2.getRows, right._2.getCols)) --- End diff -- Ah, I see. `Block.zero` costs little. Sorry for the inconvenience.
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r76552087 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -159,3 +183,68 @@ case class IndexedRow(rowIndex: MatrixRowIndex, values: Vector) extends Ordered[ override def toString: String = s"($rowIndex, ${values.toString})" } + +/** + * Serializable Reduction function used by the toBlockMatrix function. Takes an ordered list of + * indexed row and split those rows to form blocks. + */ +class RowGroupReducer(blockMapper: BlockMapper) +extends RichGroupReduceFunction[(Int, Int, Vector), (Int, Block)] { + + override def reduce(values: java.lang.Iterable[(Int, Int, Vector)], + out: Collector[(Int, Block)]): Unit = { + +val sortedRows = values.toList.sortBy(_._2) +val blockID = sortedRows.head._1 +val coo = for { + (_, rowIndex, vec) <- sortedRows + (colIndex, value) <- vec if value != 0 +} yield (rowIndex, colIndex, value) + +val block: Block = Block( +SparseMatrix.fromCOO( +blockMapper.rowsPerBlock, blockMapper.colsPerBlock, coo)) +out.collect((blockID, block)) + } +} + +class RowSplitter(blockMapper: BlockMapper) +extends RichFlatMapFunction[IndexedRow, (Int, Int, Vector)] { + override def flatMap( + row: IndexedRow, out: Collector[(Int, Int, Vector)]): Unit = { +val IndexedRow(rowIndex, vector) = row +val splitRow = sliceVector(vector) +for ((mappedCol, slice) <- splitRow) { + val mappedRow = +math.floor(rowIndex * 1.0 / blockMapper.rowsPerBlock).toInt + val blockID = blockMapper.getBlockIdByMappedCoord(mappedRow, mappedCol) + out.collect((blockID, rowIndex % blockMapper.rowsPerBlock, slice)) +} + } + + def sliceVector(v: Vector): List[(Int, Vector)] = { + +def getSliceByColIndex(index: Int): Int = --- End diff -- This method is used only once. Please remove it.
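The index arithmetic behind the quoted `RowSplitter` can be sketched in plain Scala. This is a minimal sketch, not the PR's code: it assumes a dense row stored as a `Vector[Double]` instead of Flink's `Vector`, assumes the row length equals the matrix's column count, and uses hypothetical names (`RowSlice`, `splitRow`). It also shows that for non-negative row indices, `math.floor(rowIndex * 1.0 / blockMapper.rowsPerBlock).toInt` is the same as plain integer division, and it does the slicing inline instead of through a single-use helper.

```scala
// Hypothetical record mirroring RowSplitter's output: (blockID, row within block, slice).
final case class RowSlice(blockId: Int, rowInBlock: Int, values: Vector[Double])

// Splits one dense row into per-block column slices.
// Assumes row.length == numCols and row-major block IDs:
// blockId = mappedRow * numBlockCols + mappedCol.
def splitRow(rowIndex: Int, row: Vector[Double],
             rowsPerBlock: Int, colsPerBlock: Int): Seq[RowSlice] = {
  require(rowIndex >= 0, s"Negative row index $rowIndex.")
  val numBlockCols = (row.length + colsPerBlock - 1) / colsPerBlock // ceiling division
  val mappedRow = rowIndex / rowsPerBlock  // same as math.floor(rowIndex * 1.0 / rowsPerBlock).toInt here
  val rowInBlock = rowIndex % rowsPerBlock // row offset inside the block
  (0 until numBlockCols).map { mappedCol =>
    val start = mappedCol * colsPerBlock
    val stop = math.min(start + colsPerBlock, row.length) // last column block may be partial
    RowSlice(mappedRow * numBlockCols + mappedCol, rowInBlock, row.slice(start, stop))
  }
}
```

For example, `splitRow(5, Vector(1.0, 2.0, 3.0, 4.0, 5.0), 2, 2)` puts row 5 at offset 1 within block row 2 and returns slices for blocks 6, 7 and 8; the last slice holds only `5.0` because the final column block is partial.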
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r76551557 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala --- @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.{createTypeInformation, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.SparseVector +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID +import org.apache.flink.util.Collector + +import scala.collection.JavaConversions._ + +/** + * Distributed Matrix represented as blocks. + * A BlockMapper instance is used to track blocks of the matrix. + * Every block in a BlockMatrix has an associated ID that also + * identifies its position in the BlockMatrix. 
+ * @param data + * @param blockMapper + */ +class BlockMatrix( +val data: DataSet[(BlockID, Block)], +blockMapper: BlockMapper +) +extends DistributedMatrix { + + val numCols = blockMapper.numCols + val numRows = blockMapper.numRows + + val getBlockCols = blockMapper.numBlockCols + val getBlockRows = blockMapper.numBlockRows + + val getRowsPerBlock = blockMapper.rowsPerBlock + val getColsPerBlock = blockMapper.colsPerBlock + + val getNumBlocks = blockMapper.numBlocks + + /** +* Compares the format of two block matrices +* @return +*/ + def hasSameFormat(other: BlockMatrix): Boolean = +this.numRows == other.numRows && this.numCols == other.numCols && +this.getRowsPerBlock == other.getRowsPerBlock && +this.getColsPerBlock == other.getColsPerBlock + + /** +* Perform an operation on pairs of block. Pairs are formed taking +* matching blocks from the two matrices that are placed in the same position. +* A function is then applied to the pair to return a new block. +* These blocks are then composed in a new block matrix. +*/ + def blockPairOperation( + fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = { +require(hasSameFormat(other)) + +/*Full outer join on blocks. The full outer join is required because of +the sparse nature of the matrix. +Matching blocks may be missing and a block of zeros is used instead.*/ +val processedBlocks = + this.data.fullOuterJoin(other.data).where(0).equalTo(0) { +(left: (BlockID, Block), right: (BlockID, Block)) => + { + +val (id1, block1) = Option(left) match { + case Some((id, block)) => (id, block) + case None => +(right._1, Block.zero(right._2.getRows, right._2.getCols)) --- End diff -- The zero-filled matrix can be reused. Please create it once using a `RichFlatJoinFunction`.
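The reuse suggested above means allocating the zero block once per parallel task, for example in the `open()` method of a `RichFlatJoinFunction`, instead of calling `Block.zero` for every unmatched record. The sketch below imitates that lifecycle in plain Scala so it runs without Flink; `DenseBlock` and `ZeroFillingJoin` are hypothetical, `null` stands for the missing side (which is how Flink's outer joins pass it), and it assumes all blocks share one `rows x cols` shape and that `fun` never mutates its inputs. Another comment in this thread (r76580310) says `Block.zero` is cheap, so the optimization may not be worth it.

```scala
// Hypothetical immutable block: `values` is row-major with rows * cols entries.
final case class DenseBlock(rows: Int, cols: Int, values: Vector[Double]) {
  def +(other: DenseBlock): DenseBlock = {
    require(rows == other.rows && cols == other.cols, "Block shapes differ.")
    DenseBlock(rows, cols, values.zip(other.values).map { case (a, b) => a + b })
  }
}

// Plain-Scala stand-in for a RichFlatJoinFunction used in a full outer join.
// It returns the result instead of emitting it to a Collector.
final class ZeroFillingJoin(rows: Int, cols: Int, fun: (DenseBlock, DenseBlock) => DenseBlock) {
  private var zero: DenseBlock = null // created once, shared by every record of this task

  // Flink would call open(Configuration) once per parallel task before any join() call.
  def open(): Unit = {
    zero = DenseBlock(rows, cols, Vector.fill(rows * cols)(0.0))
  }

  // A null side means "no matching block"; the shared zero block is used instead.
  def join(left: (Int, DenseBlock), right: (Int, DenseBlock)): (Int, DenseBlock) = {
    require(left != null || right != null, "At least one side must be present.")
    val id = if (left != null) left._1 else right._1
    val l = if (left != null) left._2 else zero
    val r = if (right != null) right._2 else zero
    (id, fun(l, r))
  }
}
```

Sharing one instance is only safe because `DenseBlock` is immutable here; with a mutable matrix type, a `fun` that writes into its arguments would corrupt the cached zero block.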
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r76551568 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala --- @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.{createTypeInformation, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.SparseVector +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID +import org.apache.flink.util.Collector + +import scala.collection.JavaConversions._ + +/** + * Distributed Matrix represented as blocks. + * A BlockMapper instance is used to track blocks of the matrix. + * Every block in a BlockMatrix has an associated ID that also + * identifies its position in the BlockMatrix. 
+ * @param data + * @param blockMapper + */ +class BlockMatrix( +val data: DataSet[(BlockID, Block)], +blockMapper: BlockMapper +) +extends DistributedMatrix { + + val numCols = blockMapper.numCols + val numRows = blockMapper.numRows + + val getBlockCols = blockMapper.numBlockCols + val getBlockRows = blockMapper.numBlockRows + + val getRowsPerBlock = blockMapper.rowsPerBlock + val getColsPerBlock = blockMapper.colsPerBlock + + val getNumBlocks = blockMapper.numBlocks + + /** +* Compares the format of two block matrices +* @return +*/ + def hasSameFormat(other: BlockMatrix): Boolean = +this.numRows == other.numRows && this.numCols == other.numCols && +this.getRowsPerBlock == other.getRowsPerBlock && +this.getColsPerBlock == other.getColsPerBlock + + /** +* Perform an operation on pairs of block. Pairs are formed taking +* matching blocks from the two matrices that are placed in the same position. +* A function is then applied to the pair to return a new block. +* These blocks are then composed in a new block matrix. +*/ + def blockPairOperation( + fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = { +require(hasSameFormat(other)) + +/*Full outer join on blocks. The full outer join is required because of +the sparse nature of the matrix. +Matching blocks may be missing and a block of zeros is used instead.*/ +val processedBlocks = + this.data.fullOuterJoin(other.data).where(0).equalTo(0) { +(left: (BlockID, Block), right: (BlockID, Block)) => + { + +val (id1, block1) = Option(left) match { + case Some((id, block)) => (id, block) + case None => +(right._1, Block.zero(right._2.getRows, right._2.getCols)) +} + +val (id2, block2) = Option(right) match { + case Some((id, block)) => (id, block) + case None => +(left._1, Block.zero(left._2.getRows, left._2.getCols)) --- End diff -- Please reuse the zero-filled matrix here as well.
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r76551191 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala --- @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.{createTypeInformation, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.SparseVector +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID +import org.apache.flink.util.Collector + +import scala.collection.JavaConversions._ + +/** + * Distributed Matrix represented as blocks. + * A BlockMapper instance is used to track blocks of the matrix. + * Every block in a BlockMatrix has an associated ID that also + * identifies its position in the BlockMatrix. 
+ * @param data + * @param blockMapper + */ +class BlockMatrix( +val data: DataSet[(BlockID, Block)], +blockMapper: BlockMapper +) +extends DistributedMatrix { + + val numCols = blockMapper.numCols + val numRows = blockMapper.numRows + + val getBlockCols = blockMapper.numBlockCols + val getBlockRows = blockMapper.numBlockRows + + val getRowsPerBlock = blockMapper.rowsPerBlock + val getColsPerBlock = blockMapper.colsPerBlock + + val getNumBlocks = blockMapper.numBlocks --- End diff -- Please avoid names that start with `get`/`set`.
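Applied to the quoted members, the renaming might look like the sketch below: `getBlockRows` becomes `numBlockRows`, `getRowsPerBlock` becomes `rowsPerBlock`, `getNumBlocks` becomes `numBlocks`, and so on, matching the field names `BlockMapper` already uses. `BlockLayout` and `BlockMatrixInfo` are hypothetical stand-ins so the snippet compiles without Flink; forwarding with `def` instead of copying into `val`s is a design choice, not something the review asks for.

```scala
// Hypothetical stand-in for BlockMapper with the fields quoted in the diff.
final case class BlockLayout(numRows: Int, numCols: Int, rowsPerBlock: Int, colsPerBlock: Int) {
  val numBlockRows: Int = (numRows + rowsPerBlock - 1) / rowsPerBlock // ceiling division
  val numBlockCols: Int = (numCols + colsPerBlock - 1) / colsPerBlock
  val numBlocks: Int = numBlockRows * numBlockCols
}

// Property-style names instead of getBlockRows, getRowsPerBlock, getNumBlocks, ...
final class BlockMatrixInfo(layout: BlockLayout) {
  def numRows: Int = layout.numRows
  def numCols: Int = layout.numCols
  def numBlockRows: Int = layout.numBlockRows
  def numBlockCols: Int = layout.numBlockCols
  def rowsPerBlock: Int = layout.rowsPerBlock
  def colsPerBlock: Int = layout.colsPerBlock
  def numBlocks: Int = layout.numBlocks

  // Same comparison as the quoted hasSameFormat, using the renamed members.
  def hasSameFormat(other: BlockMatrixInfo): Boolean =
    numRows == other.numRows && numCols == other.numCols &&
      rowsPerBlock == other.rowsPerBlock && colsPerBlock == other.colsPerBlock
}
```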
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r76551144 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala --- @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.{createTypeInformation, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.SparseVector +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID +import org.apache.flink.util.Collector + +import scala.collection.JavaConversions._ + +/** + * Distributed Matrix represented as blocks. + * A BlockMapper instance is used to track blocks of the matrix. + * Every block in a BlockMatrix has an associated ID that also + * identifies its position in the BlockMatrix. 
+ * @param data + * @param blockMapper + */ +class BlockMatrix( +val data: DataSet[(BlockID, Block)], +blockMapper: BlockMapper +) +extends DistributedMatrix { + + val numCols = blockMapper.numCols + val numRows = blockMapper.numRows + + val getBlockCols = blockMapper.numBlockCols + val getBlockRows = blockMapper.numBlockRows + + val getRowsPerBlock = blockMapper.rowsPerBlock + val getColsPerBlock = blockMapper.colsPerBlock + + val getNumBlocks = blockMapper.numBlocks + + /** +* Compares the format of two block matrices +* @return +*/ + def hasSameFormat(other: BlockMatrix): Boolean = +this.numRows == other.numRows && this.numCols == other.numCols && +this.getRowsPerBlock == other.getRowsPerBlock && +this.getColsPerBlock == other.getColsPerBlock + + /** +* Perform an operation on pairs of block. Pairs are formed taking +* matching blocks from the two matrices that are placed in the same position. +* A function is then applied to the pair to return a new block. +* These blocks are then composed in a new block matrix. +*/ + def blockPairOperation( + fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = { +require(hasSameFormat(other)) --- End diff -- Could you add a message that explains what the problem is?
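`Predef.require` accepts a message as its second argument and throws an `IllegalArgumentException` whose message is prefixed with `requirement failed: `. A minimal sketch of a descriptive check for `blockPairOperation`, assuming a hypothetical `BlockFormat` that holds the four values `hasSameFormat` compares:

```scala
// Hypothetical summary of the four values hasSameFormat compares.
final case class BlockFormat(numRows: Int, numCols: Int, rowsPerBlock: Int, colsPerBlock: Int) {
  override def toString: String =
    s"${numRows}x$numCols matrix with ${rowsPerBlock}x$colsPerBlock blocks"
}

// Throws IllegalArgumentException("requirement failed: <message>") when the formats differ.
def requireSameFormat(a: BlockFormat, b: BlockFormat): Unit =
  require(a == b, s"Block matrices must have the same format, but got $a and $b.")
```

The message is passed by name, so the string is only built when the check fails.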
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r76551077 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, MatrixRowIndex} + +/** + * This class is in charge of handling all the spatial logic required by BlockMatrix. + * It introduces a new space of zero-indexed coordinates (i,j), called "mapped coordinates". + * This is a space where blocks are indexed (starting from zero). + * + * So every coordinate in the original space can be mapped to this space and to the + * corrisponding block. A block have a row and a column coordinate that explicits + * its position. Every set of coordinates in the mapped space corresponds to a square + * of size rowPerBlock x colsPerBlock. 
+ * + */ +case class BlockMapper( //original matrix size + numRows: Int, + numCols: Int, + //block size + rowsPerBlock: Int, + colsPerBlock: Int) { + + require(numRows >= rowsPerBlock && numCols >= colsPerBlock) + val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt + val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt + val numBlocks = numBlockCols * numBlockRows + + /** +* Translates absolute coordinates to the mapped coordinates of the block +* these coordinates belong to. +* @param i +* @param j +* @return +*/ + def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, Int) = +getBlockMappedCoordinates(getBlockIdByCoordinates(i, j)) + + /** +* Retrieves a block id from original coordinates +* @param i Original row +* @param j Original column +* @return Block ID +*/ + def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = { + +if (i < 0 || j < 0 || i >= numRows || j >= numCols) { + throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).") +} else { + val mappedRow = i / rowsPerBlock + val mappedColumn = j / colsPerBlock + val res = mappedRow * numBlockCols + mappedColumn + + assert(res <= numBlocks) + res +} + } + + /** +* Retrieves mapped coordinates for a given block. +* @param blockId +* @return +*/ + def getBlockMappedCoordinates(blockId: Int): (Int, Int) = { +if (blockId < 0 || blockId > numBlockCols * numBlockRows) { + throw new IllegalArgumentException( + s"BlockId numeration starts from 0. $blockId is not a valid Id" + ) --- End diff -- Please use the `require` function as above.
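A sketch of `getBlockMappedCoordinates` rewritten with `require`, written as a standalone function with hypothetical parameter names in place of the `BlockMapper` fields. Valid IDs run from 0 to `numBlocks - 1`, so the quoted guard `blockId > numBlockCols * numBlockRows` still lets `blockId == numBlocks` through; the sketch uses a strict upper bound instead.

```scala
// Mapped (blockRow, blockCol) coordinates of a block, assuming row-major block IDs.
def blockMappedCoordinates(blockId: Int, numBlockRows: Int, numBlockCols: Int): (Int, Int) = {
  // Valid IDs are 0 until numBlockRows * numBlockCols, hence the strict upper bound.
  require(0 <= blockId && blockId < numBlockRows * numBlockCols,
    s"BlockId numbering starts from 0. $blockId is not a valid ID.")
  (blockId / numBlockCols, blockId % numBlockCols)
}
```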
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r76551066 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, MatrixRowIndex} + +/** + * This class is in charge of handling all the spatial logic required by BlockMatrix. + * It introduces a new space of zero-indexed coordinates (i,j), called "mapped coordinates". + * This is a space where blocks are indexed (starting from zero). + * + * So every coordinate in the original space can be mapped to this space and to the + * corrisponding block. A block have a row and a column coordinate that explicits + * its position. Every set of coordinates in the mapped space corresponds to a square + * of size rowPerBlock x colsPerBlock. 
+ * + */ +case class BlockMapper( //original matrix size + numRows: Int, + numCols: Int, + //block size + rowsPerBlock: Int, + colsPerBlock: Int) { + + require(numRows >= rowsPerBlock && numCols >= colsPerBlock) + val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt + val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt + val numBlocks = numBlockCols * numBlockRows + + /** +* Translates absolute coordinates to the mapped coordinates of the block +* these coordinates belong to. +* @param i +* @param j +* @return +*/ + def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, Int) = +getBlockMappedCoordinates(getBlockIdByCoordinates(i, j)) + + /** +* Retrieves a block id from original coordinates +* @param i Original row +* @param j Original column +* @return Block ID +*/ + def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = { + +if (i < 0 || j < 0 || i >= numRows || j >= numCols) { + throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).") --- End diff -- The following `require` would be better: ```scala require(0 <= i && i < numRows && 0 <= j && j < numCols, s"Invalid coordinates ($i, $j).") ```
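With the suggested `require`, `getBlockIdByCoordinates` reduces to a validation line plus the row-major ID formula from the quoted code. A standalone sketch with hypothetical parameters in place of the `BlockMapper` fields; the quoted `assert(res <= numBlocks)` is dropped here because, once the `require` passes, the result is always below `numBlocks`.

```scala
// Block ID for absolute coordinates (i, j), validated with require as suggested above.
def blockIdByCoordinates(i: Int, j: Int, numRows: Int, numCols: Int,
                         rowsPerBlock: Int, colsPerBlock: Int): Int = {
  require(0 <= i && i < numRows && 0 <= j && j < numCols, s"Invalid coordinates ($i, $j).")
  val numBlockCols = (numCols + colsPerBlock - 1) / colsPerBlock // ceiling division
  (i / rowsPerBlock) * numBlockCols + j / colsPerBlock          // row-major block ID
}
```

For a 5x5 matrix with 2x2 blocks, the entry (4, 3) lies in block row 2 and block column 1, which gives ID 7.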
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r76551085 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, MatrixRowIndex} + +/** + * This class is in charge of handling all the spatial logic required by BlockMatrix. + * It introduces a new space of zero-indexed coordinates (i,j), called "mapped coordinates". + * This is a space where blocks are indexed (starting from zero). + * + * So every coordinate in the original space can be mapped to this space and to the + * corrisponding block. A block have a row and a column coordinate that explicits + * its position. Every set of coordinates in the mapped space corresponds to a square + * of size rowPerBlock x colsPerBlock. 
+ * + */ +case class BlockMapper( //original matrix size + numRows: Int, + numCols: Int, + //block size + rowsPerBlock: Int, + colsPerBlock: Int) { + + require(numRows >= rowsPerBlock && numCols >= colsPerBlock) + val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt + val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt + val numBlocks = numBlockCols * numBlockRows + + /** +* Translates absolute coordinates to the mapped coordinates of the block +* these coordinates belong to. +* @param i +* @param j +* @return +*/ + def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, Int) = +getBlockMappedCoordinates(getBlockIdByCoordinates(i, j)) + + /** +* Retrieves a block id from original coordinates +* @param i Original row +* @param j Original column +* @return Block ID +*/ + def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = { + +if (i < 0 || j < 0 || i >= numRows || j >= numCols) { + throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).") +} else { + val mappedRow = i / rowsPerBlock + val mappedColumn = j / colsPerBlock + val res = mappedRow * numBlockCols + mappedColumn + + assert(res <= numBlocks) + res +} + } + + /** +* Retrieves mapped coordinates for a given block. +* @param blockId +* @return +*/ + def getBlockMappedCoordinates(blockId: Int): (Int, Int) = { +if (blockId < 0 || blockId > numBlockCols * numBlockRows) { + throw new IllegalArgumentException( + s"BlockId numeration starts from 0. $blockId is not a valid Id" + ) +} else { + val i = blockId / numBlockCols + val j = blockId % numBlockCols + (i, j) +} + } + + /** +* Retrieves the ID of the block at the given coordinates +* @param i +* @param j +* @return +*/ + def getBlockIdByMappedCoord(i: Int, j: Int): Int = { + +if (i < 0 || j < 0 || i >= numBlockRows || j >= numBlockCols) { + throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).") --- End diff -- Please use `require`. 
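Here is a minimal sketch of the `require`-based check suggested above. It uses a simplified, hypothetical `SimpleBlockMapper` rather than the PR's `BlockMapper`. It also uses integer ceiling division in place of `math.ceil(numRows * 1.0 / rowsPerBlock)`, which gives the same result for positive sizes. `require` throws an `IllegalArgumentException` whose message starts with "requirement failed: ", so callers that catch `IllegalArgumentException` keep working.

```scala
// Hypothetical, simplified mapper (not the PR's BlockMapper) showing require-based argument checks.
final case class SimpleBlockMapper(numRows: Int, numCols: Int, rowsPerBlock: Int, colsPerBlock: Int) {
  require(rowsPerBlock > 0 && colsPerBlock > 0, "block sizes must be positive")

  // Integer ceiling division: number of block rows/columns needed to cover the matrix.
  val numBlockRows: Int = (numRows + rowsPerBlock - 1) / rowsPerBlock
  val numBlockCols: Int = (numCols + colsPerBlock - 1) / colsPerBlock

  /** Returns the ID of the block at mapped coordinates (i, j), numbered row by row from zero. */
  def getBlockIdByMappedCoord(i: Int, j: Int): Int = {
    // Throws IllegalArgumentException("requirement failed: Invalid coordinates (i,j).") on bad input.
    require(i >= 0 && j >= 0 && i < numBlockRows && j < numBlockCols, s"Invalid coordinates ($i,$j).")
    i * numBlockCols + j
  }
}
```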
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r76550871 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix} + +class Block() { --- End diff -- How about changing `Block` into a case class? That would be better than a plain class with an `apply` method.
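Below is a sketch of what a case-class `Block` could look like. It assumes, hypothetically, that a block stores its entries as a row-major `Vector[Double]` instead of a `FlinkMatrix`. The compiler-generated `apply`, structural `equals`/`hashCode` and extractor come for free, so the companion object only needs extra factories such as `zero`.

```scala
// Hypothetical simplified block: row-major dense values instead of the PR's FlinkMatrix.
final case class Block(numRows: Int, numCols: Int, values: Vector[Double]) {
  require(values.size == numRows * numCols, "values must hold numRows * numCols entries")
}

object Block {
  // Extra factory; Block.apply and Block.unapply are still generated by the compiler.
  def zero(numRows: Int, numCols: Int): Block =
    Block(numRows, numCols, Vector.fill(numRows * numCols)(0.0))
}
```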
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r76550821 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix} + +class Block() { + + var blockData: FlinkMatrix = null + + def setBlockData(flinkMatrix: FlinkMatrix) = blockData = flinkMatrix + + def getBlockData = blockData + + def toBreeze = blockData.asBreeze + + def getCols = blockData.numCols + + def getRows = blockData.numRows --- End diff -- The get/set naming convention is not idiomatic in Scala. We need to rename `getCols` and `getRows` to `numCols` and `numRows`.
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r76550602 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix} + +class Block() { + + var blockData: FlinkMatrix = null --- End diff -- Do we need the block data to be mutable?
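If the block data does not need to be mutable, one option is to keep it in an immutable field and return a modified copy instead of exposing a setter. The sketch below assumes a hypothetical `ImmutableBlock` with row-major `Vector[Double]` storage. It only illustrates the idea and is not the PR's design.

```scala
// Hypothetical immutable block: no var, no setter; "updates" return a new instance.
final case class ImmutableBlock(numRows: Int, numCols: Int, values: Vector[Double]) {
  require(values.size == numRows * numCols, "values must hold numRows * numCols entries")

  // Returns a copy with entry (i, j) replaced by v; `this` is left unchanged.
  def updated(i: Int, j: Int, v: Double): ImmutableBlock = {
    require(i >= 0 && i < numRows && j >= 0 && j < numCols, s"Invalid coordinates ($i,$j).")
    copy(values = values.updated(i * numCols + j, v))
  }
}
```

Because no code path can modify an existing instance, a block shared between two functions cannot change underneath either of them.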
[GitHub] flink issue #2152: [FLINK-3920] Distributed Linear Algebra: block-based matr...
Github user chiwanpark commented on the issue: https://github.com/apache/flink/pull/2152 Sorry for the long delay in responding. Could you give me a few more days? I'll review it this weekend.
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r68891682 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/distributed/BlockMatrixSuite.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.SparseMatrix +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{FlatSpec, GivenWhenThen, Matchers} + +class BlockMatrixSuite +extends FlatSpec +with Matchers +with GivenWhenThen +with FlinkTestBase { + + val env = ExecutionEnvironment.getExecutionEnvironment --- End diff -- I found that the Travis CI build fails because of this line. We cannot reuse a single `ExecutionEnvironment` here because of the order of class initialization; this seems related to FLINK-3994. Please move the initialization of `ExecutionEnvironment` and `DistributedRowMatrix` into each test case.
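The plain-Scala analogy below has no Flink dependency. It sketches why a constructor-time `val env = ...` can pick up the wrong environment. It assumes, in line with the comment about class initialization order, that the test base installs its environment in a `beforeAll()` hook that runs after the suite instance has been constructed. `ContextEnvironment`, `TestClusterBase` and the suite classes are hypothetical stand-ins, not Flink or ScalaTest APIs.

```scala
// Hypothetical stand-in for a global "context environment" that a test base installs.
object ContextEnvironment {
  @volatile var current: String = "local"
}

// Hypothetical test base: installs the test-cluster environment in beforeAll(),
// which runs only after the suite instance has been constructed.
trait TestClusterBase {
  def beforeAll(): Unit = ContextEnvironment.current = "test-cluster"
}

// Anti-pattern: this val is evaluated during construction, i.e. before beforeAll().
class EagerSuite extends TestClusterBase {
  val env: String = ContextEnvironment.current
}

// Suggested pattern: look the environment up inside each test case.
class PerTestSuite extends TestClusterBase {
  def someTest(): String = {
    val env = ContextEnvironment.current
    env
  }
}
```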
[GitHub] flink issue #2152: [FLINK-3920] Distributed Linear Algebra: block-based matr...
Github user chiwanpark commented on the issue: https://github.com/apache/flink/pull/2152 This PR contains unnecessary code changes related to `DistributedRowMatrix` (`DistributedRowMatrix.scala` and `DistributedRowMatrixSuite.scala`). Please sync this PR with the current master.
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r68885212 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala --- @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang --- End diff -- This line still imports `java.lang`.
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r68696822 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala --- @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.{createTypeInformation, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.SparseVector +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID +import org.apache.flink.util.Collector + +import scala.collection.JavaConversions._ + +/** + * Distributed Matrix represented as blocks. + * A BlockMapper instance is used to track blocks of the matrix. + * Every block in a BlockMatrix has an associated ID that also + * identifies its position in the BlockMatrix. 
+ * @param data + * @param blockMapper + */ +class BlockMatrix( +data: DataSet[(BlockID, Block)], +blockMapper: BlockMapper +) +extends DistributedMatrix { + + val getDataset = data + + val numCols = blockMapper.numCols + val numRows = blockMapper.numRows + + + val getBlockCols = blockMapper.numBlockCols + val getBlockRows = blockMapper.numBlockRows + + val getRowsPerBlock = blockMapper.rowsPerBlock + val getColsPerBlock = blockMapper.colsPerBlock + + val getNumBlocks = blockMapper.numBlocks + + /** +* Compares the format of two block matrices +* @return +*/ + def hasSameFormat(other: BlockMatrix): Boolean = +this.numRows == other.numRows && +this.numCols == other.numCols && +this.getRowsPerBlock == other.getRowsPerBlock && +this.getColsPerBlock == other.getColsPerBlock + + /** +* Perform an operation on pairs of block. Pairs are formed taking +* matching blocks from the two matrices that are placed in the same position. +* A function is then applied to the pair to return a new block. +* These blocks are then composed in a new block matrix. +*/ + def blockPairOperation( + fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = { +require(hasSameFormat(other)) + +/*Full outer join on blocks. The full outer join is required because of +the sparse nature of the matrix. +Matching blocks may be missing and a block of zeros is used instead.*/ +val processedBlocks = + this.getDataset.fullOuterJoin(other.getDataset).where(0).equalTo(0) { +(left: (BlockID, Block), right: (BlockID, Block)) => + { + +val (id1, block1) = Option(left).getOrElse( +(right._1, Block.zero(right._2.getRows, right._2.getCols))) + +val (id2, block2) = Option(right).getOrElse( +(left._1, Block.zero(left._2.getRows, left._2.getCols))) + +require(id1 == id2) +(id1, fun(block1, block2)) + } + } +new BlockMatrix(processedBlocks, blockMapper) + } + + /** +* Add the matrix to another matrix. 
+* @param other +* @return +*/ + def sum(other: BlockMatrix): BlockMatrix = { --- End diff -- We agreed that `add` is a more appropriate name for this method.
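The local sketch below illustrates the full-outer-join semantics of `blockPairOperation` together with the `add` name agreed above. It models a block matrix, hypothetically, as a `Map` from block ID to a row-major `Vector[Double]`. As the PR's comment describes, a block missing on one side is replaced by zeros of the same size. This is not the distributed `DataSet` implementation.

```scala
// Hypothetical local model: block ID -> dense row-major block values.
object LocalBlockOps {
  type BlockId = Int
  type DenseBlock = Vector[Double]

  // Full outer join on block IDs; a missing block on either side is replaced by zeros of the same size.
  def blockPairOperation(left: Map[BlockId, DenseBlock], right: Map[BlockId, DenseBlock])(
      fun: (DenseBlock, DenseBlock) => DenseBlock): Map[BlockId, DenseBlock] =
    (left.keySet ++ right.keySet).iterator.map { id =>
      (left.get(id), right.get(id)) match {
        case (Some(a), Some(b)) => id -> fun(a, b)
        case (Some(a), None)    => id -> fun(a, Vector.fill(a.size)(0.0))
        case (None, Some(b))    => id -> fun(Vector.fill(b.size)(0.0), b)
        case (None, None)       => sys.error("unreachable: id comes from one of the key sets")
      }
    }.toMap

  // Element-wise addition of two block matrices with the same block layout.
  def add(left: Map[BlockId, DenseBlock], right: Map[BlockId, DenseBlock]): Map[BlockId, DenseBlock] =
    blockPairOperation(left, right)((a, b) => a.zip(b).map { case (x, y) => x + y })
}
```

Because each output pair is keyed by a single `id`, the two sides of a pair always share the same block ID.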
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r68696809 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala --- @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.{createTypeInformation, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.SparseVector +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID +import org.apache.flink.util.Collector + +import scala.collection.JavaConversions._ + +/** + * Distributed Matrix represented as blocks. + * A BlockMapper instance is used to track blocks of the matrix. + * Every block in a BlockMatrix has an associated ID that also + * identifies its position in the BlockMatrix. 
+ * @param data + * @param blockMapper + */ +class BlockMatrix( +data: DataSet[(BlockID, Block)], +blockMapper: BlockMapper +) +extends DistributedMatrix { + + val getDataset = data + + val numCols = blockMapper.numCols + val numRows = blockMapper.numRows + + + val getBlockCols = blockMapper.numBlockCols + val getBlockRows = blockMapper.numBlockRows + + val getRowsPerBlock = blockMapper.rowsPerBlock + val getColsPerBlock = blockMapper.colsPerBlock + + val getNumBlocks = blockMapper.numBlocks + + /** +* Compares the format of two block matrices +* @return +*/ + def hasSameFormat(other: BlockMatrix): Boolean = +this.numRows == other.numRows && +this.numCols == other.numCols && +this.getRowsPerBlock == other.getRowsPerBlock && +this.getColsPerBlock == other.getColsPerBlock + + /** +* Perform an operation on pairs of block. Pairs are formed taking +* matching blocks from the two matrices that are placed in the same position. +* A function is then applied to the pair to return a new block. +* These blocks are then composed in a new block matrix. +*/ + def blockPairOperation( + fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = { +require(hasSameFormat(other)) + +/*Full outer join on blocks. The full outer join is required because of +the sparse nature of the matrix. +Matching blocks may be missing and a block of zeros is used instead.*/ +val processedBlocks = + this.getDataset.fullOuterJoin(other.getDataset).where(0).equalTo(0) { +(left: (BlockID, Block), right: (BlockID, Block)) => + { + +val (id1, block1) = Option(left).getOrElse( +(right._1, Block.zero(right._2.getRows, right._2.getCols))) + +val (id2, block2) = Option(right).getOrElse( +(left._1, Block.zero(left._2.getRows, left._2.getCols))) + +require(id1 == id2) --- End diff -- Can `id1 != id2` ever happen when `hasSameFormat(other)` is true? This check seems unnecessary.
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r68696680 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala --- @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.{createTypeInformation, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.SparseVector +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID +import org.apache.flink.util.Collector + +import scala.collection.JavaConversions._ + +/** + * Distributed Matrix represented as blocks. + * A BlockMapper instance is used to track blocks of the matrix. + * Every block in a BlockMatrix has an associated ID that also + * identifies its position in the BlockMatrix. 
+ * @param data + * @param blockMapper + */ +class BlockMatrix( +data: DataSet[(BlockID, Block)], +blockMapper: BlockMapper +) +extends DistributedMatrix { + + val getDataset = data + + val numCols = blockMapper.numCols + val numRows = blockMapper.numRows + + + val getBlockCols = blockMapper.numBlockCols + val getBlockRows = blockMapper.numBlockRows + + val getRowsPerBlock = blockMapper.rowsPerBlock + val getColsPerBlock = blockMapper.colsPerBlock + + val getNumBlocks = blockMapper.numBlocks + + /** +* Compares the format of two block matrices +* @return +*/ + def hasSameFormat(other: BlockMatrix): Boolean = +this.numRows == other.numRows && +this.numCols == other.numCols && +this.getRowsPerBlock == other.getRowsPerBlock && +this.getColsPerBlock == other.getColsPerBlock + + /** +* Perform an operation on pairs of block. Pairs are formed taking +* matching blocks from the two matrices that are placed in the same position. +* A function is then applied to the pair to return a new block. +* These blocks are then composed in a new block matrix. +*/ + def blockPairOperation( + fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = { +require(hasSameFormat(other)) + +/*Full outer join on blocks. The full outer join is required because of +the sparse nature of the matrix. +Matching blocks may be missing and a block of zeros is used instead.*/ +val processedBlocks = + this.getDataset.fullOuterJoin(other.getDataset).where(0).equalTo(0) { +(left: (BlockID, Block), right: (BlockID, Block)) => + { + +val (id1, block1) = Option(left).getOrElse( +(right._1, Block.zero(right._2.getRows, right._2.getCols))) --- End diff -- We should change this as follows to reduce the creation of `Block` objects:
```scala
val (id1, block1) = Option(left) match {
  case Some((id, block)) => (id, block)
  case None => (right._1, Block.zero(right._2.getRows, right._2.getCols))
}
```
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r68696692 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala --- @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.{createTypeInformation, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.SparseVector +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID +import org.apache.flink.util.Collector + +import scala.collection.JavaConversions._ + +/** + * Distributed Matrix represented as blocks. + * A BlockMapper instance is used to track blocks of the matrix. + * Every block in a BlockMatrix has an associated ID that also + * identifies its position in the BlockMatrix. 
+ * @param data + * @param blockMapper + */ +class BlockMatrix( +data: DataSet[(BlockID, Block)], +blockMapper: BlockMapper +) +extends DistributedMatrix { + + val getDataset = data + + val numCols = blockMapper.numCols + val numRows = blockMapper.numRows + + + val getBlockCols = blockMapper.numBlockCols + val getBlockRows = blockMapper.numBlockRows + + val getRowsPerBlock = blockMapper.rowsPerBlock + val getColsPerBlock = blockMapper.colsPerBlock + + val getNumBlocks = blockMapper.numBlocks + + /** +* Compares the format of two block matrices +* @return +*/ + def hasSameFormat(other: BlockMatrix): Boolean = +this.numRows == other.numRows && +this.numCols == other.numCols && +this.getRowsPerBlock == other.getRowsPerBlock && +this.getColsPerBlock == other.getColsPerBlock + + /** +* Perform an operation on pairs of block. Pairs are formed taking +* matching blocks from the two matrices that are placed in the same position. +* A function is then applied to the pair to return a new block. +* These blocks are then composed in a new block matrix. +*/ + def blockPairOperation( + fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = { +require(hasSameFormat(other)) + +/*Full outer join on blocks. The full outer join is required because of +the sparse nature of the matrix. +Matching blocks may be missing and a block of zeros is used instead.*/ +val processedBlocks = + this.getDataset.fullOuterJoin(other.getDataset).where(0).equalTo(0) { +(left: (BlockID, Block), right: (BlockID, Block)) => + { + +val (id1, block1) = Option(left).getOrElse( +(right._1, Block.zero(right._2.getRows, right._2.getCols))) + +val (id2, block2) = Option(right).getOrElse( +(left._1, Block.zero(left._2.getRows, left._2.getCols))) --- End diff -- Same as above.
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r68696490 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala --- @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.{createTypeInformation, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.SparseVector +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID +import org.apache.flink.util.Collector + +import scala.collection.JavaConversions._ + +/** + * Distributed Matrix represented as blocks. + * A BlockMapper instance is used to track blocks of the matrix. + * Every block in a BlockMatrix has an associated ID that also + * identifies its position in the BlockMatrix. 
+ * @param data + * @param blockMapper + */ +class BlockMatrix( +data: DataSet[(BlockID, Block)], --- End diff -- I think it would be better to make `data` public (`val`) and remove the `getDataset` method.
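Here is a sketch of the suggested change on a hypothetical local analogue. Declaring a constructor parameter as `val` already generates a public, read-only accessor, so no separate getter such as `getDataset` is needed.

```scala
// Hypothetical local analogue of BlockMatrix: `data`, `numRows` and `numCols` are public, read-only vals.
class LocalBlockMatrix(val data: Seq[(Int, Vector[Double])], val numRows: Int, val numCols: Int)
```

Callers then write `matrix.data` directly.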
[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r68696421 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala --- @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang --- End diff -- Do we need this import?
[GitHub] flink issue #2152: [FLINK-3920] Distributed Linear Algebra: block-based matr...
Github user chiwanpark commented on the issue: https://github.com/apache/flink/pull/2152 Thanks for opening this pull request, @chobeat! I would like to shepherd this PR. I'll review it this weekend.
[GitHub] flink issue #1985: [FLINK-1979] Add logistic loss, hinge loss and regulariza...
Github user chiwanpark commented on the issue: https://github.com/apache/flink/pull/1985 Hi @skavulya, sorry for the late response. I've checked the updated PR and it looks good to me. I wonder whether `RegularizationPenalty` is an appropriate name, because the class calculates many values, but that is a trivial point. If there are no objections in a few days, I'll merge this. Thanks!
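For context on the naming question: a regularization penalty used by a gradient-descent solver usually contributes more than one quantity, namely an extra term in the loss and an adjusted update step. The sketch below illustrates this for an L2 penalty on plain arrays. `L2PenaltySketch`, `regLoss` and `takeStep` are illustrative names only and are not claimed to match the API in the PR.

```scala
object L2PenaltySketch {
  // Loss with the L2 term added: oldLoss + (lambda / 2) * ||w||^2
  def regLoss(oldLoss: Double, weights: Array[Double], lambda: Double): Double =
    oldLoss + 0.5 * lambda * weights.map(w => w * w).sum

  // One gradient step on the regularized objective: w - eta * (g + lambda * w)
  def takeStep(weights: Array[Double], gradient: Array[Double],
               lambda: Double, learningRate: Double): Array[Double] = {
    require(weights.length == gradient.length, "dimension mismatch")
    weights.zip(gradient).map { case (w, g) => w - learningRate * (g + lambda * w) }
  }
}
```

Because one object produces both the penalized loss and the penalized step, a name describing only the "penalty" can read as narrower than what the class actually does.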
[GitHub] flink issue #1996: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-ba...
Github user chiwanpark commented on the issue: https://github.com/apache/flink/pull/1996 Looks good to me, +1. I'll merge this in a few hours. However, I think we should change the type of the row index to `Long` in the near future. We can deal with the incompatibility with local matrices by assuming that the number of rows is less than `Int.MaxValue` and converting the row index type from `Long` to `Int` at collection time. I'll file a JIRA issue about this after merging this PR.
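The collection-time conversion suggested above could look roughly like this, assuming the distributed matrix exposes its entries as `(Long, Int, Double)` coordinates and that every row index fits into an `Int`. `RowIndexConversion.toLocalCOO` is a hypothetical helper, not part of Flink ML.

```scala
object RowIndexConversion {
  // Hypothetical helper: narrow Long row indices to Int when collecting a
  // distributed matrix into a local one. Fails fast if an index does not fit.
  def toLocalCOO(entries: Seq[(Long, Int, Double)]): Seq[(Int, Int, Double)] =
    entries.map { case (row, col, value) =>
      require(row >= 0L && row <= Int.MaxValue.toLong,
        s"row index $row does not fit into an Int")
      (row.toInt, col, value)
    }
}
```

Checking each index explicitly keeps the `Long` representation on the distributed side while turning an overflow into a clear error instead of a silently wrapped index.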
[GitHub] flink pull request #1996: [FLINK-3919][flink-ml] Distributed Linear Algebra:...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r68041009 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.distributed.DistributedMatrix._ +import org.apache.flink.ml.math._ + +/** + * Distributed row-major matrix representation. + * @param numRows Number of rows. + * @param numCols Number of columns. + */ +class DistributedRowMatrix(val data: DataSet[IndexedRow], + val numRows: Int, + val numCols: Int) +extends DistributedMatrix { + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. 
+* @return +*/ + def toCOO: Seq[(MatrixRowIndex, MatrixColIndex, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; + (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = + SparseMatrix.fromCOO(this.numRows, this.numCols, this.toCOO) +require(localMatrix.numRows == this.numRows) +require(localMatrix.numCols == this.numCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** +* Apply a high-order function to couple of rows +* @param fun +* @param other +* @return +*/ + def byRowOperation(fun: (Vector, Vector) => Vector, + other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.data +require(this.numCols == other.numCols) +require(this.numRows == other.numRows) + +val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + .equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { +val row1 = Option(left) match { + case Some(row: IndexedRow) => row + case None => +IndexedRow( +right.rowIndex, +SparseVector.fromCOO(right.values.size, List((0, 0.0 +} +val row2 = Option(right) match { + case Some(row: IndexedRow) => row + case None => +IndexedRow( +left.rowIndex, +SparseVector.fromCOO(left.values.size, List((0, 0.0 +} +IndexedRow(row1.rowIndex, fun(row1.values, row2.values)) + } + ) +new DistributedRowMatrix(result, numRows, numCols) + } + + /** +* Add the matrix to another matrix. +* @param other +* @return +*/ + def sum(other: DistributedRowMatrix): DistributedRowMatrix = { --- End diff -- It is not your fault. :-) It seems my fault. Sorry. Let's merge this to master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2076: [FLINK-2832] [tests] Hardens RandomSamplerTest
Github user chiwanpark commented on the issue: https://github.com/apache/flink/pull/2076 The failing Travis build is not related to this PR. Looks good to me!
[GitHub] flink pull request #2076: [FLINK-2832] [tests] Hardens RandomSamplerTest
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2076#discussion_r6588 --- Diff: flink-java/src/test/resources/logback-test.xml --- @@ -0,0 +1,34 @@ + + + + + +%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n + + + + + + + + + + + + --- End diff -- We need to add a newline at the end of the file.
[GitHub] flink pull request #2076: [FLINK-2832] [tests] Hardens RandomSamplerTest
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2076#discussion_r6573 --- Diff: flink-java/src/test/resources/log4j-test.properties --- @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# A1 is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err --- End diff -- Spaces around `=`
[GitHub] flink pull request #1996: [FLINK-3919][flink-ml] Distributed Linear Algebra:...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65688309 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.distributed.DistributedMatrix._ +import org.apache.flink.ml.math._ + +/** + * Distributed row-major matrix representation. + * @param numRows Number of rows. + * @param numCols Number of columns. + */ +class DistributedRowMatrix(val data: DataSet[IndexedRow], + val numRows: Int, + val numCols: Int) +extends DistributedMatrix { + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. 
+* @return +*/ + def toCOO: Seq[(MatrixRowIndex, MatrixColIndex, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; + (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = + SparseMatrix.fromCOO(this.numRows, this.numCols, this.toCOO) +require(localMatrix.numRows == this.numRows) +require(localMatrix.numCols == this.numCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** +* Apply a high-order function to couple of rows +* @param fun +* @param other +* @return +*/ + def byRowOperation(fun: (Vector, Vector) => Vector, + other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.data +require(this.numCols == other.numCols) +require(this.numRows == other.numRows) + +val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + .equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { +val row1 = Option(left) match { + case Some(row: IndexedRow) => row + case None => +IndexedRow( +right.rowIndex, +SparseVector.fromCOO(right.values.size, List((0, 0.0 +} +val row2 = Option(right) match { + case Some(row: IndexedRow) => row + case None => +IndexedRow( +left.rowIndex, +SparseVector.fromCOO(left.values.size, List((0, 0.0 +} +IndexedRow(row1.rowIndex, fun(row1.values, row2.values)) + } + ) +new DistributedRowMatrix(result, numRows, numCols) + } + + /** +* Add the matrix to another matrix. +* @param other +* @return +*/ + def sum(other: DistributedRowMatrix): DistributedRowMatrix = { --- End diff -- Is `add` more proper name for this method? Please do not update this PR if you agree with me but just notify me because I'm rebasing this PR on current master. 
[GitHub] flink pull request #1996: [FLINK-3919][flink-ml] Distributed Linear Algebra:...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65657170 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} + +/** + * Distributed row-major matrix representation. + * @param numRows Number of rows. + * @param numCols Number of columns. + */ +class DistributedRowMatrix(val data: DataSet[IndexedRow], + val numRows: Int, + val numCols: Int ) +extends DistributedMatrix { + + + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. 
+* @return +*/ + def toCOO: Seq[(Int, Int, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; + (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = + SparseMatrix.fromCOO(this.numRows, this.numCols, this.toCOO) +require(localMatrix.numRows == this.numRows) +require(localMatrix.numCols == this.numCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** +* Apply a high-order function to couple of rows +* @param fun +* @param other +* @return +*/ + def byRowOperation(fun: (Vector, Vector) => Vector, + other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.data +require(this.numCols == other.numCols) +require(this.numRows == other.numRows) + +val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + .equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { +val row1 = Option(left) match { + case Some(row: IndexedRow) => row + case None => +IndexedRow( +right.rowIndex, +SparseVector.fromCOO(right.values.size, List((0, 0.0 +} +val row2 = Option(right) match { + case Some(row: IndexedRow) => row + case None => +IndexedRow( +left.rowIndex, +SparseVector.fromCOO(left.values.size, List((0, 0.0 +} +IndexedRow(row1.rowIndex, fun(row1.values, row2.values)) + } + ) +new DistributedRowMatrix(result, numRows, numCols) + } + + /** +* Add the matrix to another matrix. +* @param other +* @return +*/ + def sum(other: DistributedRowMatrix): DistributedRowMatrix = { +val sumFunction: (Vector, Vector) => Vector = (x: Vector, y: Vector) => + (x.asBreeze + y.asBreeze).fromBreeze +this.byRowOperation(sumFunction, other) + } + + /** +* Subtracts another matrix. 
+* @param other +* @return +*/ + def subtract(other: DistributedRowMatrix): DistributedRowMatrix = { +val subFunction: (Vector, Vector) => Vector = (x: Vector, y: Vector) => +
[GitHub] flink issue #1985: [FLINK-1979] Add logistic loss, hinge loss and regulariza...
Github user chiwanpark commented on the issue: https://github.com/apache/flink/pull/1985 Okay, please ping me when the PR is updated.
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65355487 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} + +/** + * Distributed row-major matrix representation. + * @param numRows Number of rows. + * @param numCols Number of columns. + */ +class DistributedRowMatrix(val data: DataSet[IndexedRow], + val numRows: Int, + val numCols: Int ) +extends DistributedMatrix { + + + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. 
+* @return +*/ + def toCOO: Seq[(Int, Int, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; + (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = + SparseMatrix.fromCOO(this.numRows, this.numCols, this.toCOO) +require(localMatrix.numRows == this.numRows) +require(localMatrix.numCols == this.numCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** +* Apply a high-order function to couple of rows +* @param fun +* @param other +* @return +*/ + def byRowOperation(fun: (Vector, Vector) => Vector, + other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.data +require(this.numCols == other.numCols) +require(this.numRows == other.numRows) + +val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + .equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { +val row1 = Option(left) match { + case Some(row: IndexedRow) => row + case None => +IndexedRow( +right.rowIndex, +SparseVector.fromCOO(right.values.size, List((0, 0.0 +} +val row2 = Option(right) match { + case Some(row: IndexedRow) => row + case None => +IndexedRow( +left.rowIndex, +SparseVector.fromCOO(left.values.size, List((0, 0.0 +} +IndexedRow(row1.rowIndex, fun(row1.values, row2.values)) + } + ) +new DistributedRowMatrix(result, numRows, numCols) + } + + /** +* Add the matrix to another matrix. +* @param other +* @return +*/ + def sum(other: DistributedRowMatrix): DistributedRowMatrix = { +val sumFunction: (Vector, Vector) => Vector = (x: Vector, y: Vector) => + (x.asBreeze + y.asBreeze).fromBreeze +this.byRowOperation(sumFunction, other) + } + + /** +* Subtracts another matrix. 
+* @param other +* @return +*/ + def subtract(other: DistributedRowMatrix): DistributedRowMatrix = { +val subFunction: (Vector, Vector) => Vector = (x: Vector, y: Vector) => +
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1996 @chobeat Okay. Then we should force users to compute the dimensions of the matrix themselves by changing the types of the dimension parameters in the constructor (from `Option[Int]` to `Int`).
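The revision with `numRowsOpt`/`numColsOpt` inferred the shape as `max(rowIndex) + 1` rows, with the size of the first row as the column count. One consequence is worth keeping in mind. In a sparse matrix, trailing rows that are all zero may be absent from the data entirely, and then the inferred row count is too small. Explicit dimensions, by contrast, can simply be validated against the data. Below is a local sketch, with a hypothetical `Row` type standing in for `IndexedRow`.

```scala
object Dimensions {
  // Hypothetical stand-in for IndexedRow: a row that is present in the data.
  final case class Row(rowIndex: Int, values: Vector[Double])

  // Inference as in the earlier revision: rows = max index + 1, cols = first row's size.
  // Rows that are absent at the end of the matrix are not counted.
  def inferDims(rows: Seq[Row]): (Int, Int) =
    (rows.map(_.rowIndex).max + 1, rows.head.values.size)

  // With explicit dimensions, the caller states the shape and the data is checked against it.
  def checkDims(rows: Seq[Row], numRows: Int, numCols: Int): Unit =
    rows.foreach { r =>
      require(r.rowIndex >= 0 && r.rowIndex < numRows, s"row ${r.rowIndex} out of range")
      require(r.values.size == numCols, s"row ${r.rowIndex} has ${r.values.size} columns")
    }
}
```

For example, a 4x2 matrix whose last row is zero and therefore absent is inferred as 3x2, whereas `checkDims(rows, 4, 2)` accepts it.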
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65311449 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} + +/** + * Distributed row-major matrix representation. + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. + */ +class DistributedRowMatrix(data: DataSet[IndexedRow], --- End diff -- It seems that `getRowData` is redundant. Making `data` public (by adding the `val` keyword before the `data` constructor parameter) would be better.
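In Scala, a constructor parameter declared without `val` is visible only inside the class body, which is why a separate accessor such as `getRowData` was needed. Adding `val` turns the parameter into a public, read-only field. Here is a minimal illustration with hypothetical class names.

```scala
// Plain constructor parameter: `data` is not accessible from outside,
// so an extra member has to re-expose it.
class WithAccessor(data: Seq[Int]) {
  val getRowData: Seq[Int] = data
}

// `val` parameter: `data` itself becomes a public read-only field.
class WithVal(val data: Seq[Int])
```

Both expose the same value, but the `val` form removes the duplicated member from the class's public surface.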
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1996 Hi @chobeat, thanks for updating the PR. Once the comments on the source code are addressed, I think the last thing needed before merging is documentation. However, you can add the documentation after the block-based matrix is merged.
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65309988 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} + +/** + * Distributed row-major matrix representation. + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => numRows.collect().head + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => numCols.collect().head + } + + lazy val numRows: DataSet[Int] = numRowsOpt match { +case Some(rows) => data.getExecutionEnvironment.fromElements(rows) +case None => data.max("rowIndex").map(_.rowIndex + 1) + } + + lazy val numCols: DataSet[Int] = numColsOpt match { +case Some(cols) => data.getExecutionEnvironment.fromElements(cols) +case None => data.first(1).map(_.values.size) + } + + val getRowData = data + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. +* @return +*/ + def toCOO: Seq[(Int, Int, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; + (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = + SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO) +require(localMatrix.numRows == this.getNumRows) +require(localMatrix.numCols == this.getNumCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** +* Apply a high-order function to couple of rows +* @param fun +* @param other +* @return +*/ + def byRowOperation(fun: (Vector, Vector) => Vector, + other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.getRowData +require(this.getNumCols == other.getNumCols) +require(this.getNumRows == other.getNumRows) + +val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + 
.equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { +val row1 = Option(left).getOrElse(IndexedRow( +right.rowIndex, +SparseVector.fromCOO(right.values.size, List((0, 0.0) +val row2 = Option(right).getOrElse(IndexedRow( +left.rowIndex, +SparseVector.fromCOO(left.values.size, List((0, 0.0) --- End diff -- I would like to rewrite this block as follows to avoid creating unnecessary `IndexedRow` objects:
```scala
val row1 = Option(left) match { cas
```
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65309186 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} + +/** + * Distributed row-major matrix representation. + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => numRows.collect().head + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => numCols.collect().head + } + + lazy val numRows: DataSet[Int] = numRowsOpt match { +case Some(rows) => data.getExecutionEnvironment.fromElements(rows) +case None => data.max("rowIndex").map(_.rowIndex + 1) + } + + lazy val numCols: DataSet[Int] = numColsOpt match { +case Some(cols) => data.getExecutionEnvironment.fromElements(cols) +case None => data.first(1).map(_.values.size) + } + + val getRowData = data + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. +* @return +*/ + def toCOO: Seq[(Int, Int, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; + (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = + SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO) +require(localMatrix.numRows == this.getNumRows) +require(localMatrix.numCols == this.getNumCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** +* Apply a high-order function to couple of rows +* @param fun +* @param other +* @return +*/ + def byRowOperation(fun: (Vector, Vector) => Vector, + other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.getRowData +require(this.getNumCols == other.getNumCols) +require(this.getNumRows == other.getNumRows) + +val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + 
.equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { +val row1 = Option(left).getOrElse(IndexedRow( +right.rowIndex, +SparseVector.fromCOO(right.values.size, List((0, 0.0) +val row2 = Option(right).getOrElse(IndexedRow( +left.rowIndex, +SparseVector.fromCOO(left.values.size, List((0, 0.0) + +IndexedRow(row1.rowIndex, fun(row1.values, row2.values)) + } + ) +new DistributedRowMatrix(result, numRowsOpt, numColsOpt) + } +
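The `toCOO` method in the diff above flattens each collected `IndexedRow` into `(row, column, value)` triples with a two-generator `for` comprehension. Here is a minimal local sketch of the same flattening. It assumes a hypothetical `LocalIndexedRow` case class with explicit `(column, value)` entries in place of Flink ML's `IndexedRow`, and an in-memory `Seq` in place of a collected `DataSet`:

```scala
// Hypothetical local stand-in for IndexedRow: a row index plus (column, value) entries.
final case class LocalIndexedRow(rowIndex: Int, entries: Seq[(Int, Double)])

object CooSketch {
  // Emits one (row, column, value) triple per entry, in row order and then
  // entry order, mirroring the nested generators in toCOO.
  def toCOO(rows: Seq[LocalIndexedRow]): Seq[(Int, Int, Double)] =
    for {
      LocalIndexedRow(rowIndex, entries) <- rows
      (columnIndex, value) <- entries
    } yield (rowIndex, columnIndex, value)
}
```

The resulting triples are the input format that `SparseMatrix.fromCOO` consumes in `toLocalSparseMatrix`.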
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65309134 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} +import org.apache.flink.util.Collector +import org.apache.flink.ml.math.Breeze._ +import scala.collection.JavaConversions._ + +/** + * Distributed row-major matrix representation. + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => data.count().toInt + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => calcCols + } + + val getRowData = data + + private def calcCols: Int = +data.first(1).collect().headOption match { + case Some(vector) => vector.values.size + case None => 0 +} + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. +* @return +*/ + def toCOO: Seq[(Int, Int, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; + (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = + SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO) +require(localMatrix.numRows == this.getNumRows) +require(localMatrix.numCols == this.getNumCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** +* Apply a high-order function to couple of rows +* @param fun +* @param other +* @return +*/ + def byRowOperation(fun: (Vector, Vector) => Vector, + other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.getRowData +require(this.getNumCols == other.getNumCols) +require(this.getNumRows == other.getNumRows) + + +val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + .equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { +val row1 = Option(left).getOrElse(IndexedRow( +right.rowIndex, +SparseVector.fromCOO(right.values.size, List((0, 0.0) +val row2 = Option(right).getOrElse(IndexedRow( 
+left.rowIndex, +SparseVector.fromCOO(left.values.size, List((0, 0.0) + +IndexedRow(row1.rowIndex, fun(row1.values, row2.values)) + } + ) +new DistributedRowMatrix(result, numRowsOpt, numColsOpt) + } + + /** +
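In `byRowOperation`, the full outer join pairs rows by `rowIndex`. When a row exists on only one side, it is combined with a zero vector of the same length, which is what `SparseVector.fromCOO(size, List((0, 0.0)))` yields. The following is a hypothetical in-memory sketch of those semantics using `Map[Int, IndexedSeq[Double]]`. It is not the Flink ML API, and like the diff's `require` checks it assumes both operands have matching dimensions:

```scala
object ByRowSketch {
  // Applies `fun` to each pair of rows that share a row index. A row missing
  // on one side is replaced by an all-zero row of the same length, mimicking
  // the zero-fill in the diff's full outer join.
  def byRowOperation(
      left: Map[Int, IndexedSeq[Double]],
      right: Map[Int, IndexedSeq[Double]],
      fun: (IndexedSeq[Double], IndexedSeq[Double]) => IndexedSeq[Double])
    : Map[Int, IndexedSeq[Double]] =
    (left.keySet ++ right.keySet).iterator.map { rowIndex =>
      val l = left.get(rowIndex)
      val r = right.get(rowIndex)
      // At least one side is defined because rowIndex comes from the key union.
      val zero = IndexedSeq.fill(l.orElse(r).get.size)(0.0)
      rowIndex -> fun(l.getOrElse(zero), r.getOrElse(zero))
    }.toMap
}
```

With element-wise addition as `fun`, rows that appear on only one side pass through unchanged.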
[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1985 Hi @skavulya, I just quickly reviewed your updated PR and left a few comments. They are not critical, but it would be better to fix them. Everything else looks very good.
[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291933 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationPenaltyTest.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.optimization + +import org.apache.flink.ml.math.DenseVector +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{FlatSpec, Matchers} + + +class RegularizationPenaltyTest extends FlatSpec with Matchers with FlinkTestBase { --- End diff -- Same as for `LossFunctionTest`: we don't need to extend `FlinkTestBase` here either.
[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291911 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionTest.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.optimization + +import org.apache.flink.ml.common.{LabeledVector, WeightVector} +import org.apache.flink.ml.math.DenseVector +import org.scalatest.{Matchers, FlatSpec} +import org.apache.flink.test.util.FlinkTestBase + + +class LossFunctionTest extends FlatSpec with Matchers with FlinkTestBase { --- End diff -- We don't need to extend `FlinkTestBase` because `LossFunctionTest` does not use a Flink cluster.
[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291552 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/RegularizationPenalty.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.optimization + +import org.apache.flink.ml.math.{Vector, BLAS} +import org.apache.flink.ml.math.Breeze._ +import breeze.linalg.{norm => BreezeNorm} + +/** Represents a type of regularization penalty + * + * Regularization penalties are used to restrict the optimization problem to solutions with + * certain desirable characteristics, such as sparsity for the L1 penalty, or penalizing large + * weights for the L2 penalty. + * + * The regularization term, `R(w)` is added to the objective function, `f(w) = L(w) + lambda*R(w)` + * where lambda is the regularization parameter used to tune the amount of regularization applied. 
+ */ +trait RegularizationPenalty extends Serializable { + + /** Calculates the new weights based on the gradient and regularization penalty +* +* @param weightVector The weights to be updated +* @param gradient The gradient used to update the weights +* @param regularizationConstant The regularization parameter to be applied +* @param learningRate The effective step size for this iteration +* @return Updated weights +*/ + def takeStep( + weightVector: Vector, + gradient: Vector, + regularizationConstant: Double, + learningRate: Double) +: Vector + + /** Adds regularization to the loss value +* +* @param oldLoss The loss to be updated +* @param weightVector The gradient used to update the loss +* @param regularizationConstant The regularization parameter to be applied +* @return Updated loss +*/ + def regLoss(oldLoss: Double, weightVector: Vector, regularizationConstant: Double): Double + +} + + +/** `L_2` regularization penalty. + * + * The regularization function is the square of the L2 norm `1/2*||w||_2^2` + * with `w` being the weight vector. The function penalizes large weights, + * favoring solutions with more small weights rather than few large ones. + */ +object L2Regularization extends RegularizationPenalty { + + /** Calculates the new weights based on the gradient and L2 regularization penalty +* +* The updated weight is `w - learningRate *(gradient + lambda * w)` where +* `w` is the weight vector, and `lambda` is the regularization parameter. 
+* +* @param weightVector The weights to be updated +* @param gradient The gradient according to which we will update the weights +* @param regularizationConstant The regularization parameter to be applied +* @param learningRate The effective step size for this iteration +* @return Updated weights +*/ + override def takeStep( + weightVector: Vector, + gradient: Vector, + regularizationConstant: Double, + learningRate: Double) +: Vector = { +// add the gradient of the L2 regularization +BLAS.axpy(regularizationConstant, weightVector, gradient) + +// update the weights according to the learning rate +BLAS.axpy(-learningRate, gradient, weightVector) + +weightVector + } + + /** Adds regularization to the loss value +* +* The updated loss is `oldLoss + lambda * 1/2*||w||_2^2` where +* `w` is the weight vector, and `lambda` is the regularization parameter +* +* @param oldLoss The loss to be updated +* @param weightVector The gradient used to update the loss +* @param regularizationConstant The regularization parameter to be applied +* @return Updated loss +*/
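The L2 update described in the `takeStep` and `regLoss` doc comments above, `w - learningRate * (gradient + lambda * w)` and `oldLoss + lambda * 1/2 * ||w||_2^2`, can be sketched on plain `IndexedSeq[Double]` values. This is an illustrative, hypothetical `L2Sketch` object, not Flink ML's `BLAS`-based code. The diff updates `gradient` and `weightVector` in place with `BLAS.axpy`, whereas this version returns a new sequence and leaves its inputs untouched:

```scala
object L2Sketch {
  // One gradient step with the L2 penalty folded into the gradient:
  // w_i - learningRate * (g_i + lambda * w_i)
  def takeStep(
      weights: IndexedSeq[Double],
      gradient: IndexedSeq[Double],
      regularizationConstant: Double,
      learningRate: Double): IndexedSeq[Double] = {
    require(weights.length == gradient.length, "weights and gradient must have the same length")
    weights.indices.map { i =>
      val regularizedGradient = gradient(i) + regularizationConstant * weights(i)
      weights(i) - learningRate * regularizedGradient
    }
  }

  // Adds the penalty term lambda * 1/2 * ||w||_2^2 to an existing loss value.
  def regLoss(oldLoss: Double, weights: IndexedSeq[Double], regularizationConstant: Double): Double = {
    val squaredNorm = weights.map(w => w * w).sum
    oldLoss + 0.5 * regularizationConstant * squaredNorm
  }
}
```

Returning a fresh sequence avoids the aliasing surprise of the in-place version, where the caller's `gradient` is modified as a side effect; the in-place form saves allocations on large vectors.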
[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291461 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala --- @@ -47,21 +47,106 @@ object SquaredLoss extends PartialLossFunction { /** Calculates the loss depending on the label and the prediction * -* @param prediction -* @param label -* @return +* @param prediction The predicted value +* @param label The true value +* @return The loss */ override def loss(prediction: Double, label: Double): Double = { 0.5 * (prediction - label) * (prediction - label) } /** Calculates the derivative of the [[PartialLossFunction]] * -* @param prediction -* @param label -* @return +* @param prediction The predicted value +* @param label The true value +* @return The derivative of the loss function */ override def derivative(prediction: Double, label: Double): Double = { (prediction - label) } } + +/** Logistic loss function which can be used with the [[GenericLossFunction]] + * + * + * The [[LogisticLoss]] function implements `log(1 + -exp(prediction*label))` + * for binary classification with label in {-1, 1} + */ +object LogisticLoss extends PartialLossFunction { + + /** Calculates the loss depending on the label and the prediction +* +* @param prediction The predicted value +* @param label The true value +* @return The loss +*/ + override def loss(prediction: Double, label: Double): Double = { +val z = prediction * label + +// based on implementation in scikit-learn +// approximately equal and saves the computation of the log +if (z > 18) { + return math.exp(-z) +} +else if (z < -18) { + return -z +} + +math.log(1 + math.exp(-z)) + } + + /** Calculates the derivative of the loss function with respect to the prediction +* +* @param prediction The predicted value +* @param label The true value +* @return The derivative of the loss function +*/ + override def derivative(prediction: Double, label: Double): 
Double = { +val z = prediction * label + +// based on implementation in scikit-learn +// approximately equal and saves the computation of the log +if (z > 18) { + return -label * math.exp(-z) +} +else if (z < -18) { + return -label +} + +-label/(math.exp(z) + 1) + } --- End diff -- As I said in my comment on `loss`, the following is better:
```scala
if (z > 18) {
  -label * math.exp(-z)
} else if (z < -18) {
  -label
} else {
  -label / (math.exp(z) + 1)
}
```
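As a self-contained check of the suggested structure, the sketch below wraps the expression-style derivative in a hypothetical `LogisticLossDerivativeSketch` object (a plain function, not Flink ML's `PartialLossFunction` API). The threshold of 18 is taken from the diff. The middle branch is the exact derivative of `log(1 + exp(-prediction * label))` with respect to the prediction, `-label / (exp(z) + 1)`, and the two outer branches approximate it in the tails:

```scala
object LogisticLossDerivativeSketch {
  // d/dp log(1 + exp(-p * y)) = -y / (exp(p * y) + 1), written as a single
  // if/else expression so no early `return` is needed.
  def derivative(prediction: Double, label: Double): Double = {
    val z = prediction * label
    if (z > 18) {
      -label * math.exp(-z) // exp(z) + 1 is approximately exp(z) for large z
    } else if (z < -18) {
      -label // exp(z) + 1 is approximately 1 for very negative z
    } else {
      -label / (math.exp(z) + 1)
    }
  }
}
```

A central finite difference of `math.log(1 + math.exp(-prediction * label))` is a quick way to confirm the analytic derivative in a unit test.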
[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291353 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala --- @@ -47,21 +47,106 @@ object SquaredLoss extends PartialLossFunction { /** Calculates the loss depending on the label and the prediction * -* @param prediction -* @param label -* @return +* @param prediction The predicted value +* @param label The true value +* @return The loss */ override def loss(prediction: Double, label: Double): Double = { 0.5 * (prediction - label) * (prediction - label) } /** Calculates the derivative of the [[PartialLossFunction]] * -* @param prediction -* @param label -* @return +* @param prediction The predicted value +* @param label The true value +* @return The derivative of the loss function */ override def derivative(prediction: Double, label: Double): Double = { (prediction - label) } } + +/** Logistic loss function which can be used with the [[GenericLossFunction]] + * + * + * The [[LogisticLoss]] function implements `log(1 + -exp(prediction*label))` + * for binary classification with label in {-1, 1} + */ +object LogisticLoss extends PartialLossFunction { + + /** Calculates the loss depending on the label and the prediction +* +* @param prediction The predicted value +* @param label The true value +* @return The loss +*/ + override def loss(prediction: Double, label: Double): Double = { +val z = prediction * label + +// based on implementation in scikit-learn +// approximately equal and saves the computation of the log +if (z > 18) { + return math.exp(-z) +} +else if (z < -18) { + return -z +} + +math.log(1 + math.exp(-z)) --- End diff -- Using `return` is not recommended in Scala. Could you change it as follows?
```scala
if (z > 18) {
  math.exp(-z)
} else if (z < -18) {
  -z
} else {
  math.log(1 + math.exp(-z))
}
```
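A minimal standalone sketch of the loss with the suggested structure, using a hypothetical `LogisticLossSketch` object rather than Flink ML's `LogisticLoss`. Because the whole `if`/`else` chain is the method's result, no `return` is needed. The threshold of 18 is the one used in the diff:

```scala
object LogisticLossSketch {
  // log(1 + exp(-prediction * label)) for labels in {-1, 1}, as one expression.
  def loss(prediction: Double, label: Double): Double = {
    val z = prediction * label
    if (z > 18) {
      math.exp(-z) // log(1 + x) is approximately x for tiny x = exp(-z)
    } else if (z < -18) {
      -z // exp(-z) dominates the 1, so log(1 + exp(-z)) is approximately -z
    } else {
      math.log(1 + math.exp(-z))
    }
  }
}
```

Besides skipping the logarithm, the `z < -18` branch also avoids overflow. For `z = -1000`, `math.exp(1000)` is `Infinity`, so the unguarded formula would return `Infinity` instead of `1000.0`.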
[GitHub] flink pull request: [FLINK-3994] [ml, tests] Fix flaky KNN integration tests
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/2056 Merging...
[GitHub] flink pull request: [FLINK-3994] [ml, tests] Fix flaky KNN integration tests
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/2056 Thanks for clarifying, @StephanEwen! I'll merge this after Travis succeeds.
[GitHub] flink pull request: [FLINK-3994] [ml, tests] Fix flaky KNN integration tests
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/2056 Thanks for the guidance, @mxm. I'll cap the parallelism at 4.
[GitHub] flink pull request: [FLINK-3994] [ml, tests] Fix flaky KNN integration tests
GitHub user chiwanpark opened a pull request: https://github.com/apache/flink/pull/2056 [FLINK-3994] [ml, tests] Fix flaky KNN integration tests This PR addresses the flaky KNN integration tests. The problem is caused by sharing an `ExecutionEnvironment` between test cases, although I'm not sure about the exact reason. This PR gives each test case its own `ExecutionEnvironment`. With this PR, the tests pass on my local machine and on my Travis CI build [1]. I still have some doubts because this is not an essential fix for the problem: as far as I know, and as @StephanEwen said, sharing an `ExecutionEnvironment` should be supported. Additionally, `mvn clean verify` passed without this PR on my local machine. If you have other opinions, please leave a comment. [1]: https://travis-ci.org/chiwanpark/flink/builds/134104491 P.S. We need to rewrite the commit message. You can merge this pull request into a Git repository by running: $ git pull https://github.com/chiwanpark/flink hotfix-ml-test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2056.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2056 commit a47ae8481bcbac2c490386089ee6b1e740f3a1f4 Author: Chiwan Park <chiwanp...@apache.org> Date: 2016-05-31T08:50:05Z [hotfix] [ml] Fix flaky KNN integration tests
[GitHub] flink pull request: Added addition, subtraction and multiply by sc...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/2052#issuecomment-222644723 Hi @danielblazevski, thanks for opening this pull request. However, we need a related JIRA issue for it. Could you create one and change the title of this PR accordingly?
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65145402 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} +import org.apache.flink.util.Collector +import org.apache.flink.ml.math.Breeze._ +import scala.collection.JavaConversions._ + +/** + * + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => data.count().toInt + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => calcCols + } --- End diff -- Yes, you are right. Thanks for the correction.
[GitHub] flink pull request: [FLINK-2131][ml]: Initialization schemes for k...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/757#issuecomment-222599628 Any updates on this? If there are none, I would like to continue this work, building on this PR.
[GitHub] flink pull request: [FLINK-2673] [type system] Add a comparator fo...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/2017#issuecomment-222577020 Thanks for the review, @StephanEwen! Merging...
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65102583 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} +import org.apache.flink.util.Collector +import org.apache.flink.ml.math.Breeze._ +import scala.collection.JavaConversions._ + +/** + * Distributed row-major matrix representation. + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => data.count().toInt + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => calcCols + } + + val getRowData = data + + private def calcCols: Int = +data.first(1).collect().headOption match { + case Some(vector) => vector.values.size + case None => 0 +} + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. +* @return +*/ + def toCOO: Seq[(Int, Int, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; + (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = + SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO) +require(localMatrix.numRows == this.getNumRows) +require(localMatrix.numCols == this.getNumCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** +* Apply a high-order function to couple of rows +* @param fun +* @param other +* @return +*/ + def byRowOperation(fun: (Vector, Vector) => Vector, + other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.getRowData +require(this.getNumCols == other.getNumCols) +require(this.getNumRows == other.getNumRows) + + +val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + .equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { +val row1 = Option(left).getOrElse(IndexedRow( +right.rowIndex, +SparseVector.fromCOO(right.values.size, List((0, 0.0) +val row2 = Option(right).getOrElse(IndexedRow( 
+left.rowIndex, +SparseVector.fromCOO(left.values.size, List((0, 0.0) + +IndexedRow(row1.rowIndex, fun(row1.values, row2.values)) + } + ) +new DistributedRowMatrix(result, numRowsOpt, numColsOpt) + } + + /** +
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65102444 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} +import org.apache.flink.util.Collector +import org.apache.flink.ml.math.Breeze._ +import scala.collection.JavaConversions._ + +/** + * + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => data.count().toInt + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => calcCols + } --- End diff -- Ah, okay. Let's use `Int` for indices for now. For counting, we can reuse the `DataSetUtils.countElementsPerPartition` method (https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala#L56):
```scala
import org.apache.flink.api.scala.utils._
...
lazy val numRowsInDataSet = numRowsOpt match {
  case Some(value) => data.getExecutionEnvironment.fromElements(value)
  case None => data.countElementsPerPartition.map(_._2).reduce(_ + _).map(_.toInt)
}
...
```
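A minimal sketch of the counting approach described in this comment, assuming a hypothetical `SimpleRow` case class in place of the PR's `IndexedRow`. It keeps the count as a `Long` rather than narrowing it with `toInt`, and it only builds a lazy `DataSet[Long]`, so nothing executes until the result is consumed.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._

// Hypothetical stand-in for the PR's IndexedRow; only the rows themselves matter for counting.
case class SimpleRow(rowIndex: Int, values: Array[Double])

object RowCounting {
  // Returns the number of rows as a one-element DataSet[Long]. Nothing is executed
  // here; the count is computed only when a downstream operator or sink consumes it.
  def numRowsDataSet(data: DataSet[SimpleRow], known: Option[Long]): DataSet[Long] =
    known match {
      // The caller already knows the size: just wrap it in a DataSet.
      case Some(n) => data.getExecutionEnvironment.fromElements(n)
      // Otherwise count the elements of each partition, then sum the partial counts.
      case None =>
        data.countElementsPerPartition // DataSet[(subtask index, element count)]
          .map(_._2)
          .reduce(_ + _)
    }
}
```

For example, `RowCounting.numRowsDataSet(rows, None).collect().head` runs the job and returns the row count. Alternatively, the resulting `DataSet[Long]` could be handed to later operators without ever collecting it.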
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65076170 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} +import org.apache.flink.util.Collector +import org.apache.flink.ml.math.Breeze._ +import scala.collection.JavaConversions._ + +/** + * + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => data.count().toInt + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => calcCols + } --- End diff -- I think it is okay.
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1996#issuecomment-222488228 Some public methods (`sum`, `subtract`, `byRowOperation`, etc.) have no scaladoc. Please add scaladoc for these methods.
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65067628 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} +import org.apache.flink.util.Collector +import org.apache.flink.ml.math.Breeze._ +import scala.collection.JavaConversions._ + +/** + * + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => data.count().toInt + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => calcCols + } --- End diff -- I meant the type of the **row** number. Do we use row numbers as indices?
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65066995 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} +import org.apache.flink.util.Collector +import org.apache.flink.ml.math.Breeze._ +import scala.collection.JavaConversions._ + +/** + * + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => data.count().toInt + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => calcCols + } --- End diff -- Let me simplify my comments; sorry for the confusion. The number of columns could be `Int` only, but we should provide the number of rows in both forms (as a `DataSet` and as a raw number). By the way, isn't `Long` a more reasonable type for the number of rows?
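The quoted `getNumRows` narrows `data.count()`, which returns a `Long`, with `toInt`. Here is a small illustration of why `Long` is the safer type for the number of rows: narrowing a count above `Int.MaxValue` silently wraps around instead of failing. The value below is an arbitrary example, not one taken from the PR.

```scala
object RowCountWidth {
  // An illustrative row count larger than Int.MaxValue (2147483647).
  val rowCount: Long = 3000000000L

  // toInt keeps only the low 32 bits, so the value wraps around to a negative Int
  // instead of raising an error.
  val narrowed: Int = rowCount.toInt

  def main(args: Array[String]): Unit =
    println(s"$rowCount as Int is $narrowed") // prints "3000000000 as Int is -1294967296"
}
```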
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1220#issuecomment-222482793 @danielblazevski I'm using a custom configuration of the IntelliJ code formatter. There is a pending [pull request](https://github.com/apache/flink/pull/1963/files) about code formatting in IntelliJ that might be helpful.
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65064961 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} +import org.apache.flink.util.Collector +import org.apache.flink.ml.math.Breeze._ +import scala.collection.JavaConversions._ + +/** + * + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => data.count().toInt + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => calcCols + } + + val getRowData = data + + private def calcCols: Int = +data.first(1).collect().headOption match { + case Some(vector) => vector.values.size + case None => 0 +} + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. +* @return +*/ + def toCOO: Seq[(Int, Int, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; +(columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO) +require(localMatrix.numRows == this.getNumRows) +require(localMatrix.numCols == this.getNumCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + def byRowOperation( + fun: (Vector, Vector) => Vector, other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.getRowData +require(this.getNumCols == other.getNumCols) +require(this.getNumRows == other.getNumRows) + +val ev1: TypeInformation[Int] = TypeInformation.of(classOf[Int]) + +val result = this.data + .fullOuterJoin(otherData) + .where(_.rowIndex) + .equalTo(_.rowIndex)(ev1)( --- End diff -- A key selector function adds overhead because the extracted keys have to be copied.
Please change this to something like the following:
```scala
val result = this.data
  .fullOuterJoin(otherData)
  .where("rowIndex")
  .equalTo("rowIndex") { (left: IndexedRow, right: IndexedRow) =>
    ...
  }
```
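Here is a self-contained sketch of the field-expression join, assuming a hypothetical `DenseRow` case class with plain `Array[Double]` values instead of the PR's `IndexedRow` and FlinkML `Vector`. The element-wise sum merely stands in for `fun`. What matters are the string keys in `where`/`equalTo` and the `null` checks: in a full outer join, the side without a matching row is passed to the join function as `null`.

```scala
import org.apache.flink.api.scala._

// Hypothetical row type; the PR itself uses IndexedRow with a FlinkML Vector.
case class DenseRow(rowIndex: Int, values: Array[Double])

object RowWiseSum {
  // Element-wise sum of two row-partitioned matrices, joined on rowIndex.
  // Assumes matching rows have the same length (zip would silently truncate).
  def add(a: DataSet[DenseRow], b: DataSet[DenseRow]): DataSet[DenseRow] =
    a.fullOuterJoin(b)
      .where("rowIndex")   // field expressions instead of key selector functions
      .equalTo("rowIndex") { (left: DenseRow, right: DenseRow) =>
        // A row missing on one side arrives as null and is treated as a zero row.
        if (left == null) right
        else if (right == null) left
        else DenseRow(left.rowIndex, left.values.zip(right.values).map { case (x, y) => x + y })
      }
}
```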
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65063499 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} +import org.apache.flink.util.Collector +import org.apache.flink.ml.math.Breeze._ +import scala.collection.JavaConversions._ + +/** + * + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => data.count().toInt + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => calcCols + } --- End diff -- I still prefer the numbers as a `DataSet`, but providing them in both forms (`DataSet[Int]` and `Int`) would be okay. Sometimes users don't know the size of the matrix (for example, when the matrix is generated from other data).
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65060611 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} +import org.apache.flink.util.Collector +import org.apache.flink.ml.math.Breeze._ +import scala.collection.JavaConversions._ + +/** --- End diff -- Please add scaladoc here.
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65060576 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedMatrix.scala --- @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +trait DistributedMatrix { --- End diff -- We need scaladoc here.
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65060527 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedMatrix.scala --- @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +trait DistributedMatrix { + + def getNumRows: Int + def getNumCols: Int --- End diff -- How about changing the return type of `getNumRows` and `getNumCols` to `DataSet[Long]`? In Flink, returning the number of elements of distributed data directly is an expensive operation, because it triggers a separate job execution. We can use a broadcast variable to pass the numbers to subsequent operations.
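Here is a minimal sketch of the broadcast-variable approach, assuming the row count is available only as a `DataSet[Long]`. The `normalize` function and the broadcast name `"numRows"` are illustrative and not part of the PR. Each parallel task reads the count once in `open()` and then reuses it in every `map()` call, so the client never has to call `count()` eagerly.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object BroadcastRowCount {
  // Divides every value by the number of values. The count exists only as a
  // DataSet[Long] and is shipped to each parallel task as a broadcast set.
  // Assumes a non-empty input: reduce on an empty DataSet emits nothing.
  def normalize(values: DataSet[Double]): DataSet[Double] = {
    val numRows: DataSet[Long] = values.map(_ => 1L).reduce(_ + _)

    values
      .map(new RichMapFunction[Double, Double] {
        private var n: Long = 0L

        // Runs once per task before any map() call; reads the broadcast count.
        override def open(parameters: Configuration): Unit = {
          n = getRuntimeContext.getBroadcastVariable[Long]("numRows").get(0)
        }

        override def map(value: Double): Double = value / n
      })
      .withBroadcastSet(numRows, "numRows")
  }
}
```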
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1996#issuecomment-222473905 Hi @chobeat, thanks for opening this pull request. I would like to shepherd this PR.
[GitHub] flink pull request: [Flink 1934] Add approximative k-nearest-neigh...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/2048#issuecomment-222397141 Hi @danielblazevski, thanks for opening this pull request. I would like to keep approximate kNN and exact kNN separate. If the approximate method needs the exact kNN implementation, could you wait until #1220 is merged? I'll merge it in a few days (maybe today, or tomorrow at the latest).
[GitHub] flink pull request: [FLINK-2673] [type system] Add a comparator fo...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/2017#issuecomment-220833561 The failing Travis build is not related to this PR.
[GitHub] flink pull request: [FLINK-2673] [type system] Add a comparator fo...
GitHub user chiwanpark opened a pull request: https://github.com/apache/flink/pull/2017 [FLINK-2673] [type system] Add a comparator for Scala Option type

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

---

I've added an `OptionTypeComparator` implementation, unit tests based on `ComparatorTestBase`, and a simple integration test in `JoinITCase`. I followed [Till's opinion](http://markmail.org/message/vk3t3vpc6j6zbldk) about the order of `None` and `Some` values.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/chiwanpark/flink FLINK-2673 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2017.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2017 commit 485b6904013c23316a9f61d651b44e245e47c61c Author: Chiwan Park <chiwanp...@apache.org> Date: 2016-05-22T11:39:10Z [FLINK-2673] [core] Add a comparator for Scala Option type
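The PR description doesn't say which order was chosen, so this sketch only illustrates the design decision. A comparator for `Option` has to decide whether `None` sorts before or after every `Some`, and otherwise delegate to the ordering of the wrapped values. The code is plain Scala with a hypothetical `noneFirst` flag; it is not the PR's `OptionTypeComparator`. For reference, Scala's built-in `Ordering.Option` places `None` first.

```scala
object OptionOrderingSketch {
  // Compares two Options; `noneFirst` decides whether None sorts before or after every Some.
  def compareOptions[T](a: Option[T], b: Option[T], noneFirst: Boolean)(implicit ord: Ordering[T]): Int =
    (a, b) match {
      case (None, None)       => 0
      case (None, Some(_))    => if (noneFirst) -1 else 1
      case (Some(_), None)    => if (noneFirst) 1 else -1
      case (Some(x), Some(y)) => ord.compare(x, y) // both defined: compare the values
    }

  // Wraps compareOptions as an Ordering so it can be used with sorted, max, etc.
  def ordering[T](noneFirst: Boolean)(implicit ord: Ordering[T]): Ordering[Option[T]] =
    new Ordering[Option[T]] {
      def compare(a: Option[T], b: Option[T]): Int = compareOptions(a, b, noneFirst)
    }

  def main(args: Array[String]): Unit = {
    val xs: Seq[Option[Int]] = Seq(Some(3), None, Some(1))
    println(xs.sorted(ordering[Int](noneFirst = true))) // prints "List(None, Some(1), Some(3))"
  }
}
```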
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r63815357 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.nn + +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.utils._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common._ +import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector} +import org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, DistanceMetric, +EuclideanDistanceMetric} +import org.apache.flink.ml.pipeline.{FitOperation, PredictDataSetOperation, Predictor} +import org.apache.flink.util.Collector +import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint + +import scala.collection.immutable.Vector +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +/** Implements a k-nearest neighbor join. 
+ * + * Calculates the `k`-nearest neighbor points in the training set for each point in the test set. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = ... + * val testingDS: DataSet[Vector] = ... + * + * val knn = KNN() + * .setK(10) + * .setBlocks(5) + * .setDistanceMetric(EuclideanDistanceMetric()) + * + * knn.fit(trainingDS) + * + * val predictionDS: DataSet[(Vector, Array[Vector])] = knn.predict(testingDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.nn.KNN.K]] + * Sets the K which is the number of selected points as neighbors. (Default value: '''5''') + * + * - [[org.apache.flink.ml.nn.KNN.DistanceMetric]] + * Sets the distance metric we use to calculate the distance between two points. If no metric is + * specified, then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used. + * (Default value: '''EuclideanDistanceMetric()''') + * + * - [[org.apache.flink.ml.nn.KNN.Blocks]] + * Sets the number of blocks into which the input data will be split. This number should be set + * at least to the degree of parallelism. If no value is specified, then the parallelism of the + * input [[DataSet]] is used as the number of blocks. (Default value: '''None''') + * + * - [[org.apache.flink.ml.nn.KNN.UseQuadTreeParam]] + * A boolean variable that whether or not to use a Quadtree to partition the training set + * to potentially simplify the KNN search. If no value is specified, the code will + * automatically decide whether or not to use a Quadtree. Use of a Quadtree scales well + * with the number of training and testing points, though poorly with the dimension. + * (Default value: ```None```) + * + * - [[org.apache.flink.ml.nn.KNN.SizeHint]] + * Specifies whether the training set or test set is small to optimize the cross + * product operation needed for the KNN search. If the training set is small + * this should be `CrossHint.FIRST_IS_SMALL` and set to `CrossHint.SECOND_IS_SMALL` + * if the test set is small. 
+ * (Default value: ```None```) + * + */ + +class KNN extends Predictor[KNN] { + + import KNN._ + + var trainingSet: Option[DataSet[Block[FlinkVector]]] = None + + /** Sets K +* @param k the number of selected points as neighbors +*/ + def setK(k: Int): KNN = { +require(k > 0, "K must be positive.") +parameters.add(K, k) +this + } + + /** Sets the distance metric +* @param metric the distance metric to calculate distance between two points
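To make the semantics described in the scaladoc concrete, here is a local, brute-force sketch of an exact k-nearest-neighbour join in plain Scala. It is not the PR's implementation, which splits the data into blocks, crosses them, and can optionally use a quadtree. It only shows which training points each test point should be paired with. It uses squared Euclidean distance, which ranks neighbours the same way as Euclidean distance because squaring is monotone for non-negative values.

```scala
object BruteForceKnn {
  type Point = Array[Double]

  // Squared Euclidean distance between two points of the same dimension.
  def squaredDistance(a: Point, b: Point): Double = {
    require(a.length == b.length, "points must have the same dimension")
    var sum = 0.0
    var i = 0
    while (i < a.length) {
      val d = a(i) - b(i)
      sum += d * d
      i += 1
    }
    sum
  }

  // For each test point, returns the k training points closest to it (nearest first).
  def knn(training: Seq[Point], testing: Seq[Point], k: Int): Seq[(Point, Seq[Point])] = {
    require(k > 0, "K must be positive.")
    testing.map { q =>
      val neighbors = training.sortBy(p => squaredDistance(p, q)).take(k)
      (q, neighbors)
    }
  }
}
```

This costs a full sort of the training set per test point, which is exactly the work that the blocking and quadtree options in the PR try to reduce.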
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r63712972 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala --- @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.nn + +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.utils._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common._ +import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector} +import org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, DistanceMetric, +EuclideanDistanceMetric} +import org.apache.flink.ml.pipeline.{FitOperation, PredictDataSetOperation, Predictor} +import org.apache.flink.util.Collector +import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint + +import scala.collection.immutable.Vector +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +/** Implements a k-nearest neighbor join. 
+ * + * Calculates the `k`-nearest neighbor points in the training set for each point in the test set. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = ... + * val testingDS: DataSet[Vector] = ... + * + * val knn = KNN() + * .setK(10) + * .setBlocks(5) + * .setDistanceMetric(EuclideanDistanceMetric()) + * + * knn.fit(trainingDS) + * + * val predictionDS: DataSet[(Vector, Array[Vector])] = knn.predict(testingDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.nn.KNN.K]] + * Sets the K which is the number of selected points as neighbors. (Default value: '''5''') + * + * - [[org.apache.flink.ml.nn.KNN.DistanceMetric]] + * Sets the distance metric we use to calculate the distance between two points. If no metric is + * specified, then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used. + * (Default value: '''EuclideanDistanceMetric()''') + * + * - [[org.apache.flink.ml.nn.KNN.Blocks]] + * Sets the number of blocks into which the input data will be split. This number should be set + * at least to the degree of parallelism. If no value is specified, then the parallelism of the + * input [[DataSet]] is used as the number of blocks. (Default value: '''None''') + * + * - [[org.apache.flink.ml.nn.KNN.UseQuadTreeParam]] + * A boolean variable that whether or not to use a Quadtree to partition the training set + * to potentially simplify the KNN search. If no value is specified, the code will + * automatically decide whether or not to use a Quadtree. Use of a Quadtree scales well + * with the number of training and testing points, though poorly with the dimension. + * (Default value: ```None```) + * + * - [[org.apache.flink.ml.nn.KNN.SizeHint]] + * Specifies whether the training set or test set is small to optimize the cross + * product operation needed for the KNN search. If the training set is small + * this should be `CrossHint.FIRST_IS_SMALL` and set to `CrossHint.SECOND_IS_SMALL` + * if the test set is small. 
+ * (Default value: ```None```) + * + */ + +class KNN extends Predictor[KNN] { + + import KNN._ + + var trainingSet: Option[DataSet[Block[FlinkVector]]] = None + + /** Sets K +* @param k the number of selected points as neighbors +*/ + def setK(k: Int): KNN = { +require(k > 0, "K must be positive.") +parameters.add(K, k) +this + } + + /** Sets the distance metric +* @param metric the distance metric to calculate distance between two points +*/
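The class doc quoted above describes an exact k-nearest-neighbor join: for each test point, find the `k` closest training points. Below is a minimal single-machine sketch of those semantics in plain Scala, with no Flink involved. It could serve as a reference for what the join's output should contain. The names `BruteForceKnnSketch`, `Point` and `euclidean` are hypothetical, and points are modeled as `Vector[Double]` rather than FlinkML vectors. It is not the PR's distributed implementation.

```scala
object BruteForceKnnSketch {
  // Hypothetical point type; the PR uses FlinkML's Vector instead.
  type Point = Vector[Double]

  /** Euclidean distance between two points of equal dimension. */
  def euclidean(a: Point, b: Point): Double = {
    require(a.length == b.length, "Points must have the same dimension.")
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)
  }

  /** For every test point, returns the k closest training points (exact, brute force). */
  def knnJoin(
      training: Seq[Point],
      testing: Seq[Point],
      k: Int,
      dist: (Point, Point) => Double = euclidean): Seq[(Point, Seq[Point])] = {
    require(k > 0, "K must be positive.")
    testing.map { b =>
      // Compare b against every training point, then keep the k nearest.
      val neighbors = training.sortBy(a => dist(a, b)).take(k)
      (b, neighbors)
    }
  }
}
```

For example, `knnJoin(training, Seq(Vector(0.1, 0.1)), 2)` pairs the single test point with its two nearest training points. The cost is one distance computation per (training, test) pair, which is the brute-force baseline the quadtree option aims to improve on.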
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1220#issuecomment-219702506 Great to hear that z-knn is almost done! If you think the implementation is in good shape, do not hesitate to open a pull request.
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1220#issuecomment-219203934 I would like to merge this PR. I've checked the output of the k-NN join (with and without the quadtree). The documentation looks good. To maintainers (@tillrohrmann @thvasilo): Could you do a final check of this PR? You do not need to bother with code style issues; I'll address them before merging.
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1220#issuecomment-215369160 @danielblazevski Sorry for the late check. I checked your PR and have a few comments. Once they are addressed, I would like to merge this.
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r61398228 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/classification/Classification.scala --- @@ -131,3 +131,6 @@ object Classification { val expectedWeightVector = DenseVector(-1.95, -3.45) } + + + --- End diff -- Are these new lines necessary?
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r61398006 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/QuadTree.scala --- @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.nn + +import org.apache.flink.ml.math.{Breeze, Vector} +import Breeze._ + +import org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, +EuclideanDistanceMetric, DistanceMetric} + +import scala.collection.mutable.ListBuffer +import scala.collection.mutable.PriorityQueue + +/** + * n-dimensional QuadTree data structure; partitions + * spatial data for faster queries (e.g. KNN query) + * The skeleton of the data structure was initially + * based off of the 2D Quadtree found here: + * http://www.cs.trinity.edu/~mlewis/CSCI1321-F11/Code/src/util/Quadtree.scala + * + * Many additional methods were added to the class both for + * efficient KNN queries and generalizing to n-dim. 
+ * + * @param minVec vector of the corner of the bounding box with smallest coordinates + * @param maxVec vector of the corner of the bounding box with smallest coordinates + * @param distMetric metric, must be Euclidean or squareEuclidean + * @param maxPerBox threshold for number of points in each box before slitting a box + */ +class QuadTree( + minVec: Vector, + maxVec: Vector, + distMetric: DistanceMetric, + maxPerBox: Int) { + + class Node( +center: Vector, +width: Vector, +var children: Seq[Node]) { + +val nodeElements = new ListBuffer[Vector] + +/** for testing purposes only; used in QuadTreeSuite.scala + * + * @return center and width of the box + */ +def getCenterWidth(): (Vector, Vector) = { + (center, width) +} + +def contains(queryPoint: Vector): Boolean = { --- End diff -- Could you add a scaladoc for this method? All public methods should have a scaladoc.
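The reviewer asks for a scaladoc on `contains`. As an illustration only, here is a standalone sketch of an n-dimensional box membership test that carries such a scaladoc. It assumes that a node's `width` is the full side length in each dimension, so the box spans `center ± width / 2`. The quoted diff does not confirm this. `BoxSketch` is a hypothetical name, and it uses `Vector[Double]` instead of FlinkML's `Vector`.

```scala
/** Hypothetical axis-aligned n-dimensional box, described (like the quoted Node) by center and width. */
final case class BoxSketch(center: Vector[Double], width: Vector[Double]) {
  require(center.length == width.length, "center and width must have the same dimension.")

  /** Tests whether a query point lies inside this box.
    *
    * Assumes `width(i)` is the full side length in dimension `i`, so the box spans
    * `[center(i) - width(i) / 2, center(i) + width(i) / 2]`; points on the boundary count as inside.
    *
    * @param queryPoint the point to test; must have the same dimension as the box
    * @return `true` if `queryPoint` lies inside the box (boundary inclusive), `false` otherwise
    */
  def contains(queryPoint: Vector[Double]): Boolean = {
    require(queryPoint.length == center.length, "Dimension mismatch.")
    // The point is inside iff it is within half a width of the center along every axis.
    center.indices.forall { i =>
      math.abs(queryPoint(i) - center(i)) <= width(i) / 2
    }
  }
}
```

With `BoxSketch(Vector(0.0, 0.0), Vector(2.0, 2.0))`, the point `(0.5, -1.0)` is inside and `(1.5, 0.0)` is not. Whether the boundary should be inclusive is a design choice that the real scaladoc should state explicitly.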
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r61397870 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala --- @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.nn + +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.utils._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common._ +import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector} +import org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, DistanceMetric, +EuclideanDistanceMetric} +import org.apache.flink.ml.pipeline.{FitOperation, PredictDataSetOperation, Predictor} +import org.apache.flink.util.Collector +import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint + +import scala.collection.immutable.Vector +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +/** Implements a k-nearest neighbor join. 
+ * + * Calculates the `k` nearest neighbor points in the training set for each point in the test set. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = ... + * val testingDS: DataSet[Vector] = ... + * + * val knn = KNN() + * .setK(10) + * .setBlocks(5) + * .setDistanceMetric(EuclideanDistanceMetric()) + * + * knn.fit(trainingDS) + * + * val predictionDS: DataSet[(Vector, Array[Vector])] = knn.predict(testingDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.nn.KNN.K]] + * Sets the K which is the number of selected points as neighbors. (Default value: '''5''') + * + * - [[org.apache.flink.ml.nn.KNN.DistanceMetric]] + * Sets the distance metric we use to calculate the distance between two points. If no metric is + * specified, then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used. + * (Default value: '''EuclideanDistanceMetric()''') + * + * - [[org.apache.flink.ml.nn.KNN.Blocks]] + * Sets the number of blocks into which the input data will be split. This number should be set + * at least to the degree of parallelism. If no value is specified, then the parallelism of the + * input [[DataSet]] is used as the number of blocks. (Default value: '''None''') + * + * - [[org.apache.flink.ml.nn.KNN.UseQuadTreeParam]] + * A boolean variable that whether or not to use a Quadtree to partition the training set + * to potentially simplify the KNN search. If no value is specified, the code will + * automatically decide whether or not to use a Quadtree. Use of a Quadtree scales well + * with the number of training and testing points, though poorly with the dimension. + * (Default value: ```None```) + * + * - [[org.apache.flink.ml.nn.KNN.SizeHint]] + * Specifies whether the training set or test set is small to optimize the cross + * product operation needed for the KNN search. If the training set is small + * this should be `CrossHint.FIRST_IS_SMALL` and set to `CrossHint.SECOND_IS_SMALL` + * if the test set is small. 
+ * (Default value: ```None```) + * + */ + +class KNN extends Predictor[KNN] { + + import KNN._ + + var trainingSet: Option[DataSet[Block[FlinkVector]]] = None + + /** Sets K +* @param k the number of selected points as neighbors +*/ + def setK(k: Int): KNN = { +require(k > 0, "K must be positive.") +parameters.add(K, k) +this + } + + /** Sets the distance metric +* @param metric the distance metric to calculate distance between two points +*/
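The `Blocks` parameter quoted above splits the input data into a number of blocks. One reading of this is that both inputs are split and the blocks are then crossed. Under that reading, each (training block, test block) pair yields local candidates. The global `k` nearest neighbors of a test point are then the `k` nearest among the union of its local candidates. The single-machine sketch below illustrates that idea. Round-robin block assignment, squared Euclidean distance, and every name here (`BlockedKnnSketch`, `toBlocks`, `sqDist`) are assumptions for illustration, not the PR's code.

```scala
object BlockedKnnSketch {
  type Point = Vector[Double]

  /** Squared Euclidean distance; it ranks neighbors the same way as Euclidean distance. */
  def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  /** Splits items into `numBlocks` blocks, assigning item i to block i % numBlocks. */
  def toBlocks[T](items: Seq[T], numBlocks: Int): Seq[Seq[T]] = {
    require(numBlocks > 0, "numBlocks must be positive.")
    val indexed = items.zipWithIndex
    (0 until numBlocks).map { blockId =>
      indexed.collect { case (item, i) if i % numBlocks == blockId => item }
    }
  }

  def knnJoin(training: Seq[Point], testing: Seq[Point], k: Int, numBlocks: Int): Seq[(Point, Seq[Point])] = {
    require(k > 0, "K must be positive.")
    val trainBlocks = toBlocks(training, numBlocks)
    // Keep each test point's index so duplicate test points stay distinct.
    val testBlocks = toBlocks(testing.zipWithIndex, numBlocks)
    // Cross every training block with every test block; each pair yields local top-k candidates.
    val partial = for {
      trainBlock <- trainBlocks
      testBlock <- testBlocks
      (b, idx) <- testBlock
    } yield idx -> trainBlock.sortBy(a => sqDist(a, b)).take(k)
    // Merge each test point's candidates from all blocks and keep the global k nearest.
    val merged = partial.groupBy(_._1)
    testing.indices.map { idx =>
      val b = testing(idx)
      b -> merged(idx).flatMap(_._2).sortBy(a => sqDist(a, b)).take(k)
    }
  }
}
```

The merge step is what makes blocking exact. Any true k-nearest neighbor must appear among the local top `k` of the block that contains it.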
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r61397754 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala --- @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.nn + +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.utils._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common._ +import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector} +import org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, DistanceMetric, +EuclideanDistanceMetric} +import org.apache.flink.ml.pipeline.{FitOperation, PredictDataSetOperation, Predictor} +import org.apache.flink.util.Collector +import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint + +import scala.collection.immutable.Vector +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +/** Implements a k-nearest neighbor join. 
+ * + * Calculates the `k` nearest neighbor points in the training set for each point in the test set. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = ... + * val testingDS: DataSet[Vector] = ... + * + * val knn = KNN() + * .setK(10) + * .setBlocks(5) + * .setDistanceMetric(EuclideanDistanceMetric()) + * + * knn.fit(trainingDS) + * + * val predictionDS: DataSet[(Vector, Array[Vector])] = knn.predict(testingDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.nn.KNN.K]] + * Sets the K which is the number of selected points as neighbors. (Default value: '''5''') + * + * - [[org.apache.flink.ml.nn.KNN.DistanceMetric]] + * Sets the distance metric we use to calculate the distance between two points. If no metric is + * specified, then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used. + * (Default value: '''EuclideanDistanceMetric()''') + * + * - [[org.apache.flink.ml.nn.KNN.Blocks]] + * Sets the number of blocks into which the input data will be split. This number should be set + * at least to the degree of parallelism. If no value is specified, then the parallelism of the + * input [[DataSet]] is used as the number of blocks. (Default value: '''None''') + * + * - [[org.apache.flink.ml.nn.KNN.UseQuadTreeParam]] + * A boolean variable that whether or not to use a Quadtree to partition the training set + * to potentially simplify the KNN search. If no value is specified, the code will + * automatically decide whether or not to use a Quadtree. Use of a Quadtree scales well + * with the number of training and testing points, though poorly with the dimension. + * (Default value: ```None```) + * + * - [[org.apache.flink.ml.nn.KNN.SizeHint]] + * Specifies whether the training set or test set is small to optimize the cross + * product operation needed for the KNN search. If the training set is small + * this should be `CrossHint.FIRST_IS_SMALL` and set to `CrossHint.SECOND_IS_SMALL` + * if the test set is small. 
+ * (Default value: ```None```) + * + */ + +class KNN extends Predictor[KNN] { + + import KNN._ + + var trainingSet: Option[DataSet[Block[FlinkVector]]] = None + + /** Sets K +* @param k the number of selected points as neighbors +*/ + def setK(k: Int): KNN = { +require(k > 0, "K must be positive.") +parameters.add(K, k) +this + } + + /** Sets the distance metric +* @param metric the distance metric to calculate distance between two points +*/
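Finding the `k` nearest points does not require sorting every candidate, which costs O(n log n) per test point. A common alternative keeps only the current best `k` in a bounded max-heap, for O(n log k). The sketch below builds one on `scala.collection.mutable.PriorityQueue`, which the quoted QuadTree file also imports. It is a generic technique sketch. `TopKSketch.kSmallest` is a hypothetical helper, not part of FlinkML.

```scala
import scala.collection.mutable

object TopKSketch {
  /** Returns the (at most) k items with the smallest distance, closest first. */
  def kSmallest[T](items: Iterator[T], k: Int)(dist: T => Double): Seq[T] = {
    require(k > 0, "K must be positive.")
    // Max-heap on distance: the head is the farthest of the current candidates.
    val heap = mutable.PriorityQueue.empty[(Double, T)](Ordering.by[(Double, T), Double](_._1))
    items.foreach { item =>
      val d = dist(item)
      if (heap.size < k) {
        heap.enqueue((d, item))
      } else if (d < heap.head._1) {
        // Replace the farthest candidate with the closer item.
        heap.dequeue()
        heap.enqueue((d, item))
      }
    }
    // The heap holds at most k entries; sort them closest first.
    heap.toList.sortBy(_._1).map(_._2)
  }
}
```

Ties at the cutoff keep the item seen first, because a new item replaces the heap head only when it is strictly closer.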
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r61397659 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala --- @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.nn + +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.utils._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common._ +import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector} +import org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, DistanceMetric, +EuclideanDistanceMetric} +import org.apache.flink.ml.pipeline.{FitOperation, PredictDataSetOperation, Predictor} +import org.apache.flink.util.Collector +import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint + +import scala.collection.immutable.Vector +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +/** Implements a k-nearest neighbor join. 
+ * + * Calculates the `k` nearest neighbor points in the training set for each point in the test set. --- End diff -- Calculates the _`k`-nearest_ neighbor points ...
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r61397574 --- Diff: docs/libs/ml/knn.md --- @@ -0,0 +1,146 @@ +--- +mathjax: include +htmlTitle: FlinkML - k-nearest neighbors +title: FlinkML - knn +--- + + +* This will be replaced by the TOC +{:toc} + +## Description +Implements an exact k-nearest neighbors algorithm. Given a training set $A$ and a testing set $B$, the algorithm returns + +$$ +KNN(A,B, k) = \{ \left( b, KNN(b,A) \right) where b \in B and KNN(b, A, k) are the k-nearest points to b in A \} +$$ + +The brute-force approach is to compute the distance between every training and testing point. To ease the brute-force computation of computing the distance between every traning point a quadtree is used. The quadtree scales well in the number of training points, though poorly in the spatial dimension. The algorithm will automatically choose whether or not to use the quadtree, though the user can override that decision by setting a parameter to force use or not use a quadtree. + +##Operations + +`KNN` is a `Predictor`. +As such, it supports the `fit` and `predict` operation. + +### Fit + +KNN is trained given a set of `LabeledVector`: + +* `fit: DataSet[LabeledVector] => Unit` + +### Predict + +KNN predicts for all subtypes of FlinkML's `Vector` the corresponding class label: + +* `predict[T <: Vector]: DataSet[T] => DataSet[(T, Array[Vector])]`, where the `(T, Array[Vector])` tuple + corresponds to (testPoint, K-nearest training points) + +## Paremeters +The KNN implementation can be controlled by the following parameters: + + + + +Parameters +Description + + + + + +K + + +Defines the number of nearest-neoghbors to search for. That is, for each test point, the algorithm finds the K nearest neighbors in the training set +(Default value: 5) + + + + + DistanceMetric + + +Sets the distance metric we use to calculate the distance between two points. 
If no metric is specified, then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used. +(Default value: EuclideanDistanceMetric ) + + + + +Blocks + + +Sets the number of blocks into which the input data will be split. This number should be set +at least to the degree of parallelism. If no value is specified, then the parallelism of the +input [[DataSet]] is used as the number of blocks. +(Default value: None) + + + + +UseQuadTreeParam + + + A boolean variable that whether or not to use a Quadtree to partition the training set to potentially simplify the KNN search. If no value is specified, the code will automatically decide whether or not to use a Quadtree. Use of a Quadtree scales well with the number of training and testing points, though poorly with the dimension. +(Default value: None) + + + + +SizeHint + + Specifies whether the training set or test set is small to optimize the cross product operation needed for the KNN search. If the training set is small this should be `CrossHint.FIRST_IS_SMALL` and set to `CrossHint.SECOND_IS_SMALL` if the test set is small. + (Default value: None) + + + + + + +## Examples + +{% highlight scala %} +import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint +import org.apache.flink.api.scala._ +import org.apache.flink.ml.classification.Classification +import org.apache.flink.ml.math.DenseVector +import org.apache.flink.ml.metrics.distances. +SquaredEuclideanDistanceMetric + + val env = ExecutionEnvironment.getExecutionEnvironment + + // prepare data + val trainingSet = env.fromCollection(Classification.trainingData).map(_.vector) + val testingSet = env.fromElements(DenseVector(0.0, 0.0)) + + val knn = KNN() +.setK(3) +.setBlocks(10) +.setDistanceMetric(SquaredEuclideanDistanceMetric()) +.setUseQuadTree(false) +.setSizeHint(CrossHint.SECOND_IS_SMALL) + + // run knn join + knn.fit(trainingSet) + val result = knn.predict(testingSet).collect() + +{% endhighlight %} + +For mor
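The set definition in the quoted docs uses both `KNN(b,A)` and `KNN(b,A,k)`. Below it is written with one consistent signature and with the distance condition made explicit. Here $d$ is the chosen `DistanceMetric`, and the sketch assumes $|A| \ge k$ with ties broken arbitrarily:

```latex
\begin{aligned}
\mathrm{KNN}(A, B, k) &= \{\, (b, \mathrm{KNN}(b, A, k)) \mid b \in B \,\}, \\
\mathrm{KNN}(b, A, k) &\subseteq A, \qquad |\mathrm{KNN}(b, A, k)| = k, \\
d(b, a) &\le d(b, a') \qquad \forall\, a \in \mathrm{KNN}(b, A, k),\ a' \in A \setminus \mathrm{KNN}(b, A, k).
\end{aligned}
```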
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r61397472 --- Diff: docs/libs/ml/knn.md --- @@ -0,0 +1,146 @@ +--- +mathjax: include +htmlTitle: FlinkML - k-nearest neighbors +title: FlinkML - knn +--- + + +* This will be replaced by the TOC +{:toc} + +## Description +Implements an exact k-nearest neighbors algorithm. Given a training set $A$ and a testing set $B$, the algorithm returns + +$$ +KNN(A,B, k) = \{ \left( b, KNN(b,A) \right) where b \in B and KNN(b, A, k) are the k-nearest points to b in A \} +$$ + +The brute-force approach is to compute the distance between every training and testing point. To ease the brute-force computation of computing the distance between every traning point a quadtree is used. The quadtree scales well in the number of training points, though poorly in the spatial dimension. The algorithm will automatically choose whether or not to use the quadtree, though the user can override that decision by setting a parameter to force use or not use a quadtree. + +##Operations + +`KNN` is a `Predictor`. +As such, it supports the `fit` and `predict` operation. + +### Fit + +KNN is trained given a set of `LabeledVector`: + +* `fit: DataSet[LabeledVector] => Unit` + +### Predict + +KNN predicts for all subtypes of FlinkML's `Vector` the corresponding class label: + +* `predict[T <: Vector]: DataSet[T] => DataSet[(T, Array[Vector])]`, where the `(T, Array[Vector])` tuple + corresponds to (testPoint, K-nearest training points) + +## Paremeters +The KNN implementation can be controlled by the following parameters: + + + + +Parameters +Description + + + + + +K + + +Defines the number of nearest-neoghbors to search for. That is, for each test point, the algorithm finds the K nearest neighbors in the training set +(Default value: 5) + + + + + DistanceMetric + + +Sets the distance metric we use to calculate the distance between two points. 
If no metric is specified, then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used. +(Default value: EuclideanDistanceMetric ) + + + + +Blocks + + +Sets the number of blocks into which the input data will be split. This number should be set +at least to the degree of parallelism. If no value is specified, then the parallelism of the +input [[DataSet]] is used as the number of blocks. +(Default value: None) + + + + +UseQuadTreeParam + + + A boolean variable that whether or not to use a Quadtree to partition the training set to potentially simplify the KNN search. If no value is specified, the code will automatically decide whether or not to use a Quadtree. Use of a Quadtree scales well with the number of training and testing points, though poorly with the dimension. +(Default value: None) + + + + +SizeHint + + Specifies whether the training set or test set is small to optimize the cross product operation needed for the KNN search. If the training set is small this should be `CrossHint.FIRST_IS_SMALL` and set to `CrossHint.SECOND_IS_SMALL` if the test set is small. + (Default value: None) + + + + + + +## Examples + +{% highlight scala %} +import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint +import org.apache.flink.api.scala._ +import org.apache.flink.ml.classification.Classification +import org.apache.flink.ml.math.DenseVector +import org.apache.flink.ml.metrics.distances. +SquaredEuclideanDistanceMetric --- End diff -- Could you move this line to the end of the previous line?
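The quoted `UseQuadTreeParam` text says a quadtree scales poorly with the dimension. One way to see why is to assume that each split halves a box along every axis, which is the usual n-dimensional generalization but is not confirmed by the quoted diff. Under that assumption, every split produces $2^n$ children: 4 in 2D, 8 in 3D and 1024 in 10D. The sketch below generates those children from a center/width box like the quoted `Node`. As before, it assumes `width` is the full side length. `QuadTreeSplitSketch` is a hypothetical name.

```scala
object QuadTreeSplitSketch {
  /** Splits an n-dimensional box (center, full width per dimension) into its 2^n children.
    * Child j takes the upper half in dimension i if bit i of j is set, otherwise the lower half.
    */
  def childBoxes(center: Vector[Double], width: Vector[Double]): Seq[(Vector[Double], Vector[Double])] = {
    require(center.length == width.length, "center and width must have the same dimension.")
    val n = center.length
    require(n < 31, "2^n children would overflow an Int.")
    // Each child is half as wide as its parent in every dimension.
    val childWidth = width.map(_ / 2)
    (0 until (1 << n)).map { j =>
      val childCenter = center.indices.map { i =>
        // A child's center sits a quarter of the parent width away from the parent center.
        val offset = width(i) / 4
        if (((j >> i) & 1) == 1) center(i) + offset else center(i) - offset
      }.toVector
      (childCenter, childWidth)
    }
  }
}
```

The number of children per split grows exponentially with `n`, while each split only halves the side length. This is consistent with the docs' claim that the quadtree helps with many points but not with many dimensions.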
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r61397325 --- Diff: docs/libs/ml/knn.md --- @@ -0,0 +1,146 @@ +--- +mathjax: include +htmlTitle: FlinkML - k-nearest neighbors +title: FlinkML - knn +--- + + +* This will be replaced by the TOC +{:toc} + +## Description +Implements an exact k-nearest neighbors algorithm. Given a training set $A$ and a testing set $B$, the algorithm returns + +$$ +KNN(A,B, k) = \{ \left( b, KNN(b,A) \right) where b \in B and KNN(b, A, k) are the k-nearest points to b in A \} +$$ + +The brute-force approach is to compute the distance between every training and testing point. To ease the brute-force computation of computing the distance between every traning point a quadtree is used. The quadtree scales well in the number of training points, though poorly in the spatial dimension. The algorithm will automatically choose whether or not to use the quadtree, though the user can override that decision by setting a parameter to force use or not use a quadtree. + +##Operations + +`KNN` is a `Predictor`. +As such, it supports the `fit` and `predict` operation. + +### Fit + +KNN is trained given a set of `LabeledVector`: + +* `fit: DataSet[LabeledVector] => Unit` + +### Predict + +KNN predicts for all subtypes of FlinkML's `Vector` the corresponding class label: + +* `predict[T <: Vector]: DataSet[T] => DataSet[(T, Array[Vector])]`, where the `(T, Array[Vector])` tuple + corresponds to (testPoint, K-nearest training points) + +## Paremeters +The KNN implementation can be controlled by the following parameters: + + + + +Parameters +Description + + + + + +K + + +Defines the number of nearest-neoghbors to search for. That is, for each test point, the algorithm finds the K nearest neighbors in the training set +(Default value: 5) + + + + + DistanceMetric + + +Sets the distance metric we use to calculate the distance between two points. 
If no metric is specified, then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used. +(Default value: EuclideanDistanceMetric ) --- End diff -- Please remove the spaces before and after EuclideanDistanceMetric.
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r61397260 --- Diff: docs/libs/ml/knn.md --- @@ -0,0 +1,146 @@ +--- +mathjax: include +htmlTitle: FlinkML - k-nearest neighbors +title: FlinkML - knn +--- + + +* This will be replaced by the TOC +{:toc} + +## Description +Implements an exact k-nearest neighbors algorithm. Given a training set $A$ and a testing set $B$, the algorithm returns + +$$ +KNN(A,B, k) = \{ \left( b, KNN(b,A) \right) where b \in B and KNN(b, A, k) are the k-nearest points to b in A \} +$$ + +The brute-force approach is to compute the distance between every training and testing point. To ease the brute-force computation of computing the distance between every traning point a quadtree is used. The quadtree scales well in the number of training points, though poorly in the spatial dimension. The algorithm will automatically choose whether or not to use the quadtree, though the user can override that decision by setting a parameter to force use or not use a quadtree. + +##Operations + +`KNN` is a `Predictor`. +As such, it supports the `fit` and `predict` operation. + +### Fit + +KNN is trained given a set of `LabeledVector`: + +* `fit: DataSet[LabeledVector] => Unit` + +### Predict + +KNN predicts for all subtypes of FlinkML's `Vector` the corresponding class label: + +* `predict[T <: Vector]: DataSet[T] => DataSet[(T, Array[Vector])]`, where the `(T, Array[Vector])` tuple + corresponds to (testPoint, K-nearest training points) + +## Paremeters +The KNN implementation can be controlled by the following parameters: + + + + +Parameters +Description + + + + + +K + + +Defines the number of nearest-neoghbors to search for. 
That is, for each test point, the algorithm finds the K nearest neighbors in the training set +(Default value: 5) + + + + + DistanceMetric --- End diff -- Please remove the space before DistanceMetric.
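The brute-force approach described in the quoted `knn.md` can be illustrated with a minimal, standalone Scala sketch. It does not use Flink's `DataSet` API or FlinkML's `KNN` class. The object name `BruteForceKnn`, the `Vector[Double]` point type, and the function-valued `metric` parameter are hypothetical choices made for illustration. The defaults only mirror the documented ones (K = 5, Euclidean distance).

```scala
// Hypothetical standalone sketch of exact (brute-force) k-nearest neighbours;
// not FlinkML's KNN implementation and not distributed.
object BruteForceKnn {
  type Point = Vector[Double]

  // Euclidean distance, the metric the docs name as the default.
  def euclidean(a: Point, b: Point): Double = {
    require(a.length == b.length, "points must have the same dimension")
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)
  }

  // For each test point b, measure the distance to every training point a
  // and keep the k closest (ties keep the training-set order).
  def knn(
      train: Seq[Point],
      test: Seq[Point],
      k: Int = 5,
      metric: (Point, Point) => Double = euclidean
  ): Seq[(Point, Seq[Point])] = {
    require(k > 0, "k must be positive")
    test.map { b =>
      val nearest = train
        .map(a => (metric(a, b), a)) // distance to every training point
        .sortWith(_._1 < _._1)       // ascending distance, stable sort
        .take(k)
        .map(_._2)
      (b, nearest)
    }
  }

  def main(args: Array[String]): Unit = {
    val train = Vector(Vector(0.0, 0.0), Vector(1.0, 0.0), Vector(5.0, 5.0), Vector(0.0, 2.0))
    val test = Vector(Vector(0.1, 0.1))
    knn(train, test, k = 2).foreach { case (b, ns) => println(s"$b -> $ns") }
  }
}
```

Every test point is compared with every training point, so the work grows with |A| · |B|. That is the cost the quadtree mentioned in the docs is meant to reduce.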
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r61397191 --- Diff: docs/libs/ml/knn.md --- @@ -0,0 +1,146 @@ +--- +mathjax: include +htmlTitle: FlinkML - k-nearest neighbors +title: FlinkML - knn +--- + + +* This will be replaced by the TOC +{:toc} + +## Description +Implements an exact k-nearest neighbors algorithm. Given a training set $A$ and a testing set $B$, the algorithm returns + +$$ +KNN(A,B, k) = \{ \left( b, KNN(b,A) \right) where b \in B and KNN(b, A, k) are the k-nearest points to b in A \} +$$ + +The brute-force approach is to compute the distance between every training and testing point. To ease the brute-force computation of computing the distance between every traning point a quadtree is used. The quadtree scales well in the number of training points, though poorly in the spatial dimension. The algorithm will automatically choose whether or not to use the quadtree, though the user can override that decision by setting a parameter to force use or not use a quadtree. + +##Operations + +`KNN` is a `Predictor`. +As such, it supports the `fit` and `predict` operation. + +### Fit + +KNN is trained given a set of `LabeledVector`: + +* `fit: DataSet[LabeledVector] => Unit` + +### Predict + +KNN predicts for all subtypes of FlinkML's `Vector` the corresponding class label: + +* `predict[T <: Vector]: DataSet[T] => DataSet[(T, Array[Vector])]`, where the `(T, Array[Vector])` tuple + corresponds to (testPoint, K-nearest training points) + +## Paremeters +The KNN implementation can be controlled by the following parameters: + + + + +Parameters +Description + + + + + +K + + +Defines the number of nearest-neoghbors to search for. That is, for each test point, the algorithm finds the K nearest neighbors in the training set --- End diff -- the algorithm finds the _K-nearest_ neighbors ... 
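For reference, the definition quoted in the diff mixes `KNN(b,A)` and `KNN(b, A, k)`. Written with consistent arguments (an editorial restatement, with $d$ denoting the chosen distance metric), it reads:

$$
\mathrm{KNN}(A, B, k) = \left\{ \bigl(b, \mathrm{KNN}(b, A, k)\bigr) \;:\; b \in B \right\},
$$

where $\mathrm{KNN}(b, A, k) \subseteq A$ contains $\min(k, |A|)$ points and satisfies $d(b, a) \le d(b, a')$ for every $a \in \mathrm{KNN}(b, A, k)$ and every $a' \in A \setminus \mathrm{KNN}(b, A, k)$.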
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r61397153 --- Diff: docs/libs/ml/knn.md --- @@ -0,0 +1,146 @@ +--- +mathjax: include +htmlTitle: FlinkML - k-nearest neighbors +title: FlinkML - knn +--- + + +* This will be replaced by the TOC +{:toc} + +## Description +Implements an exact k-nearest neighbors algorithm. Given a training set $A$ and a testing set $B$, the algorithm returns + +$$ +KNN(A,B, k) = \{ \left( b, KNN(b,A) \right) where b \in B and KNN(b, A, k) are the k-nearest points to b in A \} +$$ + +The brute-force approach is to compute the distance between every training and testing point. To ease the brute-force computation of computing the distance between every traning point a quadtree is used. The quadtree scales well in the number of training points, though poorly in the spatial dimension. The algorithm will automatically choose whether or not to use the quadtree, though the user can override that decision by setting a parameter to force use or not use a quadtree. + +##Operations + +`KNN` is a `Predictor`. +As such, it supports the `fit` and `predict` operation. + +### Fit + +KNN is trained given a set of `LabeledVector`: + +* `fit: DataSet[LabeledVector] => Unit` + +### Predict + +KNN predicts for all subtypes of FlinkML's `Vector` the corresponding class label: + +* `predict[T <: Vector]: DataSet[T] => DataSet[(T, Array[Vector])]`, where the `(T, Array[Vector])` tuple + corresponds to (testPoint, K-nearest training points) + +## Paremeters +The KNN implementation can be controlled by the following parameters: + + + + +Parameters +Description + + + + + +K + + +Defines the number of nearest-neoghbors to search for. That is, for each test point, the algorithm finds the K nearest neighbors in the training set --- End diff -- Defines the number of nearest-_neighbors_ ...
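The quadtree mentioned in the quoted docs speeds up the search by discarding whole regions that cannot contain nearby points. The following is a minimal 2-D point quadtree with a radius query, written as a standalone Scala sketch. It is not the `QuadTree.scala` from this PR, and the names `QuadTreeSketch`, `Pt`, `Box`, `capacity`, and `maxDepth` are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical standalone sketch; not FlinkML's QuadTree implementation.
object QuadTreeSketch {
  final case class Pt(x: Double, y: Double)

  // Squared Euclidean distance (avoids a sqrt when only comparing distances).
  def dist2(a: Pt, b: Pt): Double = {
    val dx = a.x - b.x
    val dy = a.y - b.y
    dx * dx + dy * dy
  }

  // Axis-aligned box described by its centre (cx, cy) and half-extents (hw, hh).
  final case class Box(cx: Double, cy: Double, hw: Double, hh: Double) {
    def contains(p: Pt): Boolean =
      math.abs(p.x - cx) <= hw && math.abs(p.y - cy) <= hh

    // Squared distance from p to the closest point of the box (0 if p is inside).
    def minDist2(p: Pt): Double = {
      val dx = math.max(math.abs(p.x - cx) - hw, 0.0)
      val dy = math.max(math.abs(p.y - cy) - hh, 0.0)
      dx * dx + dy * dy
    }
  }

  final class QuadTree(val box: Box, capacity: Int = 4, maxDepth: Int = 16) {
    private val pts = ArrayBuffer.empty[Pt]
    private var children: Vector[QuadTree] = Vector.empty

    // Returns false if p lies outside this node's box.
    def insert(p: Pt): Boolean =
      if (!box.contains(p)) false
      else if (children.isEmpty && (pts.size < capacity || maxDepth == 0)) {
        pts += p
        true
      } else {
        if (children.isEmpty) subdivide()
        // Keep p here if floating-point rounding leaves it outside every child.
        if (!children.exists(_.insert(p))) pts += p
        true
      }

    // Split into 4 quadrants and push the stored points down into them.
    private def subdivide(): Unit = {
      val w = box.hw / 2
      val h = box.hh / 2
      children = Vector(
        new QuadTree(Box(box.cx - w, box.cy - h, w, h), capacity, maxDepth - 1),
        new QuadTree(Box(box.cx + w, box.cy - h, w, h), capacity, maxDepth - 1),
        new QuadTree(Box(box.cx - w, box.cy + h, w, h), capacity, maxDepth - 1),
        new QuadTree(Box(box.cx + w, box.cy + h, w, h), capacity, maxDepth - 1)
      )
      val old = pts.toList
      pts.clear()
      old.foreach(q => if (!children.exists(_.insert(q))) pts += q)
    }

    // All stored points within distance r of p. A quadrant whose box is
    // entirely farther than r from p is skipped without visiting its points.
    def withinRadius(p: Pt, r: Double): List[Pt] =
      if (box.minDist2(p) > r * r) Nil
      else
        pts.toList.filter(q => dist2(p, q) <= r * r) ++
          children.toList.flatMap(_.withinRadius(p, r))
  }
}
```

Each split creates 4 children in two dimensions and 2^d children in d dimensions, which is one way to see why the docs say the structure scales well in the number of points but poorly in the spatial dimension.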
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1220#issuecomment-209854438 Hi @danielblazevski, sorry for the late reply. I checked your updated PR, but your last commit (d6f90ce) seems to be wrong: it removes KNN.scala, QuadTree.scala, KNNITSuite.scala, and QuadTreeSuite.scala. Could you check again?
[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1412#issuecomment-206350336 Ah, right @rmetzger. We can go ahead.
[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1412#issuecomment-206345431 I just noticed that the author email is not the same as the one on @nikste's GitHub account. It is a minor issue, but fixing the email address would be better.
[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1412#issuecomment-206344388 Looks good to me. +1
I tested a simple streaming word count example with a socket stream in the following configurations:
* Flink on Hadoop YARN 2.7.2 (both Scala 2.10 and 2.11)
* Flink local cluster (both Scala 2.10 and 2.11)
[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1412#issuecomment-205274226 I prefer `senv` and `benv` because they are shorter than `streamEnv` and `batchEnv`. In a shell, shorter variable names are better, even though auto-completion is supported.
[GitHub] flink pull request: [FLINK-3678] Make Flink logs directory configu...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1837#issuecomment-203323834 I tested this PR in a local cluster. Looks good to me. +1
[GitHub] flink pull request: [hotfix][docs] Fix a couple of typos in YARN d...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1836#issuecomment-202022859 Nice catch. I'll merge this.
[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1500#issuecomment-200804302 Oh, thanks for pointing that out, @mxm. Then I'll merge this.
[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1500#issuecomment-199796720 Hi all, I would like to merge this. If there are no objections and Travis passes, I'll merge this to master in a few days.
[GitHub] flink pull request: [FLINK-3645] [tests] HDFSCopyUtilitiesTest fai...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1825#issuecomment-199775815 Merged. Thanks for the review, @aljoscha!
[GitHub] flink pull request: [FLINK-3611] [docs] corrected link in CONTRIBU...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1786#issuecomment-199704337 +1, looks good to me.
[GitHub] flink pull request: [FLINK-3645] [tests] HDFSCopyUtilitiesTest fai...
GitHub user chiwanpark opened a pull request: https://github.com/apache/flink/pull/1825 [FLINK-3645] [tests] HDFSCopyUtilitiesTest fails in a Hadoop cluster Please check [JIRA](https://issues.apache.org/jira/browse/FLINK-3645). `HDFSCopyUtilitiesTest` should use only the local file system. This PR forces the test to use the local file system. You can merge this pull request into a Git repository by running: $ git pull https://github.com/chiwanpark/flink FLINK-3645 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1825.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1825 commit 019715e842c6e44256bda008bbd697287f681382 Author: Chiwan Park <chiwanp...@apache.org> Date: 2016-03-22T08:44:35Z [FLINK-3645] [tests] Force HDFSCOPYUtilitiesTest to use local file system
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1220#issuecomment-199648335 Hi @danielblazevski, thanks for the update! The implementation looks good to me. (I will address some minor issues and the rebase myself.) About the docs, I meant that we need to add a description, examples, and the meaning of the parameters to the documentation on our homepage (`docs/libs/ml/knn.md`).
[GitHub] flink pull request: [hotfix] Update python.md
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1814#issuecomment-199640748 Merging...