[GitHub] flink issue #2459: [FLINK-4561] replace all the scala version as a `scala.bi...

2016-09-02 Thread chiwanpark
Github user chiwanpark commented on the issue:

https://github.com/apache/flink/pull/2459
  
Hi @shijinkui, thanks for opening the pull request. Unfortunately, this 
change causes a problem with the Maven Shade plugin 
(https://issues.apache.org/jira/browse/MSHADE-200). Because of that problem, 
the Flink community decided to use a literal version string instead of a 
property, and added a shell script (tools/change-scala-version.sh) to switch 
the Scala version.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-29 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76580310
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala
 ---
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.common.functions.{MapFunction, 
RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTypeInformation, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed Matrix represented as blocks. 
+  * A BlockMapper instance is used to track blocks of the matrix.
+  * Every block in a BlockMatrix has an associated ID that also 
+  * identifies its position in the BlockMatrix.
+  * @param data
+  * @param blockMapper
+  */
+class BlockMatrix(
+val data: DataSet[(BlockID, Block)],
+blockMapper: BlockMapper
+)
+extends DistributedMatrix {
+
+  val numCols = blockMapper.numCols
+  val numRows = blockMapper.numRows
+
+  val getBlockCols = blockMapper.numBlockCols
+  val getBlockRows = blockMapper.numBlockRows
+
+  val getRowsPerBlock = blockMapper.rowsPerBlock
+  val getColsPerBlock = blockMapper.colsPerBlock
+
+  val getNumBlocks = blockMapper.numBlocks
+
+  /**
+* Compares the format of two block matrices
+* @return
+*/
+  def hasSameFormat(other: BlockMatrix): Boolean =
+this.numRows == other.numRows && this.numCols == other.numCols &&
+this.getRowsPerBlock == other.getRowsPerBlock &&
+this.getColsPerBlock == other.getColsPerBlock
+
+  /**
+* Perform an operation on pairs of block. Pairs are formed taking
+* matching blocks from the two matrices that are placed in the same 
position.
+* A function is then applied to the pair to return a new block.
+* These blocks are then composed in a new block matrix.
+*/
+  def blockPairOperation(
+  fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
+require(hasSameFormat(other))
+
+/*Full outer join on blocks. The full outer join is required because of
+the sparse nature of the matrix.
+Matching blocks may be missing and a block of zeros is used instead.*/
+val processedBlocks =
+  this.data.fullOuterJoin(other.data).where(0).equalTo(0) {
+(left: (BlockID, Block), right: (BlockID, Block)) =>
+  {
+
+val (id1, block1) = Option(left) match {
+  case Some((id, block)) => (id, block)
+  case None =>
+(right._1, Block.zero(right._2.getRows, right._2.getCols))
--- End diff --

Ah, I see. `Block.zero` is cheap. Sorry for the inconvenience.


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76552087
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
 ---
@@ -159,3 +183,68 @@ case class IndexedRow(rowIndex: MatrixRowIndex, 
values: Vector) extends Ordered[
 
   override def toString: String = s"($rowIndex, ${values.toString})"
 }
+
+/**
+  * Serializable Reduction function used by the toBlockMatrix function. 
Takes an ordered list of
+  * indexed row and split those rows to form blocks.
+  */
+class RowGroupReducer(blockMapper: BlockMapper)
+extends RichGroupReduceFunction[(Int, Int, Vector), (Int, Block)] {
+
+  override def reduce(values: java.lang.Iterable[(Int, Int, Vector)],
+  out: Collector[(Int, Block)]): Unit = {
+
+val sortedRows = values.toList.sortBy(_._2)
+val blockID = sortedRows.head._1
+val coo = for {
+  (_, rowIndex, vec) <- sortedRows
+  (colIndex, value) <- vec if value != 0
+} yield (rowIndex, colIndex, value)
+
+val block: Block = Block(
+SparseMatrix.fromCOO(
+blockMapper.rowsPerBlock, blockMapper.colsPerBlock, coo))
+out.collect((blockID, block))
+  }
+}
+
+class RowSplitter(blockMapper: BlockMapper)
+extends RichFlatMapFunction[IndexedRow, (Int, Int, Vector)] {
+  override def flatMap(
+  row: IndexedRow, out: Collector[(Int, Int, Vector)]): Unit = {
+val IndexedRow(rowIndex, vector) = row
+val splitRow = sliceVector(vector)
+for ((mappedCol, slice) <- splitRow) {
+  val mappedRow =
+math.floor(rowIndex * 1.0 / blockMapper.rowsPerBlock).toInt
+  val blockID = blockMapper.getBlockIdByMappedCoord(mappedRow, 
mappedCol)
+  out.collect((blockID, rowIndex % blockMapper.rowsPerBlock, slice))
+}
+  }
+
+  def sliceVector(v: Vector): List[(Int, Vector)] = {
+
+def getSliceByColIndex(index: Int): Int =
--- End diff --

This method is used only once. Please remove it.


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551557
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala
 ---
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.common.functions.{MapFunction, 
RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTypeInformation, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed Matrix represented as blocks. 
+  * A BlockMapper instance is used to track blocks of the matrix.
+  * Every block in a BlockMatrix has an associated ID that also 
+  * identifies its position in the BlockMatrix.
+  * @param data
+  * @param blockMapper
+  */
+class BlockMatrix(
+val data: DataSet[(BlockID, Block)],
+blockMapper: BlockMapper
+)
+extends DistributedMatrix {
+
+  val numCols = blockMapper.numCols
+  val numRows = blockMapper.numRows
+
+  val getBlockCols = blockMapper.numBlockCols
+  val getBlockRows = blockMapper.numBlockRows
+
+  val getRowsPerBlock = blockMapper.rowsPerBlock
+  val getColsPerBlock = blockMapper.colsPerBlock
+
+  val getNumBlocks = blockMapper.numBlocks
+
+  /**
+* Compares the format of two block matrices
+* @return
+*/
+  def hasSameFormat(other: BlockMatrix): Boolean =
+this.numRows == other.numRows && this.numCols == other.numCols &&
+this.getRowsPerBlock == other.getRowsPerBlock &&
+this.getColsPerBlock == other.getColsPerBlock
+
+  /**
+* Perform an operation on pairs of block. Pairs are formed taking
+* matching blocks from the two matrices that are placed in the same 
position.
+* A function is then applied to the pair to return a new block.
+* These blocks are then composed in a new block matrix.
+*/
+  def blockPairOperation(
+  fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
+require(hasSameFormat(other))
+
+/*Full outer join on blocks. The full outer join is required because of
+the sparse nature of the matrix.
+Matching blocks may be missing and a block of zeros is used instead.*/
+val processedBlocks =
+  this.data.fullOuterJoin(other.data).where(0).equalTo(0) {
+(left: (BlockID, Block), right: (BlockID, Block)) =>
+  {
+
+val (id1, block1) = Option(left) match {
+  case Some((id, block)) => (id, block)
+  case None =>
+(right._1, Block.zero(right._2.getRows, right._2.getCols))
--- End diff --

A zero-filled matrix can be reused. Please create the matrix once, using the 
`RichFlatJoinFunction` class.
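
A minimal sketch of the reuse idea, written in plain Scala so that it runs without Flink. `DenseBlock` and `PairJoiner` are hypothetical stand-ins (they are not part of the pull request): `PairJoiner` plays the role a `RichFlatJoinFunction` would play, and it assumes every block has the same dimensions and that `fun` does not mutate its arguments.

```scala
// Hypothetical stand-in for the PR's Block: a dense, row-major matrix.
final case class DenseBlock(rows: Int, cols: Int, values: Vector[Double])

object DenseBlock {
  def zero(rows: Int, cols: Int): DenseBlock =
    DenseBlock(rows, cols, Vector.fill(rows * cols)(0.0))
}

// Holds state that is created once per function instance
// instead of once per joined pair.
final class PairJoiner(
    rows: Int,
    cols: Int,
    fun: (DenseBlock, DenseBlock) => DenseBlock) {

  // Built on first use, then shared by every pair that has a missing side.
  private lazy val zeroBlock: DenseBlock = DenseBlock.zero(rows, cols)

  // A null side mimics how a full outer join reports a missing match.
  // At least one side is assumed to be non-null.
  def join(left: (Int, DenseBlock), right: (Int, DenseBlock)): (Int, DenseBlock) = {
    val id = if (left != null) left._1 else right._1
    val l = if (left != null) left._2 else zeroBlock
    val r = if (right != null) right._2 else zeroBlock
    (id, fun(l, r))
  }
}
```

Whether the saving matters depends on how expensive it is to build a zero block. If the block is sparse and cheap to create, allocating one per pair may be acceptable.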


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551568
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala
 ---
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.common.functions.{MapFunction, 
RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTypeInformation, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed Matrix represented as blocks. 
+  * A BlockMapper instance is used to track blocks of the matrix.
+  * Every block in a BlockMatrix has an associated ID that also 
+  * identifies its position in the BlockMatrix.
+  * @param data
+  * @param blockMapper
+  */
+class BlockMatrix(
+val data: DataSet[(BlockID, Block)],
+blockMapper: BlockMapper
+)
+extends DistributedMatrix {
+
+  val numCols = blockMapper.numCols
+  val numRows = blockMapper.numRows
+
+  val getBlockCols = blockMapper.numBlockCols
+  val getBlockRows = blockMapper.numBlockRows
+
+  val getRowsPerBlock = blockMapper.rowsPerBlock
+  val getColsPerBlock = blockMapper.colsPerBlock
+
+  val getNumBlocks = blockMapper.numBlocks
+
+  /**
+* Compares the format of two block matrices
+* @return
+*/
+  def hasSameFormat(other: BlockMatrix): Boolean =
+this.numRows == other.numRows && this.numCols == other.numCols &&
+this.getRowsPerBlock == other.getRowsPerBlock &&
+this.getColsPerBlock == other.getColsPerBlock
+
+  /**
+* Perform an operation on pairs of block. Pairs are formed taking
+* matching blocks from the two matrices that are placed in the same 
position.
+* A function is then applied to the pair to return a new block.
+* These blocks are then composed in a new block matrix.
+*/
+  def blockPairOperation(
+  fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
+require(hasSameFormat(other))
+
+/*Full outer join on blocks. The full outer join is required because of
+the sparse nature of the matrix.
+Matching blocks may be missing and a block of zeros is used instead.*/
+val processedBlocks =
+  this.data.fullOuterJoin(other.data).where(0).equalTo(0) {
+(left: (BlockID, Block), right: (BlockID, Block)) =>
+  {
+
+val (id1, block1) = Option(left) match {
+  case Some((id, block)) => (id, block)
+  case None =>
+(right._1, Block.zero(right._2.getRows, right._2.getCols))
+}
+
+val (id2, block2) = Option(right) match {
+  case Some((id, block)) => (id, block)
+  case None =>
+(left._1, Block.zero(left._2.getRows, left._2.getCols))
--- End diff --

Please reuse the zero-filled matrix.


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551191
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala
 ---
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.common.functions.{MapFunction, 
RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTypeInformation, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed Matrix represented as blocks. 
+  * A BlockMapper instance is used to track blocks of the matrix.
+  * Every block in a BlockMatrix has an associated ID that also 
+  * identifies its position in the BlockMatrix.
+  * @param data
+  * @param blockMapper
+  */
+class BlockMatrix(
+val data: DataSet[(BlockID, Block)],
+blockMapper: BlockMapper
+)
+extends DistributedMatrix {
+
+  val numCols = blockMapper.numCols
+  val numRows = blockMapper.numRows
+
+  val getBlockCols = blockMapper.numBlockCols
+  val getBlockRows = blockMapper.numBlockRows
+
+  val getRowsPerBlock = blockMapper.rowsPerBlock
+  val getColsPerBlock = blockMapper.colsPerBlock
+
+  val getNumBlocks = blockMapper.numBlocks
--- End diff --

Please avoid names that start with `get`/`set`.
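
For illustration, a small sketch of the same accessors named as plain nouns, which is the usual Scala style for `val` members. `MapperSketch` and `BlockMatrixSketch` are hypothetical, trimmed-down types, and the replacement names are an assumption, not the names the pull request ended up with.

```scala
// Hypothetical, trimmed-down mapper exposing only the fields used below.
final case class MapperSketch(numBlockRows: Int, numBlockCols: Int) {
  val numBlocks: Int = numBlockRows * numBlockCols
}

// Members are named after what they hold, without a Java-style `get` prefix.
final class BlockMatrixSketch(mapper: MapperSketch) {
  val numBlockRows: Int = mapper.numBlockRows
  val numBlockCols: Int = mapper.numBlockCols
  val numBlocks: Int = mapper.numBlocks
}
```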


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551144
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala
 ---
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.common.functions.{MapFunction, 
RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTypeInformation, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed Matrix represented as blocks. 
+  * A BlockMapper instance is used to track blocks of the matrix.
+  * Every block in a BlockMatrix has an associated ID that also 
+  * identifies its position in the BlockMatrix.
+  * @param data
+  * @param blockMapper
+  */
+class BlockMatrix(
+val data: DataSet[(BlockID, Block)],
+blockMapper: BlockMapper
+)
+extends DistributedMatrix {
+
+  val numCols = blockMapper.numCols
+  val numRows = blockMapper.numRows
+
+  val getBlockCols = blockMapper.numBlockCols
+  val getBlockRows = blockMapper.numBlockRows
+
+  val getRowsPerBlock = blockMapper.rowsPerBlock
+  val getColsPerBlock = blockMapper.colsPerBlock
+
+  val getNumBlocks = blockMapper.numBlocks
+
+  /**
+* Compares the format of two block matrices
+* @return
+*/
+  def hasSameFormat(other: BlockMatrix): Boolean =
+this.numRows == other.numRows && this.numCols == other.numCols &&
+this.getRowsPerBlock == other.getRowsPerBlock &&
+this.getColsPerBlock == other.getColsPerBlock
+
+  /**
+* Perform an operation on pairs of block. Pairs are formed taking
+* matching blocks from the two matrices that are placed in the same 
position.
+* A function is then applied to the pair to return a new block.
+* These blocks are then composed in a new block matrix.
+*/
+  def blockPairOperation(
+  fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
+require(hasSameFormat(other))
--- End diff --

Could you add a message that explains what the problem is?
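
As a sketch of what that could look like: the two-argument form of `require` takes a message that ends up in the thrown `IllegalArgumentException`. `MatrixFormat` and `FormatCheck` are hypothetical helpers introduced only for this example, and the wording of the message is an assumption.

```scala
// Hypothetical stand-in for the format-relevant fields of a block matrix.
final case class MatrixFormat(
    numRows: Int,
    numCols: Int,
    rowsPerBlock: Int,
    colsPerBlock: Int)

object FormatCheck {
  // On mismatch, throws IllegalArgumentException whose message is
  // "requirement failed: " followed by the text given here.
  def requireSameFormat(a: MatrixFormat, b: MatrixFormat): Unit =
    require(a == b, s"Block matrices must have the same format, but got $a and $b.")
}
```

The message is passed by name, so the string is only built when the check fails.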


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551077
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import 
org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, 
MatrixRowIndex}
+
+/**
+  * This class is in charge of handling all the spatial logic required by 
BlockMatrix.
+  * It introduces a new space of zero-indexed coordinates (i,j), called 
"mapped coordinates".
+  * This is a space where blocks are indexed (starting from zero).
+  *
+  * So every coordinate in the original space can be mapped to this space 
and to the
+  * corrisponding block.  A block have a row and a column coordinate that 
explicits
+  * its position. Every set of coordinates in the mapped space corresponds 
to a square
+  * of size rowPerBlock x colsPerBlock.
+  *
+  */
+case class BlockMapper( //original matrix size
+   numRows: Int,
+   numCols: Int,
+   //block size
+   rowsPerBlock: Int,
+   colsPerBlock: Int) {
+
+  require(numRows >= rowsPerBlock && numCols >= colsPerBlock)
+  val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt
+  val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt
+  val numBlocks = numBlockCols * numBlockRows
+
+  /**
+* Translates absolute coordinates to the mapped coordinates of the 
block
+* these coordinates belong to.
+* @param i
+* @param j
+* @return
+*/
+  def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, 
Int) =
+getBlockMappedCoordinates(getBlockIdByCoordinates(i, j))
+
+  /**
+* Retrieves a block id from original coordinates
+* @param i Original row
+* @param j Original column
+* @return Block ID
+*/
+  def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = 
{
+
+if (i < 0 || j < 0 || i >= numRows || j >= numCols) {
+  throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
+} else {
+  val mappedRow = i / rowsPerBlock
+  val mappedColumn = j / colsPerBlock
+  val res = mappedRow * numBlockCols + mappedColumn
+
+  assert(res <= numBlocks)
+  res
+}
+  }
+
+  /**
+* Retrieves mapped coordinates for a given block.
+* @param blockId
+* @return
+*/
+  def getBlockMappedCoordinates(blockId: Int): (Int, Int) = {
+if (blockId < 0 || blockId > numBlockCols * numBlockRows) {
+  throw new IllegalArgumentException(
+  s"BlockId numeration starts from 0. $blockId is not a valid Id"
+  )
--- End diff --

Please use the `require` function, as above.
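
A minimal sketch of this check rewritten with `require`. `BlockGrid` and `blockMappedCoordinates` are hypothetical names used only here. The sketch assumes block IDs are row-major and run from 0 to `numBlocks - 1`, so it uses a strict upper bound, which is stricter than the `>` comparison in the quoted diff.

```scala
// Hypothetical, trimmed-down version of the mapper's block grid.
final case class BlockGrid(numBlockRows: Int, numBlockCols: Int) {
  val numBlocks: Int = numBlockRows * numBlockCols

  // Maps a row-major block ID back to its (row, column) position in the grid.
  def blockMappedCoordinates(blockId: Int): (Int, Int) = {
    require(
      0 <= blockId && blockId < numBlocks,
      s"Block IDs range from 0 to ${numBlocks - 1}; $blockId is not a valid ID.")
    (blockId / numBlockCols, blockId % numBlockCols)
  }
}
```

With `require`, the precondition sits on a single line at the top of the method and the `else` branch disappears.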


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551066
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import 
org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, 
MatrixRowIndex}
+
+/**
+  * This class is in charge of handling all the spatial logic required by 
BlockMatrix.
+  * It introduces a new space of zero-indexed coordinates (i,j), called 
"mapped coordinates".
+  * This is a space where blocks are indexed (starting from zero).
+  *
+  * So every coordinate in the original space can be mapped to this space 
and to the
+  * corrisponding block.  A block have a row and a column coordinate that 
explicits
+  * its position. Every set of coordinates in the mapped space corresponds 
to a square
+  * of size rowPerBlock x colsPerBlock.
+  *
+  */
+case class BlockMapper( //original matrix size
+   numRows: Int,
+   numCols: Int,
+   //block size
+   rowsPerBlock: Int,
+   colsPerBlock: Int) {
+
+  require(numRows >= rowsPerBlock && numCols >= colsPerBlock)
+  val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt
+  val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt
+  val numBlocks = numBlockCols * numBlockRows
+
+  /**
+* Translates absolute coordinates to the mapped coordinates of the 
block
+* these coordinates belong to.
+* @param i
+* @param j
+* @return
+*/
+  def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, 
Int) =
+getBlockMappedCoordinates(getBlockIdByCoordinates(i, j))
+
+  /**
+* Retrieves a block id from original coordinates
+* @param i Original row
+* @param j Original column
+* @return Block ID
+*/
+  def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = 
{
+
+if (i < 0 || j < 0 || i >= numRows || j >= numCols) {
+  throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
--- End diff --

The following `require` call would be better:

```scala
require(0 <= i && i < numRows && 0 <= j && j < numCols, s"Invalid coordinates ($i, $j).")
```


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551085
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import 
org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, 
MatrixRowIndex}
+
+/**
+  * This class is in charge of handling all the spatial logic required by 
BlockMatrix.
+  * It introduces a new space of zero-indexed coordinates (i,j), called 
"mapped coordinates".
+  * This is a space where blocks are indexed (starting from zero).
+  *
+  * So every coordinate in the original space can be mapped to this space 
and to the
+  * corrisponding block.  A block have a row and a column coordinate that 
explicits
+  * its position. Every set of coordinates in the mapped space corresponds 
to a square
+  * of size rowPerBlock x colsPerBlock.
+  *
+  */
+case class BlockMapper( //original matrix size
+   numRows: Int,
+   numCols: Int,
+   //block size
+   rowsPerBlock: Int,
+   colsPerBlock: Int) {
+
+  require(numRows >= rowsPerBlock && numCols >= colsPerBlock)
+  val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt
+  val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt
+  val numBlocks = numBlockCols * numBlockRows
+
+  /**
+* Translates absolute coordinates to the mapped coordinates of the 
block
+* these coordinates belong to.
+* @param i
+* @param j
+* @return
+*/
+  def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, 
Int) =
+getBlockMappedCoordinates(getBlockIdByCoordinates(i, j))
+
+  /**
+* Retrieves a block id from original coordinates
+* @param i Original row
+* @param j Original column
+* @return Block ID
+*/
+  def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = 
{
+
+if (i < 0 || j < 0 || i >= numRows || j >= numCols) {
+  throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
+} else {
+  val mappedRow = i / rowsPerBlock
+  val mappedColumn = j / colsPerBlock
+  val res = mappedRow * numBlockCols + mappedColumn
+
+  assert(res <= numBlocks)
+  res
+}
+  }
+
+  /**
+* Retrieves mapped coordinates for a given block.
+* @param blockId
+* @return
+*/
+  def getBlockMappedCoordinates(blockId: Int): (Int, Int) = {
+if (blockId < 0 || blockId > numBlockCols * numBlockRows) {
+  throw new IllegalArgumentException(
+  s"BlockId numeration starts from 0. $blockId is not a valid Id"
+  )
+} else {
+  val i = blockId / numBlockCols
+  val j = blockId % numBlockCols
+  (i, j)
+}
+  }
+
+  /**
+* Retrieves the ID of the block at the given coordinates
+* @param i
+* @param j
+* @return
+*/
+  def getBlockIdByMappedCoord(i: Int, j: Int): Int = {
+
+if (i < 0 || j < 0 || i >= numBlockRows || j >= numBlockCols) {
+  throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
--- End diff --

Please use `require`.
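
A minimal sketch of what that could look like. `BlockMapperSketch` is a hypothetical cut-down class, and the returned ID formula is an assumption inferred from the inverse mapping in `getBlockMappedCoordinates`; the rest of the method body is not visible in this diff. `require` throws `IllegalArgumentException`, so callers would see the same exception type as before.

```scala
// Sketch only: a cut-down mapper holding just the fields this check needs.
case class BlockMapperSketch(numBlockRows: Int, numBlockCols: Int) {

  def getBlockIdByMappedCoord(i: Int, j: Int): Int = {
    // require throws IllegalArgumentException when the condition is false
    require(
      i >= 0 && j >= 0 && i < numBlockRows && j < numBlockCols,
      s"Invalid coordinates ($i,$j).")
    // Assumed row-major numbering, the inverse of (id / numBlockCols, id % numBlockCols)
    i * numBlockCols + j
  }
}
```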




[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76550871
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix}
+
+class Block() {
--- End diff --

How about making `Block` a case class? That would be better than a plain 
class with an `apply` method.
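
A rough sketch of the idea, assuming a stand-in matrix type: `MatrixLike` and `DenseStub` are hypothetical placeholders for the Flink matrix, used only so the example compiles on its own.

```scala
// Hypothetical stand-in for org.apache.flink.ml.math.Matrix.
trait MatrixLike {
  def numRows: Int
  def numCols: Int
}

final case class DenseStub(numRows: Int, numCols: Int) extends MatrixLike

// A case class gets a companion `apply`, plus `equals`, `hashCode` and
// `toString`, and its constructor parameter is an immutable `val`.
case class Block(blockData: MatrixLike)

// No hand-written factory method is needed to construct a block.
val block = Block(DenseStub(2, 3))
```

With this shape the block data is fixed at construction, so no setter is needed.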




[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76550821
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix}
+
+class Block() {
+
+  var blockData: FlinkMatrix = null
+
+  def setBlockData(flinkMatrix: FlinkMatrix) = blockData = flinkMatrix
+
+  def getBlockData = blockData
+
+  def toBreeze = blockData.asBreeze
+
+  def getCols = blockData.numCols
+
+  def getRows = blockData.numRows
--- End diff --

The get/set naming convention is not idiomatic in Scala. We need to rename 
`getCols` and `getRows` to `numCols` and `numRows`.
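
For illustration, a small sketch of the renamed accessors. `MatrixStub` and `NamedBlock` are hypothetical names chosen for this example; only `numRows` and `numCols` come from the suggestion above.

```scala
// Hypothetical stand-in for the Flink matrix type wrapped by the block.
final case class MatrixStub(numRows: Int, numCols: Int)

class NamedBlock(val blockData: MatrixStub) {
  // Parameterless accessors named after the property, without a get prefix
  def numRows: Int = blockData.numRows
  def numCols: Int = blockData.numCols
}
```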




[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76550602
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix}
+
+class Block() {
+
+  var blockData: FlinkMatrix = null
--- End diff --

Do we need the block data to be mutable?




[GitHub] flink issue #2152: [FLINK-3920] Distributed Linear Algebra: block-based matr...

2016-08-25 Thread chiwanpark
Github user chiwanpark commented on the issue:

https://github.com/apache/flink/pull/2152
  
Sorry for the long silence. Could you give me a few more days? I'll review 
this over the weekend.




[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-06-29 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r68891682
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/distributed/BlockMatrixSuite.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.SparseMatrix
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{FlatSpec, GivenWhenThen, Matchers}
+
+class BlockMatrixSuite
+extends FlatSpec
+with Matchers
+with GivenWhenThen
+with FlinkTestBase {
+
+  val env = ExecutionEnvironment.getExecutionEnvironment
--- End diff --

I found that the Travis CI build fails because of this line. We cannot reuse 
the `ExecutionEnvironment` because of the order of class initialization. It 
seems to be related to FLINK-3994. Please move the initialization of 
`ExecutionEnvironment` and `DistributedRowMatrix` into each test case.
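
The initialization-order problem can be illustrated with a toy model. This is not Flink's actual mechanism: `Harness`, `EagerSuite` and `PerTestSuite` are hypothetical names, and the harness simply installs an environment after the suite object has been constructed.

```scala
// Hypothetical stand-in for a test harness that installs a context before each test.
object Harness {
  var current: Option[String] = None          // nothing installed until a test starts
  def environment: String = current.getOrElse("default-local")
}

class EagerSuite {
  // Evaluated once, when the suite is constructed, before the harness setup runs
  val env: String = Harness.environment
  def testCase(): String = env
}

class PerTestSuite {
  def testCase(): String = {
    // Evaluated inside the test, after the harness setup has run
    val env = Harness.environment
    env
  }
}

val eager = new EagerSuite
val perTest = new PerTestSuite
Harness.current = Some("test-cluster")        // harness setup happens after construction
```

In this model the eager field keeps the value it saw at construction time, while the per-test lookup sees whatever the harness installed.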




[GitHub] flink issue #2152: [FLINK-3920] Distributed Linear Algebra: block-based matr...

2016-06-28 Thread chiwanpark
Github user chiwanpark commented on the issue:

https://github.com/apache/flink/pull/2152
  
This PR contains unnecessary code changes related to `DistributedRowMatrix` 
(`DistributedRowMatrix.scala` and `DistributedRowMatrixSuite.scala`). Please 
sync this PR with current master.




[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-06-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r68885212
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala
 ---
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
--- End diff --

This line still imports `java.lang`.




[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-06-27 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r68696822
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala
 ---
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTypeInformation, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed Matrix represented as blocks. 
+  * A BlockMapper instance is used to track blocks of the matrix.
+  * Every block in a BlockMatrix has an associated ID that also 
+  * identifies its position in the BlockMatrix.
+  * @param data
+  * @param blockMapper
+  */
+class BlockMatrix(
+data: DataSet[(BlockID, Block)],
+blockMapper: BlockMapper
+)
+extends DistributedMatrix {
+
+  val getDataset = data
+
+  val numCols = blockMapper.numCols
+  val numRows = blockMapper.numRows
+
+
+  val getBlockCols = blockMapper.numBlockCols
+  val getBlockRows = blockMapper.numBlockRows
+
+  val getRowsPerBlock = blockMapper.rowsPerBlock
+  val getColsPerBlock = blockMapper.colsPerBlock
+
+  val getNumBlocks = blockMapper.numBlocks
+
+  /**
+* Compares the format of two block matrices
+* @return
+*/
+  def hasSameFormat(other: BlockMatrix): Boolean =
+this.numRows == other.numRows &&
+this.numCols == other.numCols &&
+this.getRowsPerBlock == other.getRowsPerBlock &&
+this.getColsPerBlock == other.getColsPerBlock
+
+  /**
+* Perform an operation on pairs of block. Pairs are formed taking
+* matching blocks from the two matrices that are placed in the same 
position.
+* A function is then applied to the pair to return a new block.
+* These blocks are then composed in a new block matrix.
+*/
+  def blockPairOperation(
+  fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
+require(hasSameFormat(other))
+
+/*Full outer join on blocks. The full outer join is required because of
+the sparse nature of the matrix.
+Matching blocks may be missing and a block of zeros is used instead.*/
+val processedBlocks =
+  this.getDataset.fullOuterJoin(other.getDataset).where(0).equalTo(0) {
+(left: (BlockID, Block), right: (BlockID, Block)) =>
+  {
+
+val (id1, block1) = Option(left).getOrElse(
+(right._1, Block.zero(right._2.getRows, right._2.getCols)))
+
+val (id2, block2) = Option(right).getOrElse(
+(left._1, Block.zero(left._2.getRows, left._2.getCols)))
+
+require(id1 == id2)
+(id1, fun(block1, block2))
+  }
+  }
+new BlockMatrix(processedBlocks, blockMapper)
+  }
+
+  /**
+* Add the matrix to another matrix.
+* @param other
+* @return
+*/
+  def sum(other: BlockMatrix): BlockMatrix = {
--- End diff --

We agreed that `add` is a more appropriate name for this method.




[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-06-27 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r68696809
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala
 ---
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTypeInformation, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed Matrix represented as blocks. 
+  * A BlockMapper instance is used to track blocks of the matrix.
+  * Every block in a BlockMatrix has an associated ID that also 
+  * identifies its position in the BlockMatrix.
+  * @param data
+  * @param blockMapper
+  */
+class BlockMatrix(
+data: DataSet[(BlockID, Block)],
+blockMapper: BlockMapper
+)
+extends DistributedMatrix {
+
+  val getDataset = data
+
+  val numCols = blockMapper.numCols
+  val numRows = blockMapper.numRows
+
+
+  val getBlockCols = blockMapper.numBlockCols
+  val getBlockRows = blockMapper.numBlockRows
+
+  val getRowsPerBlock = blockMapper.rowsPerBlock
+  val getColsPerBlock = blockMapper.colsPerBlock
+
+  val getNumBlocks = blockMapper.numBlocks
+
+  /**
+* Compares the format of two block matrices
+* @return
+*/
+  def hasSameFormat(other: BlockMatrix): Boolean =
+this.numRows == other.numRows &&
+this.numCols == other.numCols &&
+this.getRowsPerBlock == other.getRowsPerBlock &&
+this.getColsPerBlock == other.getColsPerBlock
+
+  /**
+* Perform an operation on pairs of block. Pairs are formed taking
+* matching blocks from the two matrices that are placed in the same 
position.
+* A function is then applied to the pair to return a new block.
+* These blocks are then composed in a new block matrix.
+*/
+  def blockPairOperation(
+  fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
+require(hasSameFormat(other))
+
+/*Full outer join on blocks. The full outer join is required because of
+the sparse nature of the matrix.
+Matching blocks may be missing and a block of zeros is used instead.*/
+val processedBlocks =
+  this.getDataset.fullOuterJoin(other.getDataset).where(0).equalTo(0) {
+(left: (BlockID, Block), right: (BlockID, Block)) =>
+  {
+
+val (id1, block1) = Option(left).getOrElse(
+(right._1, Block.zero(right._2.getRows, right._2.getCols)))
+
+val (id2, block2) = Option(right).getOrElse(
+(left._1, Block.zero(left._2.getRows, left._2.getCols)))
+
+require(id1 == id2)
--- End diff --

Can `id1 != id2` ever happen if `hasSameFormat(other)` is true? This check 
seems unnecessary.
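
The reasoning can be sketched with plain collections. This is a toy model, not Flink's join: `fullOuterJoinById` is a hypothetical helper, and `Option` stands in for the null that the diff wraps with `Option(left)`. When both sides are present they were matched on the key, and when one side is missing the diff copies the ID from the other side, so the two IDs are equal either way.

```scala
type BlockID = Int

// Toy full outer join on the block ID; None marks a missing side.
def fullOuterJoinById[A](
    left: Map[BlockID, A],
    right: Map[BlockID, A]): Seq[(Option[(BlockID, A)], Option[(BlockID, A)])] =
  (left.keySet ++ right.keySet).toSeq.sorted.map { id =>
    (left.get(id).map(a => (id, a)), right.get(id).map(a => (id, a)))
  }

val joined = fullOuterJoinById(Map(0 -> "a", 1 -> "b"), Map(1 -> "c", 2 -> "d"))

// Whenever both sides are present, the join key guarantees that the IDs agree.
val idsAgree = joined.forall {
  case (Some((l, _)), Some((r, _))) => l == r
  case _ => true
}
```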




[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-06-27 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r68696680
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala
 ---
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTypeInformation, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed Matrix represented as blocks. 
+  * A BlockMapper instance is used to track blocks of the matrix.
+  * Every block in a BlockMatrix has an associated ID that also 
+  * identifies its position in the BlockMatrix.
+  * @param data
+  * @param blockMapper
+  */
+class BlockMatrix(
+data: DataSet[(BlockID, Block)],
+blockMapper: BlockMapper
+)
+extends DistributedMatrix {
+
+  val getDataset = data
+
+  val numCols = blockMapper.numCols
+  val numRows = blockMapper.numRows
+
+
+  val getBlockCols = blockMapper.numBlockCols
+  val getBlockRows = blockMapper.numBlockRows
+
+  val getRowsPerBlock = blockMapper.rowsPerBlock
+  val getColsPerBlock = blockMapper.colsPerBlock
+
+  val getNumBlocks = blockMapper.numBlocks
+
+  /**
+* Compares the format of two block matrices
+* @return
+*/
+  def hasSameFormat(other: BlockMatrix): Boolean =
+this.numRows == other.numRows &&
+this.numCols == other.numCols &&
+this.getRowsPerBlock == other.getRowsPerBlock &&
+this.getColsPerBlock == other.getColsPerBlock
+
+  /**
+* Perform an operation on pairs of block. Pairs are formed taking
+* matching blocks from the two matrices that are placed in the same 
position.
+* A function is then applied to the pair to return a new block.
+* These blocks are then composed in a new block matrix.
+*/
+  def blockPairOperation(
+  fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
+require(hasSameFormat(other))
+
+/*Full outer join on blocks. The full outer join is required because of
+the sparse nature of the matrix.
+Matching blocks may be missing and a block of zeros is used instead.*/
+val processedBlocks =
+  this.getDataset.fullOuterJoin(other.getDataset).where(0).equalTo(0) {
+(left: (BlockID, Block), right: (BlockID, Block)) =>
+  {
+
+val (id1, block1) = Option(left).getOrElse(
+(right._1, Block.zero(right._2.getRows, right._2.getCols)))
--- End diff --

We need to change this as follows to reduce the creation of `Block` objects:
```scala
val (id1, block1) = Option(left) match {
  case Some((id, block)) => (id, block)
  case None => (right._1, Block.zero(right._2.getRows, right._2.getCols))
}
```
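
A self-contained version of the same pattern, for trying it outside Flink. `BlockStub`, its `zero` factory and `leftOrZero` are hypothetical stand-ins for `Block`, `Block.zero` and the body of the join function; a missing side is modelled as `null`, as the `Option(left)` wrapper in the diff suggests.

```scala
type BlockID = Int

// Hypothetical stand-in for Block; isZero marks a block created by the fallback.
final case class BlockStub(rows: Int, cols: Int, isZero: Boolean = false)

object BlockStub {
  def zero(rows: Int, cols: Int): BlockStub = BlockStub(rows, cols, isZero = true)
}

def leftOrZero(
    left: (BlockID, BlockStub),
    right: (BlockID, BlockStub)): (BlockID, BlockStub) =
  Option(left) match {
    // The left block exists: pass it through unchanged
    case Some((id, block)) => (id, block)
    // The left block is missing: build a zero block shaped like the right one
    case None => (right._1, BlockStub.zero(right._2.rows, right._2.cols))
  }
```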




[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-06-27 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r68696692
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala
 ---
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTypeInformation, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed Matrix represented as blocks. 
+  * A BlockMapper instance is used to track blocks of the matrix.
+  * Every block in a BlockMatrix has an associated ID that also 
+  * identifies its position in the BlockMatrix.
+  * @param data
+  * @param blockMapper
+  */
+class BlockMatrix(
+data: DataSet[(BlockID, Block)],
+blockMapper: BlockMapper
+)
+extends DistributedMatrix {
+
+  val getDataset = data
+
+  val numCols = blockMapper.numCols
+  val numRows = blockMapper.numRows
+
+
+  val getBlockCols = blockMapper.numBlockCols
+  val getBlockRows = blockMapper.numBlockRows
+
+  val getRowsPerBlock = blockMapper.rowsPerBlock
+  val getColsPerBlock = blockMapper.colsPerBlock
+
+  val getNumBlocks = blockMapper.numBlocks
+
+  /**
+* Compares the format of two block matrices
+* @return
+*/
+  def hasSameFormat(other: BlockMatrix): Boolean =
+this.numRows == other.numRows &&
+this.numCols == other.numCols &&
+this.getRowsPerBlock == other.getRowsPerBlock &&
+this.getColsPerBlock == other.getColsPerBlock
+
+  /**
+* Perform an operation on pairs of block. Pairs are formed taking
+* matching blocks from the two matrices that are placed in the same 
position.
+* A function is then applied to the pair to return a new block.
+* These blocks are then composed in a new block matrix.
+*/
+  def blockPairOperation(
+  fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
+require(hasSameFormat(other))
+
+/*Full outer join on blocks. The full outer join is required because of
+the sparse nature of the matrix.
+Matching blocks may be missing and a block of zeros is used instead.*/
+val processedBlocks =
+  this.getDataset.fullOuterJoin(other.getDataset).where(0).equalTo(0) {
+(left: (BlockID, Block), right: (BlockID, Block)) =>
+  {
+
+val (id1, block1) = Option(left).getOrElse(
+(right._1, Block.zero(right._2.getRows, right._2.getCols)))
+
+val (id2, block2) = Option(right).getOrElse(
+(left._1, Block.zero(left._2.getRows, left._2.getCols)))
--- End diff --

Same as above.




[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-06-27 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r68696490
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala
 ---
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTypeInformation, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed Matrix represented as blocks. 
+  * A BlockMapper instance is used to track blocks of the matrix.
+  * Every block in a BlockMatrix has an associated ID that also 
+  * identifies its position in the BlockMatrix.
+  * @param data
+  * @param blockMapper
+  */
+class BlockMatrix(
+data: DataSet[(BlockID, Block)],
--- End diff --

I think it would be better to make this parameter public (`val`) and remove 
the `getDataset` method.
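
A minimal sketch of the suggested shape. `BlockMatrixSketch` is a hypothetical simplified class, with `Seq[(Int, String)]` standing in for `DataSet[(BlockID, Block)]` so that it runs without Flink.

```scala
// Declaring the constructor parameter as `val` exposes it as a public,
// read-only member, so a separate getDataset accessor is not needed.
class BlockMatrixSketch(val data: Seq[(Int, String)])

val matrix = new BlockMatrixSketch(Seq((0, "block-0"), (1, "block-1")))
val firstId = matrix.data.head._1
```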




[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-06-27 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r68696421
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala
 ---
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
--- End diff --

Do we need this import?


[GitHub] flink issue #2152: [FLINK-3920] Distributed Linear Algebra: block-based matr...

2016-06-24 Thread chiwanpark
Github user chiwanpark commented on the issue:

https://github.com/apache/flink/pull/2152
  
Thanks for opening this pull request, @chobeat! I would like to shepherd this PR. 
I'll review it over the weekend.


[GitHub] flink issue #1985: [FLINK-1979] Add logistic loss, hinge loss and regulariza...

2016-06-22 Thread chiwanpark
Github user chiwanpark commented on the issue:

https://github.com/apache/flink/pull/1985
  
Hi @skavulya, sorry for the late response. I've checked the updated PR and it 
looks good to me. I wonder whether `RegularizationPenalty` is the proper name, 
because the class calculates many values, but that is a trivial matter.

If there are no objections in the next few days, I'll merge this. Thanks! 👍


[GitHub] flink issue #1996: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-ba...

2016-06-22 Thread chiwanpark
Github user chiwanpark commented on the issue:

https://github.com/apache/flink/pull/1996
  
Looks good to me, +1. I'll merge this in a few hours.

However, I think we should change the type of the row index to `Long` in the near 
future. I think we can deal with the incompatibility with local 
matrices by assuming that the number of rows is less than `Int.MaxValue` and 
converting the row index from `Long` to `Int` at collection time. I'll file 
a related JIRA issue after merging this.
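
The conversion described above could look roughly like the following sketch. It is not code from the PR: `LongIndexedRow`, `toLocalIndex` and `toLocalCOO` are hypothetical names, and a plain `Seq` stands in for the collected `DataSet`:

```scala
object RowIndexSketch {
  // Hypothetical distributed-side row whose index is a Long.
  final case class LongIndexedRow(rowIndex: Long, values: Vector[Double])

  // Converts one row index for a local, Int-indexed matrix.
  // Fails fast instead of silently truncating the Long.
  def toLocalIndex(rowIndex: Long): Int = {
    require(rowIndex >= 0 && rowIndex < Int.MaxValue,
      s"Row index $rowIndex does not fit into a local matrix")
    rowIndex.toInt
  }

  // Stand-in for the conversion at collection time: produces
  // (row, column, value) triples with Int indices.
  def toLocalCOO(rows: Seq[LongIndexedRow]): Seq[(Int, Int, Double)] =
    for {
      LongIndexedRow(rowIndex, values) <- rows
      (value, columnIndex) <- values.zipWithIndex
    } yield (toLocalIndex(rowIndex), columnIndex, value)
}
```

The `require` call makes the assumption explicit: a matrix with too many rows is rejected when it is collected rather than producing wrong indices.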


[GitHub] flink pull request #1996: [FLINK-3919][flink-ml] Distributed Linear Algebra:...

2016-06-22 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r68041009
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.distributed.DistributedMatrix._
+import org.apache.flink.ml.math._
+
+/**
+  * Distributed row-major matrix representation.
+  * @param numRows Number of rows.
+  * @param numCols Number of columns.
+  */
+class DistributedRowMatrix(val data: DataSet[IndexedRow],
+   val numRows: Int,
+   val numCols: Int)
+extends DistributedMatrix {
+
+  /**
+* Collects the data in the form of a sequence of coordinates associated with their values.
+* @return
+*/
+  def toCOO: Seq[(MatrixRowIndex, MatrixColIndex, Double)] = {
+
+val localRows = data.collect()
+
+for (IndexedRow(rowIndex, vector) <- localRows;
+ (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value)
+  }
+
+  /**
+* Collects the data in the form of a SparseMatrix
+* @return
+*/
+  def toLocalSparseMatrix: SparseMatrix = {
+val localMatrix =
+  SparseMatrix.fromCOO(this.numRows, this.numCols, this.toCOO)
+require(localMatrix.numRows == this.numRows)
+require(localMatrix.numCols == this.numCols)
+localMatrix
+  }
+
+  //TODO: convert to dense representation on the distributed matrix and collect it afterward
+  def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix
+
+  /**
+* Apply a high-order function to couple of rows
+* @param fun
+* @param other
+* @return
+*/
+  def byRowOperation(fun: (Vector, Vector) => Vector,
+ other: DistributedRowMatrix): DistributedRowMatrix = {
+val otherData = other.data
+require(this.numCols == other.numCols)
+require(this.numRows == other.numRows)
+
+val result = this.data
+  .fullOuterJoin(otherData)
+  .where("rowIndex")
+  .equalTo("rowIndex")(
+  (left: IndexedRow, right: IndexedRow) => {
+val row1 = Option(left) match {
+  case Some(row: IndexedRow) => row
+  case None =>
+IndexedRow(
+right.rowIndex,
+SparseVector.fromCOO(right.values.size, List((0, 0.0))))
+}
+val row2 = Option(right) match {
+  case Some(row: IndexedRow) => row
+  case None =>
+IndexedRow(
+left.rowIndex,
+SparseVector.fromCOO(left.values.size, List((0, 0.0))))
+}
+IndexedRow(row1.rowIndex, fun(row1.values, row2.values))
+  }
+  )
+new DistributedRowMatrix(result, numRows, numCols)
+  }
+
+  /**
+* Add the matrix to another matrix.
+* @param other
+* @return
+*/
+  def sum(other: DistributedRowMatrix): DistributedRowMatrix = {
--- End diff --

It is not your fault. :-) It seems to be my fault, sorry. Let's merge this to 
master.


[GitHub] flink issue #2076: [FLINK-2832] [tests] Hardens RandomSamplerTest

2016-06-06 Thread chiwanpark
Github user chiwanpark commented on the issue:

https://github.com/apache/flink/pull/2076
  
The failing Travis build is not related to this PR. Looks good to me!


[GitHub] flink pull request #2076: [FLINK-2832] [tests] Hardens RandomSamplerTest

2016-06-06 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2076#discussion_r6588
  
--- Diff: flink-java/src/test/resources/logback-test.xml ---
@@ -0,0 +1,34 @@
+
+
+
+
+
+%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n
+
+
+
+
+
+
+
+
+
+
+
+
--- End diff --

We need to add a newline at the end of this file.


[GitHub] flink pull request #2076: [FLINK-2832] [tests] Hardens RandomSamplerTest

2016-06-06 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2076#discussion_r6573
  
--- Diff: flink-java/src/test/resources/log4j-test.properties ---
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
--- End diff --

This line has spaces around `=`, unlike the lines above.


[GitHub] flink pull request #1996: [FLINK-3919][flink-ml] Distributed Linear Algebra:...

2016-06-03 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65688309
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.distributed.DistributedMatrix._
+import org.apache.flink.ml.math._
+
+/**
+  * Distributed row-major matrix representation.
+  * @param numRows Number of rows.
+  * @param numCols Number of columns.
+  */
+class DistributedRowMatrix(val data: DataSet[IndexedRow],
+   val numRows: Int,
+   val numCols: Int)
+extends DistributedMatrix {
+
+  /**
+* Collects the data in the form of a sequence of coordinates associated with their values.
+* @return
+*/
+  def toCOO: Seq[(MatrixRowIndex, MatrixColIndex, Double)] = {
+
+val localRows = data.collect()
+
+for (IndexedRow(rowIndex, vector) <- localRows;
+ (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value)
+  }
+
+  /**
+* Collects the data in the form of a SparseMatrix
+* @return
+*/
+  def toLocalSparseMatrix: SparseMatrix = {
+val localMatrix =
+  SparseMatrix.fromCOO(this.numRows, this.numCols, this.toCOO)
+require(localMatrix.numRows == this.numRows)
+require(localMatrix.numCols == this.numCols)
+localMatrix
+  }
+
+  //TODO: convert to dense representation on the distributed matrix and collect it afterward
+  def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix
+
+  /**
+* Apply a high-order function to couple of rows
+* @param fun
+* @param other
+* @return
+*/
+  def byRowOperation(fun: (Vector, Vector) => Vector,
+ other: DistributedRowMatrix): DistributedRowMatrix = {
+val otherData = other.data
+require(this.numCols == other.numCols)
+require(this.numRows == other.numRows)
+
+val result = this.data
+  .fullOuterJoin(otherData)
+  .where("rowIndex")
+  .equalTo("rowIndex")(
+  (left: IndexedRow, right: IndexedRow) => {
+val row1 = Option(left) match {
+  case Some(row: IndexedRow) => row
+  case None =>
+IndexedRow(
+right.rowIndex,
+SparseVector.fromCOO(right.values.size, List((0, 0.0))))
+}
+val row2 = Option(right) match {
+  case Some(row: IndexedRow) => row
+  case None =>
+IndexedRow(
+left.rowIndex,
+SparseVector.fromCOO(left.values.size, List((0, 0.0))))
+}
+IndexedRow(row1.rowIndex, fun(row1.values, row2.values))
+  }
+  )
+new DistributedRowMatrix(result, numRows, numCols)
+  }
+
+  /**
+* Add the matrix to another matrix.
+* @param other
+* @return
+*/
+  def sum(other: DistributedRowMatrix): DistributedRowMatrix = {
--- End diff --

Would `add` be a more appropriate name for this method? If you agree, please do not 
update this PR; just notify me, because I'm rebasing this PR on the current 
master.


[GitHub] flink pull request #1996: [FLINK-3919][flink-ml] Distributed Linear Algebra:...

2016-06-02 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65657170
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+
+/**
+  * Distributed row-major matrix representation.
+  * @param numRows Number of rows.
+  * @param numCols Number of columns.
+  */
+class DistributedRowMatrix(val data: DataSet[IndexedRow],
+   val numRows: Int,
+   val numCols: Int )
+extends DistributedMatrix {
+
+
+
+  /**
+* Collects the data in the form of a sequence of coordinates associated with their values.
+* @return
+*/
+  def toCOO: Seq[(Int, Int, Double)] = {
+
+val localRows = data.collect()
+
+for (IndexedRow(rowIndex, vector) <- localRows;
+ (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value)
+  }
+
+  /**
+* Collects the data in the form of a SparseMatrix
+* @return
+*/
+  def toLocalSparseMatrix: SparseMatrix = {
+val localMatrix =
+  SparseMatrix.fromCOO(this.numRows, this.numCols, this.toCOO)
+require(localMatrix.numRows == this.numRows)
+require(localMatrix.numCols == this.numCols)
+localMatrix
+  }
+
+  //TODO: convert to dense representation on the distributed matrix and collect it afterward
+  def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix
+
+  /**
+* Apply a high-order function to couple of rows
+* @param fun
+* @param other
+* @return
+*/
+  def byRowOperation(fun: (Vector, Vector) => Vector,
+ other: DistributedRowMatrix): DistributedRowMatrix = {
+val otherData = other.data
+require(this.numCols == other.numCols)
+require(this.numRows == other.numRows)
+
+val result = this.data
+  .fullOuterJoin(otherData)
+  .where("rowIndex")
+  .equalTo("rowIndex")(
+  (left: IndexedRow, right: IndexedRow) => {
+val row1 = Option(left) match {
+  case Some(row: IndexedRow) => row
+  case None =>
+IndexedRow(
+right.rowIndex,
+SparseVector.fromCOO(right.values.size, List((0, 0.0))))
+}
+val row2 = Option(right) match {
+  case Some(row: IndexedRow) => row
+  case None =>
+IndexedRow(
+left.rowIndex,
+SparseVector.fromCOO(left.values.size, List((0, 0.0))))
+}
+IndexedRow(row1.rowIndex, fun(row1.values, row2.values))
+  }
+  )
+new DistributedRowMatrix(result, numRows, numCols)
+  }
+
+  /**
+* Add the matrix to another matrix.
+* @param other
+* @return
+*/
+  def sum(other: DistributedRowMatrix): DistributedRowMatrix = {
+val sumFunction: (Vector, Vector) => Vector = (x: Vector, y: Vector) =>
+  (x.asBreeze + y.asBreeze).fromBreeze
+this.byRowOperation(sumFunction, other)
+  }
+
+  /**
+* Subtracts another matrix.
+* @param other
+* @return
+*/
+  def subtract(other: DistributedRowMatrix): DistributedRowMatrix = {
+val subFunction: (Vector, Vector) => Vector = (x: Vector, y: Vector) =>
+  

[GitHub] flink issue #1985: [FLINK-1979] Add logistic loss, hinge loss and regulariza...

2016-06-02 Thread chiwanpark
Github user chiwanpark commented on the issue:

https://github.com/apache/flink/pull/1985
  
Okay, please ping me when the PR is updated.


[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...

2016-06-01 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65355487
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+
+/**
+  * Distributed row-major matrix representation.
+  * @param numRows Number of rows.
+  * @param numCols Number of columns.
+  */
+class DistributedRowMatrix(val data: DataSet[IndexedRow],
+   val numRows: Int,
+   val numCols: Int )
+extends DistributedMatrix {
+
+
+
+  /**
+* Collects the data in the form of a sequence of coordinates associated with their values.
+* @return
+*/
+  def toCOO: Seq[(Int, Int, Double)] = {
+
+val localRows = data.collect()
+
+for (IndexedRow(rowIndex, vector) <- localRows;
+ (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value)
+  }
+
+  /**
+* Collects the data in the form of a SparseMatrix
+* @return
+*/
+  def toLocalSparseMatrix: SparseMatrix = {
+val localMatrix =
+  SparseMatrix.fromCOO(this.numRows, this.numCols, this.toCOO)
+require(localMatrix.numRows == this.numRows)
+require(localMatrix.numCols == this.numCols)
+localMatrix
+  }
+
+  //TODO: convert to dense representation on the distributed matrix and collect it afterward
+  def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix
+
+  /**
+* Apply a high-order function to couple of rows
+* @param fun
+* @param other
+* @return
+*/
+  def byRowOperation(fun: (Vector, Vector) => Vector,
+ other: DistributedRowMatrix): DistributedRowMatrix = {
+val otherData = other.data
+require(this.numCols == other.numCols)
+require(this.numRows == other.numRows)
+
+val result = this.data
+  .fullOuterJoin(otherData)
+  .where("rowIndex")
+  .equalTo("rowIndex")(
+  (left: IndexedRow, right: IndexedRow) => {
+val row1 = Option(left) match {
+  case Some(row: IndexedRow) => row
+  case None =>
+IndexedRow(
+right.rowIndex,
+SparseVector.fromCOO(right.values.size, List((0, 0.0))))
+}
+val row2 = Option(right) match {
+  case Some(row: IndexedRow) => row
+  case None =>
+IndexedRow(
+left.rowIndex,
+SparseVector.fromCOO(left.values.size, List((0, 0.0))))
+}
+IndexedRow(row1.rowIndex, fun(row1.values, row2.values))
+  }
+  )
+new DistributedRowMatrix(result, numRows, numCols)
+  }
+
+  /**
+* Add the matrix to another matrix.
+* @param other
+* @return
+*/
+  def sum(other: DistributedRowMatrix): DistributedRowMatrix = {
+val sumFunction: (Vector, Vector) => Vector = (x: Vector, y: Vector) =>
+  (x.asBreeze + y.asBreeze).fromBreeze
+this.byRowOperation(sumFunction, other)
+  }
+
+  /**
+* Subtracts another matrix.
+* @param other
+* @return
+*/
+  def subtract(other: DistributedRowMatrix): DistributedRowMatrix = {
+val subFunction: (Vector, Vector) => Vector = (x: Vector, y: Vector) =>
+  

[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...

2016-06-01 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1996
  
@chobeat Okay. Then we should force the user to calculate the dimensionality of the 
matrix by changing the types of the row and column count parameters in the constructor.
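
As a rough illustration of the difference, the sketch below contrasts optional and mandatory dimension parameters. It is an assumption-laden stand-in rather than the PR's code: `Row`, `OptionalDims` and `RequiredDims` are hypothetical names, and a plain `Seq` replaces Flink's `DataSet`:

```scala
object DimensionParamsSketch {
  // Hypothetical row type; a plain Seq stands in for Flink's DataSet.
  final case class Row(rowIndex: Int, values: Vector[Double])

  // Before: dimensions are optional and are derived from the data when missing.
  class OptionalDims(val data: Seq[Row],
                     numRowsOpt: Option[Int] = None,
                     numColsOpt: Option[Int] = None) {
    lazy val numRows: Int = numRowsOpt.getOrElse(data.map(_.rowIndex).max + 1)
    lazy val numCols: Int = numColsOpt.getOrElse(data.head.values.size)
  }

  // After: plain Int parameters without defaults, so every caller
  // has to supply the dimensions explicitly.
  class RequiredDims(val data: Seq[Row], val numRows: Int, val numCols: Int)
}
```

With the second form, the cost of computing the dimensions is visible at the call site instead of being hidden behind a lazy value.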


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...

2016-06-01 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65311449
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+
+/**
+  * Distributed row-major matrix representation.
+  * @param numRowsOpt If None, will be calculated from the DataSet.
+  * @param numColsOpt If None, will be calculated from the DataSet.
+  */
+class DistributedRowMatrix(data: DataSet[IndexedRow],
--- End diff --

It seems that `getRowData` is redundant. It would be better to make `data` public (by 
adding the `val` keyword to the existing `data` parameter).


[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...

2016-06-01 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1996
  
Hi @chobeat, thanks for updating the PR. Once the comments on the source 
code are addressed, I think the last thing needed to merge this is documentation. 
However, you can add the documentation after the block-based matrix is merged.


[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...

2016-06-01 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65309988
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+
+/**
+  * Distributed row-major matrix representation.
+  * @param numRowsOpt If None, will be calculated from the DataSet.
+  * @param numColsOpt If None, will be calculated from the DataSet.
+  */
+class DistributedRowMatrix(data: DataSet[IndexedRow],
+   numRowsOpt: Option[Int] = None,
+   numColsOpt: Option[Int] = None)
+extends DistributedMatrix {
+
+  lazy val getNumRows: Int = numRowsOpt match {
+case Some(rows) => rows
+case None => numRows.collect().head
+  }
+
+  lazy val getNumCols: Int = numColsOpt match {
+case Some(cols) => cols
+case None => numCols.collect().head
+  }
+
+  lazy val numRows: DataSet[Int] = numRowsOpt match {
+case Some(rows) => data.getExecutionEnvironment.fromElements(rows)
+case None => data.max("rowIndex").map(_.rowIndex + 1)
+  }
+
+  lazy val numCols: DataSet[Int] = numColsOpt match {
+case Some(cols) => data.getExecutionEnvironment.fromElements(cols)
+case None => data.first(1).map(_.values.size)
+  }
+
+  val getRowData = data
+
+  /**
+* Collects the data in the form of a sequence of coordinates associated with their values.
+* @return
+*/
+  def toCOO: Seq[(Int, Int, Double)] = {
+
+val localRows = data.collect()
+
+for (IndexedRow(rowIndex, vector) <- localRows;
+ (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value)
+  }
+
+  /**
+* Collects the data in the form of a SparseMatrix
+* @return
+*/
+  def toLocalSparseMatrix: SparseMatrix = {
+val localMatrix =
+  SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO)
+require(localMatrix.numRows == this.getNumRows)
+require(localMatrix.numCols == this.getNumCols)
+localMatrix
+  }
+
+  //TODO: convert to dense representation on the distributed matrix and collect it afterward
+  def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix
+
+  /**
+* Apply a high-order function to couple of rows
+* @param fun
+* @param other
+* @return
+*/
+  def byRowOperation(fun: (Vector, Vector) => Vector,
+ other: DistributedRowMatrix): DistributedRowMatrix = {
+val otherData = other.getRowData
+require(this.getNumCols == other.getNumCols)
+require(this.getNumRows == other.getNumRows)
+
+val result = this.data
+  .fullOuterJoin(otherData)
+  .where("rowIndex")
+  .equalTo("rowIndex")(
+  (left: IndexedRow, right: IndexedRow) => {
+val row1 = Option(left).getOrElse(IndexedRow(
+right.rowIndex,
+SparseVector.fromCOO(right.values.size, List((0, 0.0)))))
+val row2 = Option(right).getOrElse(IndexedRow(
+left.rowIndex,
+SparseVector.fromCOO(left.values.size, List((0, 0.0)))))
--- End diff --

I would like to rewrite this block as follows to avoid creating an 
unnecessary `IndexedRow` object:

```scala
val row1 = Option(left) match {
  cas

[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...

2016-06-01 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65309186
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => 
BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+
+/**
+  * Distributed row-major matrix representation.
+  * @param numRowsOpt If None, will be calculated from the DataSet.
+  * @param numColsOpt If None, will be calculated from the DataSet.
+  */
+class DistributedRowMatrix(data: DataSet[IndexedRow],
+   numRowsOpt: Option[Int] = None,
+   numColsOpt: Option[Int] = None)
+extends DistributedMatrix {
+
+  lazy val getNumRows: Int = numRowsOpt match {
+case Some(rows) => rows
+case None => numRows.collect().head
+  }
+
+  lazy val getNumCols: Int = numColsOpt match {
+case Some(cols) => cols
+case None => numCols.collect().head
+  }
+
+  lazy val numRows: DataSet[Int] = numRowsOpt match {
+case Some(rows) => data.getExecutionEnvironment.fromElements(rows)
+case None => data.max("rowIndex").map(_.rowIndex + 1)
+  }
+
+  lazy val numCols: DataSet[Int] = numColsOpt match {
+case Some(cols) => data.getExecutionEnvironment.fromElements(cols)
+case None => data.first(1).map(_.values.size)
+  }
+
+  val getRowData = data
+
+  /**
+* Collects the data in the form of a sequence of coordinates 
associated with their values.
+* @return
+*/
+  def toCOO: Seq[(Int, Int, Double)] = {
+
+val localRows = data.collect()
+
+for (IndexedRow(rowIndex, vector) <- localRows;
+ (columnIndex, value) <- vector) yield (rowIndex, columnIndex, 
value)
+  }
+
+  /**
+* Collects the data in the form of a SparseMatrix
+* @return
+*/
+  def toLocalSparseMatrix: SparseMatrix = {
+val localMatrix =
+  SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO)
+require(localMatrix.numRows == this.getNumRows)
+require(localMatrix.numCols == this.getNumCols)
+localMatrix
+  }
+
+  //TODO: convert to dense representation on the distributed matrix and 
collect it afterward
+  def toLocalDenseMatrix: DenseMatrix = 
this.toLocalSparseMatrix.toDenseMatrix
+
+  /**
+* Apply a high-order function to couple of rows
+* @param fun
+* @param other
+* @return
+*/
+  def byRowOperation(fun: (Vector, Vector) => Vector,
+ other: DistributedRowMatrix): DistributedRowMatrix = {
+val otherData = other.getRowData
+require(this.getNumCols == other.getNumCols)
+require(this.getNumRows == other.getNumRows)
+
+val result = this.data
+  .fullOuterJoin(otherData)
+  .where("rowIndex")
+  .equalTo("rowIndex")(
+  (left: IndexedRow, right: IndexedRow) => {
+val row1 = Option(left).getOrElse(IndexedRow(
+right.rowIndex,
+SparseVector.fromCOO(right.values.size, List((0, 
0.0)
+val row2 = Option(right).getOrElse(IndexedRow(
+left.rowIndex,
+SparseVector.fromCOO(left.values.size, List((0, 
0.0)
+
+IndexedRow(row1.rowIndex, fun(row1.values, row2.values))
+  }
+  )
+new DistributedRowMatrix(result, numRowsOpt, numColsOpt)
+  }
+

[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...

2016-06-01 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65309134
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => 
BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+import org.apache.flink.util.Collector
+import org.apache.flink.ml.math.Breeze._
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed row-major matrix representation.
+  * @param numRowsOpt If None, will be calculated from the DataSet.
+  * @param numColsOpt If None, will be calculated from the DataSet.
+  */
+class DistributedRowMatrix(data: DataSet[IndexedRow],
+   numRowsOpt: Option[Int] = None,
+   numColsOpt: Option[Int] = None)
+extends DistributedMatrix {
+
+  lazy val getNumRows: Int = numRowsOpt match {
+case Some(rows) => rows
+case None => data.count().toInt
+  }
+
+  lazy val getNumCols: Int = numColsOpt match {
+case Some(cols) => cols
+case None => calcCols
+  }
+
+  val getRowData = data
+
+  private def calcCols: Int =
+data.first(1).collect().headOption match {
+  case Some(vector) => vector.values.size
+  case None => 0
+}
+
+  /**
+* Collects the data in the form of a sequence of coordinates 
associated with their values.
+* @return
+*/
+  def toCOO: Seq[(Int, Int, Double)] = {
+
+val localRows = data.collect()
+
+for (IndexedRow(rowIndex, vector) <- localRows;
+ (columnIndex, value) <- vector) yield (rowIndex, columnIndex, 
value)
+  }
+
+  /**
+* Collects the data in the form of a SparseMatrix
+* @return
+*/
+  def toLocalSparseMatrix: SparseMatrix = {
+val localMatrix =
+  SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO)
+require(localMatrix.numRows == this.getNumRows)
+require(localMatrix.numCols == this.getNumCols)
+localMatrix
+  }
+
+  //TODO: convert to dense representation on the distributed matrix and 
collect it afterward
+  def toLocalDenseMatrix: DenseMatrix = 
this.toLocalSparseMatrix.toDenseMatrix
+
+  /**
+* Apply a high-order function to couple of rows
+* @param fun
+* @param other
+* @return
+*/
+  def byRowOperation(fun: (Vector, Vector) => Vector,
+ other: DistributedRowMatrix): DistributedRowMatrix = {
+val otherData = other.getRowData
+require(this.getNumCols == other.getNumCols)
+require(this.getNumRows == other.getNumRows)
+
+
+val result = this.data
+  .fullOuterJoin(otherData)
+  .where("rowIndex")
+  .equalTo("rowIndex")(
+  (left: IndexedRow, right: IndexedRow) => {
+val row1 = Option(left).getOrElse(IndexedRow(
+right.rowIndex,
+SparseVector.fromCOO(right.values.size, List((0, 
0.0)
+val row2 = Option(right).getOrElse(IndexedRow(
+left.rowIndex,
+SparseVector.fromCOO(left.values.size, List((0, 
0.0)
+
+IndexedRow(row1.rowIndex, fun(row1.values, row2.values))
+  }
+  )
+new DistributedRowMatrix(result, numRowsOpt, numColsOpt)
+  }
+
+  /**
+

[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1985
  
Hi @skavulya, I quickly reviewed your updated PR and left a few 
comments. They are not critical, but it would be better to fix them. 
Everything else looks very good.




[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291933
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationPenaltyTest.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{FlatSpec, Matchers}
+
+
+class RegularizationPenaltyTest extends FlatSpec with Matchers with 
FlinkTestBase {
--- End diff --

Same as above, we don't need to extend `FlinkTestBase`.




[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291911
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionTest.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.common.{LabeledVector, WeightVector}
+import org.apache.flink.ml.math.DenseVector
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.test.util.FlinkTestBase
+
+
+class LossFunctionTest extends FlatSpec with Matchers with FlinkTestBase {
--- End diff --

We don't need to extend `FlinkTestBase` because `LossFunctionTest` does not 
use a Flink cluster.




[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291552
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/RegularizationPenalty.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.math.{Vector, BLAS}
+import org.apache.flink.ml.math.Breeze._
+import breeze.linalg.{norm => BreezeNorm}
+
+/** Represents a type of regularization penalty
+  *
+  * Regularization penalties are used to restrict the optimization problem 
to solutions with
+  * certain desirable characteristics, such as sparsity for the L1 
penalty, or penalizing large
+  * weights for the L2 penalty.
+  *
+  * The regularization term, `R(w)` is added to the objective function, 
`f(w) = L(w) + lambda*R(w)`
+  * where lambda is the regularization parameter used to tune the amount 
of regularization applied.
+  */
+trait RegularizationPenalty extends Serializable {
+
+  /** Calculates the new weights based on the gradient and regularization 
penalty
+*
+* @param weightVector The weights to be updated
+* @param gradient The gradient used to update the weights
+* @param regularizationConstant The regularization parameter to be 
applied 
+* @param learningRate The effective step size for this iteration
+* @return Updated weights
+*/
+  def takeStep(
+  weightVector: Vector,
+  gradient: Vector,
+  regularizationConstant: Double,
+  learningRate: Double)
+: Vector
+
+  /** Adds regularization to the loss value
+*
+* @param oldLoss The loss to be updated
+* @param weightVector The gradient used to update the loss
+* @param regularizationConstant The regularization parameter to be 
applied
+* @return Updated loss
+*/
+  def regLoss(oldLoss: Double, weightVector: Vector, 
regularizationConstant: Double): Double
+
+}
+
+
+/** `L_2` regularization penalty.
+  *
+  * The regularization function is the square of the L2 norm 
`1/2*||w||_2^2`
+  * with `w` being the weight vector. The function penalizes large weights,
+  * favoring solutions with more small weights rather than few large ones.
+  */
+object L2Regularization extends RegularizationPenalty {
+
+  /** Calculates the new weights based on the gradient and L2 
regularization penalty
+*
+* The updated weight is `w - learningRate *(gradient + lambda * w)` 
where
+* `w` is the weight vector, and `lambda` is the regularization 
parameter.
+*
+* @param weightVector The weights to be updated
+* @param gradient The gradient according to which we will update the 
weights
+* @param regularizationConstant The regularization parameter to be 
applied
+* @param learningRate The effective step size for this iteration
+* @return Updated weights
+*/
+  override def takeStep(
+  weightVector: Vector,
+  gradient: Vector,
+  regularizationConstant: Double,
+  learningRate: Double)
+: Vector = {
+// add the gradient of the L2 regularization
+BLAS.axpy(regularizationConstant, weightVector, gradient)
+
+// update the weights according to the learning rate
+BLAS.axpy(-learningRate, gradient, weightVector)
+
+weightVector
+  }
+
+  /** Adds regularization to the loss value
+*
+* The updated loss is `oldLoss + lambda * 1/2*||w||_2^2` where
+* `w` is the weight vector, and `lambda` is the regularization 
parameter
+*
+* @param oldLoss The loss to be updated
+* @param weightVector The gradient used to update the loss
+* @param regularizationConstant The regularization parameter to be 
applied
+* @return Updated loss
+*/

[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291461
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
 ---
@@ -47,21 +47,106 @@ object SquaredLoss extends PartialLossFunction {
 
   /** Calculates the loss depending on the label and the prediction
 *
-* @param prediction
-* @param label
-* @return
+* @param prediction The predicted value
+* @param label The true value
+* @return The loss
 */
   override def loss(prediction: Double, label: Double): Double = {
 0.5 * (prediction - label) * (prediction - label)
   }
 
   /** Calculates the derivative of the [[PartialLossFunction]]
 *
-* @param prediction
-* @param label
-* @return
+* @param prediction The predicted value
+* @param label The true value
+* @return The derivative of the loss function
 */
   override def derivative(prediction: Double, label: Double): Double = {
 (prediction - label)
   }
 }
+
+/** Logistic loss function which can be used with the 
[[GenericLossFunction]]
+  *
+  *
+  * The [[LogisticLoss]] function implements `log(1 + 
-exp(prediction*label))`
+  * for binary classification with label in {-1, 1}
+  */
+object LogisticLoss extends PartialLossFunction {
+
+  /** Calculates the loss depending on the label and the prediction
+*
+* @param prediction The predicted value
+* @param label The true value
+* @return The loss
+*/
+  override def loss(prediction: Double, label: Double): Double = {
+val z = prediction * label
+
+// based on implementation in scikit-learn
+// approximately equal and saves the computation of the log
+if (z > 18) {
+  return math.exp(-z)
+}
+else if (z < -18) {
+  return -z
+}
+
+math.log(1 + math.exp(-z))
+  }
+
+  /** Calculates the derivative of the loss function with respect to the 
prediction
+*
+* @param prediction The predicted value
+* @param label The true value
+* @return The derivative of the loss function
+*/
+  override def derivative(prediction: Double, label: Double): Double = {
+val z = prediction * label
+
+// based on implementation in scikit-learn
+// approximately equal and saves the computation of the log
+if (z > 18) {
+  return -label * math.exp(-z)
+}
+else if (z < -18) {
+  return -label
+}
+
+-label/(math.exp(z) + 1)
+  }
--- End diff --

As I said above, the following is better:

```scala
if (z > 18) {
  -label * math.exp(-z)
} else if (z < -18) {
  -label
} else {
  -label / (math.exp(z) + 1)
}
```
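
For reference, a self-contained sketch of the same shape, in which the `if`/`else` chain is the last expression of the method and therefore supplies its result without any `return`. The object name `LogisticDerivativeSketch` is hypothetical and not part of the PR; the threshold of 18 is copied from the diff above.

```scala
// Hypothetical standalone object, for illustration only.
object LogisticDerivativeSketch {

  // Derivative of log(1 + exp(-z)) with respect to the prediction,
  // where z = prediction * label and label is in {-1, 1}.
  def derivative(prediction: Double, label: Double): Double = {
    val z = prediction * label

    // The whole if/else chain is one expression; its value is returned.
    if (z > 18) {
      // exp(z) dominates, so 1 / (exp(z) + 1) is approximately exp(-z)
      -label * math.exp(-z)
    } else if (z < -18) {
      // exp(z) is negligible, so 1 / (exp(z) + 1) is approximately 1
      -label
    } else {
      -label / (math.exp(z) + 1)
    }
  }
}
```

Because every branch yields a `Double`, the compiler can check that no path falls through without a value, which is harder to see with early `return` statements.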




[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291353
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
 ---
@@ -47,21 +47,106 @@ object SquaredLoss extends PartialLossFunction {
 
   /** Calculates the loss depending on the label and the prediction
 *
-* @param prediction
-* @param label
-* @return
+* @param prediction The predicted value
+* @param label The true value
+* @return The loss
 */
   override def loss(prediction: Double, label: Double): Double = {
 0.5 * (prediction - label) * (prediction - label)
   }
 
   /** Calculates the derivative of the [[PartialLossFunction]]
 *
-* @param prediction
-* @param label
-* @return
+* @param prediction The predicted value
+* @param label The true value
+* @return The derivative of the loss function
 */
   override def derivative(prediction: Double, label: Double): Double = {
 (prediction - label)
   }
 }
+
+/** Logistic loss function which can be used with the 
[[GenericLossFunction]]
+  *
+  *
+  * The [[LogisticLoss]] function implements `log(1 + 
-exp(prediction*label))`
+  * for binary classification with label in {-1, 1}
+  */
+object LogisticLoss extends PartialLossFunction {
+
+  /** Calculates the loss depending on the label and the prediction
+*
+* @param prediction The predicted value
+* @param label The true value
+* @return The loss
+*/
+  override def loss(prediction: Double, label: Double): Double = {
+val z = prediction * label
+
+// based on implementation in scikit-learn
+// approximately equal and saves the computation of the log
+if (z > 18) {
+  return math.exp(-z)
+}
+else if (z < -18) {
+  return -z
+}
+
+math.log(1 + math.exp(-z))
--- End diff --

Using `return` is not recommended in Scala. Could you change this as 
follows?

```scala
if (z > 18) {
  math.exp(-z)
} else if (z < -18) {
  -z
} else {
  math.log(1 + math.exp(-z))
}
```
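
To make the suggestion concrete, here is a minimal self-contained sketch of the loss written as a single expression. The object name `LogisticLossSketch` is hypothetical and not part of the PR; the threshold of 18 and the two approximations are taken from the diff above.

```scala
// Hypothetical standalone object, for illustration only.
object LogisticLossSketch {

  // Logistic loss log(1 + exp(-z)) with z = prediction * label,
  // for binary classification with label in {-1, 1}.
  def loss(prediction: Double, label: Double): Double = {
    val z = prediction * label

    // No `return`: the value of the if/else expression is the result.
    if (z > 18) {
      // log(1 + x) is approximately x for very small x = exp(-z)
      math.exp(-z)
    } else if (z < -18) {
      // exp(-z) dominates the 1, so log(1 + exp(-z)) is approximately -z
      -z
    } else {
      math.log(1 + math.exp(-z))
    }
  }
}
```

For example, `LogisticLossSketch.loss(0.0, 1.0)` takes the last branch and evaluates `math.log(1 + math.exp(-0.0))`, which is log 2.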




[GitHub] flink pull request: [FLINK-3994] [ml, tests] Fix flaky KNN integration tests

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/2056
  
Merging...




[GitHub] flink pull request: [FLINK-3994] [ml, tests] Fix flaky KNN integration tests

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/2056
  
Thanks for clarifying, @StephanEwen! I'll merge this after Travis succeeds.




[GitHub] flink pull request: [FLINK-3994] [ml, tests] Fix flaky KNN integration tests

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/2056
  
Thanks for the guidance, @mxm. I'll set the upper limit for the parallelism to 4.




[GitHub] flink pull request: [FLINK-3994] [ml, tests] Fix flaky KNN integration tests

2016-05-31 Thread chiwanpark
GitHub user chiwanpark opened a pull request:

https://github.com/apache/flink/pull/2056

[FLINK-3994] [ml, tests] Fix flaky KNN integration tests

This PR addresses the flaky KNN integration tests. The problem is caused by 
sharing an `ExecutionEnvironment` between test cases, although I'm not sure of 
the exact reason. This PR gives each test case its own `ExecutionEnvironment`. 
With this PR, the tests pass on my local machine and on my Travis-CI build [1].

I have some doubts because this does not fix the root cause. AFAIK, and as 
@StephanEwen said, sharing an `ExecutionEnvironment` should be supported. 
Additionally, `mvn clean verify` passed without this PR on my local machine.

If there are any other opinions, please leave a comment.

[1]: https://travis-ci.org/chiwanpark/flink/builds/134104491

P.S. We need to rewrite the commit message.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chiwanpark/flink hotfix-ml-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2056.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2056


commit a47ae8481bcbac2c490386089ee6b1e740f3a1f4
Author: Chiwan Park <chiwanp...@apache.org>
Date:   2016-05-31T08:50:05Z

[hotfix] [ml] Fix flaky KNN integration tests






[GitHub] flink pull request: Added addition, subtraction and multiply by sc...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/2052#issuecomment-222644723
  
Hi @danielblazevski, thanks for opening this pull request. However, we need a 
related JIRA issue for it. Could you create an issue and change the title of this PR?




[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65145402
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => 
BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+import org.apache.flink.util.Collector
+import org.apache.flink.ml.math.Breeze._
+import scala.collection.JavaConversions._
+
+/**
+  *
+  * @param numRowsOpt If None, will be calculated from the DataSet.
+  * @param numColsOpt If None, will be calculated from the DataSet.
+  */
+class DistributedRowMatrix(data: DataSet[IndexedRow],
+   numRowsOpt: Option[Int] = None,
+   numColsOpt: Option[Int] = None)
+extends DistributedMatrix {
+
+  lazy val getNumRows: Int = numRowsOpt match {
+case Some(rows) => rows
+case None => data.count().toInt
+  }
+
+  lazy val getNumCols: Int = numColsOpt match {
+case Some(cols) => cols
+case None => calcCols
+  }
--- End diff --

Yes, you are right. Thanks for the correction.




[GitHub] flink pull request: [FLINK-2131][ml]: Initialization schemes for k...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/757#issuecomment-222599628
  
Any updates on this? If there are none, I would like to continue the work 
based on this PR.




[GitHub] flink pull request: [FLINK-2673] [type system] Add a comparator fo...

2016-05-30 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/2017#issuecomment-222577020
  
Thanks for the review, @StephanEwen! Merging...




[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...

2016-05-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65102583
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => 
BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+import org.apache.flink.util.Collector
+import org.apache.flink.ml.math.Breeze._
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed row-major matrix representation.
+  * @param numRowsOpt If None, will be calculated from the DataSet.
+  * @param numColsOpt If None, will be calculated from the DataSet.
+  */
+class DistributedRowMatrix(data: DataSet[IndexedRow],
+   numRowsOpt: Option[Int] = None,
+   numColsOpt: Option[Int] = None)
+extends DistributedMatrix {
+
+  lazy val getNumRows: Int = numRowsOpt match {
+case Some(rows) => rows
+case None => data.count().toInt
+  }
+
+  lazy val getNumCols: Int = numColsOpt match {
+case Some(cols) => cols
+case None => calcCols
+  }
+
+  val getRowData = data
+
+  private def calcCols: Int =
+data.first(1).collect().headOption match {
+  case Some(vector) => vector.values.size
+  case None => 0
+}
+
+  /**
+* Collects the data in the form of a sequence of coordinates 
associated with their values.
+* @return
+*/
+  def toCOO: Seq[(Int, Int, Double)] = {
+
+val localRows = data.collect()
+
+for (IndexedRow(rowIndex, vector) <- localRows;
+ (columnIndex, value) <- vector) yield (rowIndex, columnIndex, 
value)
+  }
+
+  /**
+* Collects the data in the form of a SparseMatrix
+* @return
+*/
+  def toLocalSparseMatrix: SparseMatrix = {
+val localMatrix =
+  SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO)
+require(localMatrix.numRows == this.getNumRows)
+require(localMatrix.numCols == this.getNumCols)
+localMatrix
+  }
+
+  //TODO: convert to dense representation on the distributed matrix and 
collect it afterward
+  def toLocalDenseMatrix: DenseMatrix = 
this.toLocalSparseMatrix.toDenseMatrix
+
+  /**
+* Apply a high-order function to couple of rows
+* @param fun
+* @param other
+* @return
+*/
+  def byRowOperation(fun: (Vector, Vector) => Vector,
+ other: DistributedRowMatrix): DistributedRowMatrix = {
+val otherData = other.getRowData
+require(this.getNumCols == other.getNumCols)
+require(this.getNumRows == other.getNumRows)
+
+
+val result = this.data
+  .fullOuterJoin(otherData)
+  .where("rowIndex")
+  .equalTo("rowIndex")(
+  (left: IndexedRow, right: IndexedRow) => {
+val row1 = Option(left).getOrElse(IndexedRow(
+right.rowIndex,
+SparseVector.fromCOO(right.values.size, List((0, 0.0)))))
+val row2 = Option(right).getOrElse(IndexedRow(
+left.rowIndex,
+SparseVector.fromCOO(left.values.size, List((0, 0.0)))))
+
+IndexedRow(row1.rowIndex, fun(row1.values, row2.values))
+  }
+  )
+new DistributedRowMatrix(result, numRowsOpt, numColsOpt)
+  }
+
+  /**
+

[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...

2016-05-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65102444
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => 
BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+import org.apache.flink.util.Collector
+import org.apache.flink.ml.math.Breeze._
+import scala.collection.JavaConversions._
+
+/**
+  *
+  * @param numRowsOpt If None, will be calculated from the DataSet.
+  * @param numColsOpt If None, will be calculated from the DataSet.
+  */
+class DistributedRowMatrix(data: DataSet[IndexedRow],
+   numRowsOpt: Option[Int] = None,
+   numColsOpt: Option[Int] = None)
+extends DistributedMatrix {
+
+  lazy val getNumRows: Int = numRowsOpt match {
+case Some(rows) => rows
+case None => data.count().toInt
+  }
+
+  lazy val getNumCols: Int = numColsOpt match {
+case Some(cols) => cols
+case None => calcCols
+  }
--- End diff --

Ah, okay. Let's use `Int` for indices for now.

For counting, we can reuse the `DataSetUtils.countElementsPerPartition` method
(https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala#L56):

```scala
import org.apache.flink.api.scala.utils._

...

lazy val numRowsInDataSet = numRowsOpt match {
  case Some(value) => data.getExecutionEnvironment.fromElements(value)
  case None => data.countElementsPerPartition.map(_._2).reduce(_ + _).map(_.toInt)
}

...
```
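
To make the counting idea concrete, here is a minimal sketch using plain Scala collections instead of a Flink `DataSet`. It assumes, as the `.map(_._2)` in the snippet above suggests, that the per-partition count yields pairs whose second element is the number of records. `PartitionCount` and its methods are hypothetical names for illustration only, not part of Flink.

```scala
object PartitionCount {
  // Each inner Seq stands in for the records held by one parallel partition.
  // The result pairs a partition id with the number of records it holds.
  def countPerPartition[T](partitions: Seq[Seq[T]]): Seq[(Int, Long)] =
    partitions.zipWithIndex.map { case (records, id) => (id, records.size.toLong) }

  // Same shape as the snippet above: drop the partition id, then sum the counts.
  def totalCount[T](partitions: Seq[Seq[T]]): Long =
    countPerPartition(partitions).map(_._2).sum
}
```

In the distributed version the sum stays inside a `DataSet`, so nothing has to be collected to the client just to learn the row count.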




[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...

2016-05-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65076170
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => 
BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+import org.apache.flink.util.Collector
+import org.apache.flink.ml.math.Breeze._
+import scala.collection.JavaConversions._
+
+/**
+  *
+  * @param numRowsOpt If None, will be calculated from the DataSet.
+  * @param numColsOpt If None, will be calculated from the DataSet.
+  */
+class DistributedRowMatrix(data: DataSet[IndexedRow],
+   numRowsOpt: Option[Int] = None,
+   numColsOpt: Option[Int] = None)
+extends DistributedMatrix {
+
+  lazy val getNumRows: Int = numRowsOpt match {
+case Some(rows) => rows
+case None => data.count().toInt
+  }
+
+  lazy val getNumCols: Int = numColsOpt match {
+case Some(cols) => cols
+case None => calcCols
+  }
--- End diff --

I think it is okay.




[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...

2016-05-30 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1996#issuecomment-222488228
  
There are some public methods (`sum`, `subtract`, `byRowOperation`, etc.) 
without scaladoc. Please add scaladoc for the methods.




[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...

2016-05-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65067628
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => 
BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+import org.apache.flink.util.Collector
+import org.apache.flink.ml.math.Breeze._
+import scala.collection.JavaConversions._
+
+/**
+  *
+  * @param numRowsOpt If None, will be calculated from the DataSet.
+  * @param numColsOpt If None, will be calculated from the DataSet.
+  */
+class DistributedRowMatrix(data: DataSet[IndexedRow],
+   numRowsOpt: Option[Int] = None,
+   numColsOpt: Option[Int] = None)
+extends DistributedMatrix {
+
+  lazy val getNumRows: Int = numRowsOpt match {
+case Some(rows) => rows
+case None => data.count().toInt
+  }
+
+  lazy val getNumCols: Int = numColsOpt match {
+case Some(cols) => cols
+case None => calcCols
+  }
--- End diff --

I meant the type of the **row** number. Do we use row numbers as indices?




[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...

2016-05-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65066995
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => 
BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+import org.apache.flink.util.Collector
+import org.apache.flink.ml.math.Breeze._
+import scala.collection.JavaConversions._
+
+/**
+  *
+  * @param numRowsOpt If None, will be calculated from the DataSet.
+  * @param numColsOpt If None, will be calculated from the DataSet.
+  */
+class DistributedRowMatrix(data: DataSet[IndexedRow],
+   numRowsOpt: Option[Int] = None,
+   numColsOpt: Option[Int] = None)
+extends DistributedMatrix {
+
+  lazy val getNumRows: Int = numRowsOpt match {
+case Some(rows) => rows
+case None => data.count().toInt
+  }
+
+  lazy val getNumCols: Int = numColsOpt match {
+case Some(cols) => cols
+case None => calcCols
+  }
--- End diff --

Let me simplify my comments; sorry for the confusion. The number of columns 
can be a plain `Int`, but we should provide the number of rows in both forms 
(as a `DataSet` and as a raw number).

By the way, isn't `Long` a more reasonable type for the number of rows?
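
One reason to consider `Long`: the diff calls `data.count().toInt`, and narrowing a `Long` with `.toInt` wraps around silently when the value does not fit. The following is a minimal plain-Scala sketch of that behaviour; `RowCount`, `narrowUnchecked` and `narrowChecked` are hypothetical names used only for illustration.

```scala
object RowCount {
  // What `count.toInt` does: it keeps the low 32 bits and raises no error on overflow.
  def narrowUnchecked(count: Long): Int = count.toInt

  // A guarded alternative that fails loudly instead of wrapping around.
  def narrowChecked(count: Long): Int = {
    require(count >= 0 && count <= Int.MaxValue, s"row count $count does not fit in Int")
    count.toInt
  }
}
```

If indices stay `Int` for now, a guard like `narrowChecked` would at least surface the problem instead of producing a negative row count.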




[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-05-30 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1220#issuecomment-222482793
  
@danielblazevski I'm using a custom configuration of the IntelliJ code 
formatter. There is a pending 
[pull request](https://github.com/apache/flink/pull/1963/files) about code 
formatting in IntelliJ that might be helpful.




[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...

2016-05-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65064961
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => 
BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+import org.apache.flink.util.Collector
+import org.apache.flink.ml.math.Breeze._
+import scala.collection.JavaConversions._
+
+/**
+  *
+  * @param numRowsOpt If None, will be calculated from the DataSet.
+  * @param numColsOpt If None, will be calculated from the DataSet.
+  */
+class DistributedRowMatrix(data: DataSet[IndexedRow],
+   numRowsOpt: Option[Int] = None,
+   numColsOpt: Option[Int] = None)
+extends DistributedMatrix {
+
+  lazy val getNumRows: Int = numRowsOpt match {
+case Some(rows) => rows
+case None => data.count().toInt
+  }
+
+  lazy val getNumCols: Int = numColsOpt match {
+case Some(cols) => cols
+case None => calcCols
+  }
+
+  val getRowData = data
+
+  private def calcCols: Int =
+data.first(1).collect().headOption match {
+  case Some(vector) => vector.values.size
+  case None => 0
+}
+
+  /**
+* Collects the data in the form of a sequence of coordinates 
associated with their values.
+* @return
+*/
+  def toCOO: Seq[(Int, Int, Double)] = {
+
+val localRows = data.collect()
+
+for (IndexedRow(rowIndex, vector) <- localRows;
+(columnIndex, value) <- vector) yield (rowIndex, columnIndex, value)
+  }
+
+  /**
+* Collects the data in the form of a SparseMatrix
+* @return
+*/
+  def toLocalSparseMatrix: SparseMatrix = {
+val localMatrix = SparseMatrix.fromCOO(this.getNumRows, 
this.getNumCols, this.toCOO)
+require(localMatrix.numRows == this.getNumRows)
+require(localMatrix.numCols == this.getNumCols)
+localMatrix
+  }
+
+  //TODO: convert to dense representation on the distributed matrix and 
collect it afterward
+  def toLocalDenseMatrix: DenseMatrix = 
this.toLocalSparseMatrix.toDenseMatrix
+
+  def byRowOperation(
+  fun: (Vector, Vector) => Vector, other: DistributedRowMatrix): 
DistributedRowMatrix = {
+val otherData = other.getRowData
+require(this.getNumCols == other.getNumCols)
+require(this.getNumRows == other.getNumRows)
+
+val ev1: TypeInformation[Int] = TypeInformation.of(classOf[Int])
+
+val result = this.data
+  .fullOuterJoin(otherData)
+  .where(_.rowIndex)
+  .equalTo(_.rowIndex)(ev1)(
--- End diff --

A key selector function adds the overhead of copying keys. Please change this 
to use field expressions, like the following:

```scala
val result = this.data
  .fullOuterJoin(otherData)
  .where("rowIndex")
  .equalTo("rowIndex") {
(left: IndexedRow, right: IndexedRow) => ...
  }
```
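
For readers unfamiliar with what the join body has to handle: in a full outer join either side can be missing, and the later revision of this diff fills the missing side with a zero vector. Below is a minimal sketch of that pairing logic on local `Map`s rather than Flink `DataSet`s. `RowJoin`, `byRow` and the `Row` alias (a plain `scala.collection.immutable.Vector`, not Flink's `Vector`) are assumptions made for illustration.

```scala
object RowJoin {
  type Row = Vector[Double]

  // Pair the rows of two matrices by row index. A row that is missing on one
  // side is replaced by a zero vector of the same length as its counterpart.
  def byRow(left: Map[Int, Row], right: Map[Int, Row])(fun: (Row, Row) => Row): Map[Int, Row] =
    (left.keySet ++ right.keySet).map { idx =>
      val l = left.get(idx)
      val r = right.get(idx)
      val size = l.orElse(r).map(_.size).getOrElse(0)
      val zero = Vector.fill(size)(0.0)
      idx -> fun(l.getOrElse(zero), r.getOrElse(zero))
    }.toMap
}
```

With `fun` set to element-wise addition, a row present only on the right side comes back unchanged, which matches the zero-fill behaviour of the join body in the diff.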




[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...

2016-05-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65063499
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => 
BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+import org.apache.flink.util.Collector
+import org.apache.flink.ml.math.Breeze._
+import scala.collection.JavaConversions._
+
+/**
+  *
+  * @param numRowsOpt If None, will be calculated from the DataSet.
+  * @param numColsOpt If None, will be calculated from the DataSet.
+  */
+class DistributedRowMatrix(data: DataSet[IndexedRow],
+   numRowsOpt: Option[Int] = None,
+   numColsOpt: Option[Int] = None)
+extends DistributedMatrix {
+
+  lazy val getNumRows: Int = numRowsOpt match {
+case Some(rows) => rows
+case None => data.count().toInt
+  }
+
+  lazy val getNumCols: Int = numColsOpt match {
+case Some(cols) => cols
+case None => calcCols
+  }
--- End diff --

I still prefer keeping the numbers in a `DataSet`, but providing them in both 
forms (as `DataSet[Int]` and as `Int`) would be okay. Sometimes users don't 
know the size of the matrix in advance (for example, when the matrix is 
generated from other data).




[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...

2016-05-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65060611
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import java.lang
+
+import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => 
BreezeMatrix, Vector => BreezeVector}
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _}
+import org.apache.flink.util.Collector
+import org.apache.flink.ml.math.Breeze._
+import scala.collection.JavaConversions._
+
+/**
--- End diff --

Please add scaladoc here.




[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...

2016-05-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65060576
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedMatrix.scala
 ---
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+trait DistributedMatrix {
--- End diff --

We need scaladoc here.




[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...

2016-05-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1996#discussion_r65060527
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedMatrix.scala
 ---
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+trait DistributedMatrix {
+
+  def getNumRows: Int
+  def getNumCols: Int
--- End diff --

How about changing the return type of `getNumRows` and `getNumCols` to 
`DataSet[Long]`? Returning the number of elements of a distributed data set 
directly is an expensive operation in Flink. We can use a broadcast variable 
to pass the numbers to later operations.




[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Alge...

2016-05-30 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1996#issuecomment-222473905
  
Hi @chobeat, thanks for opening this pull request. I would like to shepherd 
this PR.




[GitHub] flink pull request: [Flink 1934] Add approximative k-nearest-neigh...

2016-05-29 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/2048#issuecomment-222397141
  
Hi @danielblazevski, thanks for opening this pull request. I would like to 
keep approximate kNN and exact kNN separate. If the approximate method needs 
the exact kNN implementation, could you wait until #1220 is merged? I'll merge 
it in a few days (maybe today, or tomorrow at the latest).




[GitHub] flink pull request: [FLINK-2673] [type system] Add a comparator fo...

2016-05-22 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/2017#issuecomment-220833561
  
The failing Travis build is not related to this PR.




[GitHub] flink pull request: [FLINK-2673] [type system] Add a comparator fo...

2016-05-22 Thread chiwanpark
GitHub user chiwanpark opened a pull request:

https://github.com/apache/flink/pull/2017

[FLINK-2673] [type system] Add a comparator for Scala Option type

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

---

I've added an `OptionTypeComparator` implementation, unit test cases based on 
`ComparatorTestBase`, and a simple integration test case in `JoinITCase`. I 
followed [Till's opinion](http://markmail.org/message/vk3t3vpc6j6zbldk) about 
the order of `None` and `Some` values.
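
As an illustration of what an ordering over `Option` values has to decide, here is a minimal plain-Scala sketch. It assumes that `None` sorts before every `Some`, which is the convention of Scala's own `Ordering.Option`; this description does not state which order was actually chosen, and `OptionOrder.noneFirst` is a hypothetical name, not the `OptionTypeComparator` added by this pull request.

```scala
object OptionOrder {
  // Assumption: None is ordered before any Some; two Some values are
  // compared by their contents using the element ordering.
  def noneFirst[A](implicit ord: Ordering[A]): Ordering[Option[A]] =
    new Ordering[Option[A]] {
      def compare(x: Option[A], y: Option[A]): Int = (x, y) match {
        case (None, None)       => 0
        case (None, Some(_))    => -1
        case (Some(_), None)    => 1
        case (Some(a), Some(b)) => ord.compare(a, b)
      }
    }
}
```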

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chiwanpark/flink FLINK-2673

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2017.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2017


commit 485b6904013c23316a9f61d651b44e245e47c61c
Author: Chiwan Park <chiwanp...@apache.org>
Date:   2016-05-22T11:39:10Z

[FLINK-2673] [core] Add a comparator for Scala Option type




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-05-18 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r63815357
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.nn
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common._
+import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector}
+import 
org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, 
DistanceMetric,
+EuclideanDistanceMetric}
+import org.apache.flink.ml.pipeline.{FitOperation, 
PredictDataSetOperation, Predictor}
+import org.apache.flink.util.Collector
+import 
org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
+
+import scala.collection.immutable.Vector
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+/** Implements a k-nearest neighbor join.
+  *
+  * Calculates the `k`-nearest neighbor points in the training set for 
each point in the test set.
+  *
+  * @example
+  * {{{
+  * val trainingDS: DataSet[Vector] = ...
+  * val testingDS: DataSet[Vector] = ...
+  *
+  * val knn = KNN()
+  *   .setK(10)
+  *   .setBlocks(5)
+  *   .setDistanceMetric(EuclideanDistanceMetric())
+  *
+  * knn.fit(trainingDS)
+  *
+  * val predictionDS: DataSet[(Vector, Array[Vector])] = 
knn.predict(testingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[org.apache.flink.ml.nn.KNN.K]]
+  * Sets the K which is the number of selected points as neighbors. 
(Default value: '''5''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.DistanceMetric]]
+  * Sets the distance metric we use to calculate the distance between two 
points. If no metric is
+  * specified, then 
[[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used.
+  * (Default value: '''EuclideanDistanceMetric()''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.Blocks]]
+  * Sets the number of blocks into which the input data will be split. 
This number should be set
+  * at least to the degree of parallelism. If no value is specified, then 
the parallelism of the
+  * input [[DataSet]] is used as the number of blocks. (Default value: 
'''None''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.UseQuadTreeParam]]
+  * A boolean variable that whether or not to use a Quadtree to partition 
the training set
+  * to potentially simplify the KNN search.  If no value is specified, the 
code will
+  * automatically decide whether or not to use a Quadtree.  Use of a 
Quadtree scales well
+  * with the number of training and testing points, though poorly with the 
dimension.
+  * (Default value:  ```None```)
+  *
+  * - [[org.apache.flink.ml.nn.KNN.SizeHint]]
+  * Specifies whether the training set or test set is small to optimize 
the cross
+  * product operation needed for the KNN search.  If the training set is 
small
+  * this should be `CrossHint.FIRST_IS_SMALL` and set to 
`CrossHint.SECOND_IS_SMALL`
+  * if the test set is small.
+  * (Default value:  ```None```)
+  *
+  */
+
+class KNN extends Predictor[KNN] {
+
+  import KNN._
+
+  var trainingSet: Option[DataSet[Block[FlinkVector]]] = None
+
+  /** Sets K
+* @param k the number of selected points as neighbors
+*/
+  def setK(k: Int): KNN = {
+require(k > 0, "K must be positive.")
+parameters.add(K, k)
+this
+  }
+
+  /** Sets the distance metric
+* @param metric the distance metric to calculate distance between two 
points

[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-05-18 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r63712972
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.nn
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common._
+import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector}
+import 
org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, 
DistanceMetric,
+EuclideanDistanceMetric}
+import org.apache.flink.ml.pipeline.{FitOperation, 
PredictDataSetOperation, Predictor}
+import org.apache.flink.util.Collector
+import 
org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
+
+import scala.collection.immutable.Vector
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+/** Implements a k-nearest neighbor join.
+  *
+  * Calculates the `k`-nearest neighbor points in the training set for 
each point in the test set.
+  *
+  * @example
+  * {{{
+  *   val trainingDS: DataSet[Vector] = ...
+  *   val testingDS: DataSet[Vector] = ...
+  *
+  *   val knn = KNN()
+  * .setK(10)
+  * .setBlocks(5)
+  * .setDistanceMetric(EuclideanDistanceMetric())
+  *
+  *   knn.fit(trainingDS)
+  *
+  *   val predictionDS: DataSet[(Vector, Array[Vector])] = 
knn.predict(testingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[org.apache.flink.ml.nn.KNN.K]]
+  * Sets the K which is the number of selected points as neighbors. 
(Default value: '''5''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.DistanceMetric]]
+  * Sets the distance metric we use to calculate the distance between two 
points. If no metric is
+  * specified, then 
[[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used.
+  * (Default value: '''EuclideanDistanceMetric()''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.Blocks]]
+  * Sets the number of blocks into which the input data will be split. 
This number should be set
+  * at least to the degree of parallelism. If no value is specified, then 
the parallelism of the
+  * input [[DataSet]] is used as the number of blocks. (Default value: 
'''None''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.UseQuadTreeParam]]
+  * A boolean variable that whether or not to use a Quadtree to partition 
the training set
+  * to potentially simplify the KNN search.  If no value is specified, the 
code will
+  * automatically decide whether or not to use a Quadtree.  Use of a 
Quadtree scales well
+  * with the number of training and testing points, though poorly with the 
dimension.
+  * (Default value:  ```None```)
+  *
+  * - [[org.apache.flink.ml.nn.KNN.SizeHint]]
+  * Specifies whether the training set or test set is small to optimize 
the cross
+  * product operation needed for the KNN search.  If the training set is 
small
+  * this should be `CrossHint.FIRST_IS_SMALL` and set to 
`CrossHint.SECOND_IS_SMALL`
+  * if the test set is small.
+  * (Default value:  ```None```)
+  *
+  */
+
+class KNN extends Predictor[KNN] {
+
+  import KNN._
+
+  var trainingSet: Option[DataSet[Block[FlinkVector]]] = None
+
+  /** Sets K
+* @param k the number of selected points as neighbors
+*/
+  def setK(k: Int): KNN = {
+require(k > 0, "K must be positive.")
+parameters.add(K, k)
+this
+  }
+
+  /** Sets the distance metric
+* @param metric the distance metric to calculate distance between two 
points
+*/

[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-05-17 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1220#issuecomment-219702506
  
Great to hear that z-knn is almost done! If you think the implementation 
is in good shape, do not hesitate to open a pull request.




[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-05-14 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1220#issuecomment-219203934
  
I would like to merge this PR. I've checked the output of the k-NN join (with 
and without quadtree). Documentation looks good.

To maintainers (@tillrohrmann @thvasilo): Could you do a final check of this PR? 
You do not need to worry about code style issues. I'll address them before merging.
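
One way an output check like the one mentioned above could be done is to compare the k-NN join result with a brute-force reference on a small local data set. The sketch below is an assumption about how such a reference might look, not the check that was actually run: `BruteForceKnnSketch` is a hypothetical name, and plain `Array[Double]` stands in for FlinkML's `Vector`.

```scala
// Hypothetical brute-force reference for small, local data sets.
// Plain arrays stand in for FlinkML vectors; nothing here uses Flink APIs.
object BruteForceKnnSketch {

  /** Squared Euclidean distance between two points of equal dimension. */
  def squaredDistance(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "Dimension mismatch.")
    a.indices.map { i =>
      val d = a(i) - b(i)
      d * d
    }.sum
  }

  /** For each test point, returns the k training points closest to it. */
  def knn(
      training: Seq[Array[Double]],
      test: Seq[Array[Double]],
      k: Int): Seq[(Array[Double], Seq[Array[Double]])] = {
    require(k > 0, "K must be positive.")
    // Sort all training points by distance to the test point and keep the first k.
    test.map(t => (t, training.sortBy(p => squaredDistance(t, p)).take(k)))
  }
}
```

Sorting by squared distance yields the same neighbors as sorting by Euclidean distance, because squaring preserves the order of non-negative values. The cost grows with the product of the two set sizes, so this is only suitable as a reference for small inputs.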




[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-04-28 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1220#issuecomment-215369160
  
@danielblazevski Sorry for the late check. I checked your PR and have a few 
comments. After they are addressed, I would like to merge this.




[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-04-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r61398228
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/classification/Classification.scala
 ---
@@ -131,3 +131,6 @@ object Classification {
 
   val expectedWeightVector = DenseVector(-1.95, -3.45)
 }
+
+
+
--- End diff --

Are these new lines necessary?




[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-04-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r61398006
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/QuadTree.scala 
---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.nn
+
+import org.apache.flink.ml.math.{Breeze, Vector}
+import Breeze._
+
+import 
org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric,
+EuclideanDistanceMetric, DistanceMetric}
+
+import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.PriorityQueue
+
+/**
+ * n-dimensional QuadTree data structure; partitions
+ * spatial data for faster queries (e.g. KNN query)
+ * The skeleton of the data structure was initially
+ * based off of the 2D Quadtree found here:
+ * 
http://www.cs.trinity.edu/~mlewis/CSCI1321-F11/Code/src/util/Quadtree.scala
+ *
+ * Many additional methods were added to the class both for
+ * efficient KNN queries and generalizing to n-dim.
+ *
+ * @param minVec vector of the corner of the bounding box with smallest 
coordinates
+ * @param maxVec vector of the corner of the bounding box with smallest 
coordinates
+ * @param distMetric metric, must be Euclidean or squareEuclidean
+ * @param maxPerBox threshold for number of points in each box before 
slitting a box
+ */
+class QuadTree(
+  minVec: Vector,
+  maxVec: Vector,
+  distMetric: DistanceMetric,
+  maxPerBox: Int) {
+
+  class Node(
+center: Vector,
+width: Vector,
+var children: Seq[Node]) {
+
+val nodeElements = new ListBuffer[Vector]
+
+/** for testing purposes only; used in QuadTreeSuite.scala
+  *
+  * @return center and width of the box
+  */
+def getCenterWidth(): (Vector, Vector) = {
+  (center, width)
+}
+
+def contains(queryPoint: Vector): Boolean = {
--- End diff --

Could you add a scaladoc for this method? All public methods should have a 
scaladoc.
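
As an illustration of the kind of scaladoc being requested, here is a minimal sketch. The class `BoxSketch` is a hypothetical stand-in for the `Node` in the diff, plain arrays replace FlinkML's `Vector`, and the body of `contains` is an assumption because the quoted diff ends before the method body.

```scala
// Hypothetical stand-in for the Node class in the diff. The method body is an
// assumption; only the style of the scaladoc is the point of this sketch.
class BoxSketch(center: Array[Double], width: Array[Double]) {

  /** Tests whether a query point lies inside this box.
    *
    * The box spans `center(i) - width(i) / 2` to `center(i) + width(i) / 2`
    * in every dimension `i`; points on the boundary count as inside.
    *
    * @param queryPoint point to test, with the same dimension as the box
    * @return `true` if the point lies within the box in every dimension
    */
  def contains(queryPoint: Array[Double]): Boolean = {
    require(queryPoint.length == center.length, "Dimension mismatch.")
    queryPoint.indices.forall { i =>
      math.abs(queryPoint(i) - center(i)) <= width(i) / 2
    }
  }
}
```

The comment states what the method checks, how boundary points are treated, and what the parameter and return value mean, which is the information a caller cannot infer from the signature alone.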




[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-04-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r61397870
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.nn
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common._
+import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector}
+import 
org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, 
DistanceMetric,
+EuclideanDistanceMetric}
+import org.apache.flink.ml.pipeline.{FitOperation, 
PredictDataSetOperation, Predictor}
+import org.apache.flink.util.Collector
+import 
org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
+
+import scala.collection.immutable.Vector
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+/** Implements a k-nearest neighbor join.
+  *
+  * Calculates the `k` nearest neighbor points in the training set for 
each point in the test set.
+  *
+  * @example
+  * {{{
+  *   val trainingDS: DataSet[Vector] = ...
+  *   val testingDS: DataSet[Vector] = ...
+  *
+  *   val knn = KNN()
+  * .setK(10)
+  * .setBlocks(5)
+  * .setDistanceMetric(EuclideanDistanceMetric())
+  *
+  *   knn.fit(trainingDS)
+  *
+  *   val predictionDS: DataSet[(Vector, Array[Vector])] = 
knn.predict(testingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[org.apache.flink.ml.nn.KNN.K]]
+  * Sets the K which is the number of selected points as neighbors. 
(Default value: '''5''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.DistanceMetric]]
+  * Sets the distance metric we use to calculate the distance between two 
points. If no metric is
+  * specified, then 
[[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used.
+  * (Default value: '''EuclideanDistanceMetric()''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.Blocks]]
+  * Sets the number of blocks into which the input data will be split. 
This number should be set
+  * at least to the degree of parallelism. If no value is specified, then 
the parallelism of the
+  * input [[DataSet]] is used as the number of blocks. (Default value: 
'''None''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.UseQuadTreeParam]]
+  * A boolean variable that whether or not to use a Quadtree to partition 
the training set
+  * to potentially simplify the KNN search.  If no value is specified, the 
code will
+  * automatically decide whether or not to use a Quadtree.  Use of a 
Quadtree scales well
+  * with the number of training and testing points, though poorly with the 
dimension.
+  * (Default value:  ```None```)
+  *
+  * - [[org.apache.flink.ml.nn.KNN.SizeHint]]
+  * Specifies whether the training set or test set is small to optimize 
the cross
+  * product operation needed for the KNN search.  If the training set is 
small
+  * this should be `CrossHint.FIRST_IS_SMALL` and set to 
`CrossHint.SECOND_IS_SMALL`
+  * if the test set is small.
+  * (Default value:  ```None```)
+  *
+  */
+
+class KNN extends Predictor[KNN] {
+
+  import KNN._
+
+  var trainingSet: Option[DataSet[Block[FlinkVector]]] = None
+
+  /** Sets K
+* @param k the number of selected points as neighbors
+*/
+  def setK(k: Int): KNN = {
+require(k > 0, "K must be positive.")
+parameters.add(K, k)
+this
+  }
+
+  /** Sets the distance metric
+* @param metric the distance metric to calculate distance between two 
points
+*/

[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-04-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r61397754
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.nn
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common._
+import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector}
+import 
org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, 
DistanceMetric,
+EuclideanDistanceMetric}
+import org.apache.flink.ml.pipeline.{FitOperation, 
PredictDataSetOperation, Predictor}
+import org.apache.flink.util.Collector
+import 
org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
+
+import scala.collection.immutable.Vector
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+/** Implements a k-nearest neighbor join.
+  *
+  * Calculates the `k` nearest neighbor points in the training set for 
each point in the test set.
+  *
+  * @example
+  * {{{
+  *   val trainingDS: DataSet[Vector] = ...
+  *   val testingDS: DataSet[Vector] = ...
+  *
+  *   val knn = KNN()
+  * .setK(10)
+  * .setBlocks(5)
+  * .setDistanceMetric(EuclideanDistanceMetric())
+  *
+  *   knn.fit(trainingDS)
+  *
+  *   val predictionDS: DataSet[(Vector, Array[Vector])] = 
knn.predict(testingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[org.apache.flink.ml.nn.KNN.K]]
+  * Sets the K which is the number of selected points as neighbors. 
(Default value: '''5''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.DistanceMetric]]
+  * Sets the distance metric we use to calculate the distance between two 
points. If no metric is
+  * specified, then 
[[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used.
+  * (Default value: '''EuclideanDistanceMetric()''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.Blocks]]
+  * Sets the number of blocks into which the input data will be split. 
This number should be set
+  * at least to the degree of parallelism. If no value is specified, then 
the parallelism of the
+  * input [[DataSet]] is used as the number of blocks. (Default value: 
'''None''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.UseQuadTreeParam]]
+  * A boolean variable that whether or not to use a Quadtree to partition 
the training set
+  * to potentially simplify the KNN search.  If no value is specified, the 
code will
+  * automatically decide whether or not to use a Quadtree.  Use of a 
Quadtree scales well
+  * with the number of training and testing points, though poorly with the 
dimension.
+  * (Default value:  ```None```)
+  *
+  * - [[org.apache.flink.ml.nn.KNN.SizeHint]]
+  * Specifies whether the training set or test set is small to optimize 
the cross
+  * product operation needed for the KNN search.  If the training set is 
small
+  * this should be `CrossHint.FIRST_IS_SMALL` and set to 
`CrossHint.SECOND_IS_SMALL`
+  * if the test set is small.
+  * (Default value:  ```None```)
+  *
+  */
+
+class KNN extends Predictor[KNN] {
+
+  import KNN._
+
+  var trainingSet: Option[DataSet[Block[FlinkVector]]] = None
+
+  /** Sets K
+* @param k the number of selected points as neighbors
+*/
+  def setK(k: Int): KNN = {
+require(k > 0, "K must be positive.")
+parameters.add(K, k)
+this
+  }
+
+  /** Sets the distance metric
+* @param metric the distance metric to calculate distance between two 
points
+*/

[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-04-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r61397659
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.nn
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common._
+import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector}
+import 
org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, 
DistanceMetric,
+EuclideanDistanceMetric}
+import org.apache.flink.ml.pipeline.{FitOperation, 
PredictDataSetOperation, Predictor}
+import org.apache.flink.util.Collector
+import 
org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
+
+import scala.collection.immutable.Vector
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+/** Implements a k-nearest neighbor join.
+  *
+  * Calculates the `k` nearest neighbor points in the training set for 
each point in the test set.
--- End diff --

Calculates the _`k`-nearest_ neighbor points ...




[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-04-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r61397574
  
--- Diff: docs/libs/ml/knn.md ---
@@ -0,0 +1,146 @@
+---
+mathjax: include
+htmlTitle: FlinkML - k-nearest neighbors
+title: FlinkML - knn
+---
+
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+Implements an exact k-nearest neighbors algorithm.  Given a training set 
$A$ and a testing set $B$, the algorithm returns
+
+$$
+KNN(A,B, k) = \{ \left( b, KNN(b,A) \right) where b \in B and KNN(b, A, k) 
are the k-nearest points to b in A \}
+$$
+
+The brute-force approach is to compute the distance between every training 
and testing point.  To ease the brute-force computation of computing the 
distance between every traning point a quadtree is used.  The quadtree scales 
well in the number of training points, though poorly in the spatial dimension.  
The algorithm will automatically choose whether or not to use the quadtree, 
though the user can override that decision by setting a parameter to force use 
or not use a quadtree. 
+
+##Operations
+
+`KNN` is a `Predictor`. 
+As such, it supports the `fit` and `predict` operation.
+
+### Fit
+
+KNN is trained given a set of `LabeledVector`:
+
+* `fit: DataSet[LabeledVector] => Unit`
+
+### Predict
+
+KNN predicts for all subtypes of FlinkML's `Vector` the corresponding 
class label:
+
+* `predict[T <: Vector]: DataSet[T] => DataSet[(T, Array[Vector])]`, where 
the `(T, Array[Vector])` tuple
+  corresponds to (testPoint, K-nearest training points)
+
+## Paremeters
+The KNN implementation can be controlled by the following parameters:
+
+   
+
+  
+Parameters
+Description
+  
+
+
+
+  
+K
+
+  
+Defines the number of nearest-neoghbors to search for.  That 
is, for each test point, the algorithm finds the K nearest neighbors in the 
training set
+(Default value: 5)
+  
+
+  
+  
+ DistanceMetric
+
+  
+Sets the distance metric we use to calculate the distance 
between two points. If no metric is specified, then 
[[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used.
+(Default value:  EuclideanDistanceMetric )
+  
+
+  
+  
+Blocks
+
+  
+Sets the number of blocks into which the input data will be 
split. This number should be set
+at least to the degree of parallelism. If no value is 
specified, then the parallelism of the
+input [[DataSet]] is used as the number of blocks.
+(Default value: None)
+  
+
+  
+  
+UseQuadTreeParam
+
+  
+ A boolean variable that whether or not to use a Quadtree to 
partition the training set to potentially simplify the KNN search.  If no value 
is specified, the code will automatically decide whether or not to use a 
Quadtree.  Use of a Quadtree scales well with the number of training and 
testing points, though poorly with the dimension.
+(Default value: None)
+  
+
+  
+  
+SizeHint
+
+  Specifies whether the training set or test set is small to 
optimize the cross product operation needed for the KNN search.  If the 
training set is small this should be `CrossHint.FIRST_IS_SMALL` and set to 
`CrossHint.SECOND_IS_SMALL` if the test set is small.
+ (Default value: None)
+  
+
+  
+
+  
+
+## Examples
+
+{% highlight scala %}
+import 
org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.classification.Classification
+import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.ml.metrics.distances.
+SquaredEuclideanDistanceMetric
+
+  val env = ExecutionEnvironment.getExecutionEnvironment
+
+  // prepare data
+  val trainingSet = 
env.fromCollection(Classification.trainingData).map(_.vector)
+  val testingSet = env.fromElements(DenseVector(0.0, 0.0))
+
+ val knn = KNN()
+.setK(3)
+.setBlocks(10)
+.setDistanceMetric(SquaredEuclideanDistanceMetric())
+.setUseQuadTree(false)
+.setSizeHint(CrossHint.SECOND_IS_SMALL)
+
+  // run knn join
+  knn.fit(trainingSet)
+  val result = knn.predict(testingSet).collect()
+
+{% endhighlight %}
+
+For mor

[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-04-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r61397472
  
--- Diff: docs/libs/ml/knn.md ---
@@ -0,0 +1,146 @@
+---
+mathjax: include
+htmlTitle: FlinkML - k-nearest neighbors
+title: FlinkML - knn
+---
+
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+Implements an exact k-nearest neighbors algorithm.  Given a training set 
$A$ and a testing set $B$, the algorithm returns
+
+$$
+KNN(A,B, k) = \{ \left( b, KNN(b,A) \right) where b \in B and KNN(b, A, k) 
are the k-nearest points to b in A \}
+$$
+
+The brute-force approach is to compute the distance between every training 
and testing point.  To ease the brute-force computation of computing the 
distance between every traning point a quadtree is used.  The quadtree scales 
well in the number of training points, though poorly in the spatial dimension.  
The algorithm will automatically choose whether or not to use the quadtree, 
though the user can override that decision by setting a parameter to force use 
or not use a quadtree. 
+
+##Operations
+
+`KNN` is a `Predictor`. 
+As such, it supports the `fit` and `predict` operation.
+
+### Fit
+
+KNN is trained given a set of `LabeledVector`:
+
+* `fit: DataSet[LabeledVector] => Unit`
+
+### Predict
+
+KNN predicts for all subtypes of FlinkML's `Vector` the corresponding 
class label:
+
+* `predict[T <: Vector]: DataSet[T] => DataSet[(T, Array[Vector])]`, where 
the `(T, Array[Vector])` tuple
+  corresponds to (testPoint, K-nearest training points)
+
+## Paremeters
+The KNN implementation can be controlled by the following parameters:
+
+   
+
+  
+Parameters
+Description
+  
+
+
+
+  
+K
+
+  
+Defines the number of nearest-neoghbors to search for.  That 
is, for each test point, the algorithm finds the K nearest neighbors in the 
training set
+(Default value: 5)
+  
+
+  
+  
+ DistanceMetric
+
+  
+Sets the distance metric we use to calculate the distance 
between two points. If no metric is specified, then 
[[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used.
+(Default value:  EuclideanDistanceMetric )
+  
+
+  
+  
+Blocks
+
+  
+Sets the number of blocks into which the input data will be 
split. This number should be set
+at least to the degree of parallelism. If no value is 
specified, then the parallelism of the
+input [[DataSet]] is used as the number of blocks.
+(Default value: None)
+  
+
+  
+  
+UseQuadTreeParam
+
+  
+ A boolean variable that whether or not to use a Quadtree to 
partition the training set to potentially simplify the KNN search.  If no value 
is specified, the code will automatically decide whether or not to use a 
Quadtree.  Use of a Quadtree scales well with the number of training and 
testing points, though poorly with the dimension.
+(Default value: None)
+  
+
+  
+  
+SizeHint
+
+  Specifies whether the training set or test set is small to 
optimize the cross product operation needed for the KNN search.  If the 
training set is small this should be `CrossHint.FIRST_IS_SMALL` and set to 
`CrossHint.SECOND_IS_SMALL` if the test set is small.
+ (Default value: None)
+  
+
+  
+
+  
+
+## Examples
+
+{% highlight scala %}
+import 
org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.classification.Classification
+import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.ml.metrics.distances.
+SquaredEuclideanDistanceMetric
--- End diff --

Could you move this line to the end of the previous line?




[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-04-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r61397325
  
--- Diff: docs/libs/ml/knn.md ---
@@ -0,0 +1,146 @@
+---
+mathjax: include
+htmlTitle: FlinkML - k-nearest neighbors
+title: FlinkML - knn
+---
+
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+Implements an exact k-nearest neighbors algorithm.  Given a training set 
$A$ and a testing set $B$, the algorithm returns
+
+$$
+KNN(A,B, k) = \{ \left( b, KNN(b,A) \right) where b \in B and KNN(b, A, k) 
are the k-nearest points to b in A \}
+$$
+
+The brute-force approach is to compute the distance between every training 
and testing point.  To ease the brute-force computation of computing the 
distance between every traning point a quadtree is used.  The quadtree scales 
well in the number of training points, though poorly in the spatial dimension.  
The algorithm will automatically choose whether or not to use the quadtree, 
though the user can override that decision by setting a parameter to force use 
or not use a quadtree. 
+
+##Operations
+
+`KNN` is a `Predictor`. 
+As such, it supports the `fit` and `predict` operation.
+
+### Fit
+
+KNN is trained given a set of `LabeledVector`:
+
+* `fit: DataSet[LabeledVector] => Unit`
+
+### Predict
+
+KNN predicts for all subtypes of FlinkML's `Vector` the corresponding 
class label:
+
+* `predict[T <: Vector]: DataSet[T] => DataSet[(T, Array[Vector])]`, where 
the `(T, Array[Vector])` tuple
+  corresponds to (testPoint, K-nearest training points)
+
+## Paremeters
+The KNN implementation can be controlled by the following parameters:
+
+   
+
+  
+Parameters
+Description
+  
+
+
+
+  
+K
+
+  
+Defines the number of nearest-neoghbors to search for.  That 
is, for each test point, the algorithm finds the K nearest neighbors in the 
training set
+(Default value: 5)
+  
+
+  
+  
+ DistanceMetric
+
+  
+Sets the distance metric we use to calculate the distance 
between two points. If no metric is specified, then 
[[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used.
+(Default value:  EuclideanDistanceMetric )
--- End diff --

Please remove the spaces before and after EuclideanDistanceMetric.




[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-04-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r61397260
  
--- Diff: docs/libs/ml/knn.md ---
@@ -0,0 +1,146 @@
+---
+mathjax: include
+htmlTitle: FlinkML - k-nearest neighbors
+title: FlinkML - knn
+---
+
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+Implements an exact k-nearest neighbors algorithm.  Given a training set 
$A$ and a testing set $B$, the algorithm returns
+
+$$
+KNN(A,B, k) = \{ \left( b, KNN(b,A) \right) where b \in B and KNN(b, A, k) 
are the k-nearest points to b in A \}
+$$
+
+The brute-force approach is to compute the distance between every training 
and testing point.  To ease the brute-force computation of computing the 
distance between every traning point a quadtree is used.  The quadtree scales 
well in the number of training points, though poorly in the spatial dimension.  
The algorithm will automatically choose whether or not to use the quadtree, 
though the user can override that decision by setting a parameter to force use 
or not use a quadtree. 
+
+##Operations
+
+`KNN` is a `Predictor`. 
+As such, it supports the `fit` and `predict` operation.
+
+### Fit
+
+KNN is trained given a set of `LabeledVector`:
+
+* `fit: DataSet[LabeledVector] => Unit`
+
+### Predict
+
+KNN predicts for all subtypes of FlinkML's `Vector` the corresponding 
class label:
+
+* `predict[T <: Vector]: DataSet[T] => DataSet[(T, Array[Vector])]`, where 
the `(T, Array[Vector])` tuple
+  corresponds to (testPoint, K-nearest training points)
+
+## Paremeters
+The KNN implementation can be controlled by the following parameters:
+
+   
+
+  
+Parameters
+Description
+  
+
+
+
+  
+K
+
+  
+Defines the number of nearest-neoghbors to search for.  That 
is, for each test point, the algorithm finds the K nearest neighbors in the 
training set
+(Default value: 5)
+  
+
+  
+  
+ DistanceMetric
--- End diff --

Please remove the space before DistanceMetric.




[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-04-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r61397191
  
--- Diff: docs/libs/ml/knn.md ---
@@ -0,0 +1,146 @@
+---
+mathjax: include
+htmlTitle: FlinkML - k-nearest neighbors
+title: FlinkML - knn
+---
+
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+Implements an exact k-nearest neighbors algorithm.  Given a training set 
$A$ and a testing set $B$, the algorithm returns
+
+$$
+KNN(A,B, k) = \{ \left( b, KNN(b,A) \right) where b \in B and KNN(b, A, k) 
are the k-nearest points to b in A \}
+$$
+
+The brute-force approach is to compute the distance between every training 
and testing point.  To ease the brute-force computation of computing the 
distance between every traning point a quadtree is used.  The quadtree scales 
well in the number of training points, though poorly in the spatial dimension.  
The algorithm will automatically choose whether or not to use the quadtree, 
though the user can override that decision by setting a parameter to force use 
or not use a quadtree. 
+
+##Operations
+
+`KNN` is a `Predictor`. 
+As such, it supports the `fit` and `predict` operation.
+
+### Fit
+
+KNN is trained given a set of `LabeledVector`:
+
+* `fit: DataSet[LabeledVector] => Unit`
+
+### Predict
+
+KNN predicts for all subtypes of FlinkML's `Vector` the corresponding 
class label:
+
+* `predict[T <: Vector]: DataSet[T] => DataSet[(T, Array[Vector])]`, where 
the `(T, Array[Vector])` tuple
+  corresponds to (testPoint, K-nearest training points)
+
+## Paremeters
+The KNN implementation can be controlled by the following parameters:
+
+   
+
+  
+Parameters
+Description
+  
+
+
+
+  
+K
+
+  
+Defines the number of nearest-neoghbors to search for.  That 
is, for each test point, the algorithm finds the K nearest neighbors in the 
training set
--- End diff --

the algorithm finds the _K-nearest_ neighbors ...




[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-04-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r61397153
  
--- Diff: docs/libs/ml/knn.md ---
@@ -0,0 +1,146 @@
+---
+mathjax: include
+htmlTitle: FlinkML - k-nearest neighbors
+title: FlinkML - knn
+---
+
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+Implements an exact k-nearest neighbors algorithm.  Given a training set 
$A$ and a testing set $B$, the algorithm returns
+
+$$
+KNN(A,B, k) = \{ \left( b, KNN(b,A) \right) where b \in B and KNN(b, A, k) 
are the k-nearest points to b in A \}
+$$
+
+The brute-force approach is to compute the distance between every training 
and testing point.  To ease the brute-force computation of computing the 
distance between every traning point a quadtree is used.  The quadtree scales 
well in the number of training points, though poorly in the spatial dimension.  
The algorithm will automatically choose whether or not to use the quadtree, 
though the user can override that decision by setting a parameter to force use 
or not use a quadtree. 
+
+##Operations
+
+`KNN` is a `Predictor`. 
+As such, it supports the `fit` and `predict` operation.
+
+### Fit
+
+KNN is trained given a set of `LabeledVector`:
+
+* `fit: DataSet[LabeledVector] => Unit`
+
+### Predict
+
+KNN predicts for all subtypes of FlinkML's `Vector` the corresponding 
class label:
+
+* `predict[T <: Vector]: DataSet[T] => DataSet[(T, Array[Vector])]`, where 
the `(T, Array[Vector])` tuple
+  corresponds to (testPoint, K-nearest training points)
+
+## Paremeters
+The KNN implementation can be controlled by the following parameters:
+
+   
+
+  
+Parameters
+Description
+  
+
+
+
+  
+K
+
+  
+Defines the number of nearest-neoghbors to search for.  That 
is, for each test point, the algorithm finds the K nearest neighbors in the 
training set
--- End diff --

Defines the number of nearest-_neighbors_ ...




[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-04-14 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1220#issuecomment-209854438
  
Hi @danielblazevski, sorry for the late reply. I checked your updated PR, but 
your last commit (d6f90ce) seems wrong: it removes KNN.scala, 
QuadTree.scala, KNNITSuite.scala, and QuadTreeSuite.scala. Could you check 
again?




[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-06 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-206350336
  
Ah, right @rmetzger. We can go ahead.




[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-06 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-206345431
  
I just found that the author email does not match @nikste's GitHub account. 
It is a trivial issue, but fixing the email address would be better.




[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-06 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-206344388
  
Looks good to me. +1

I tested a simple streaming word count example with a socket stream in the 
following configurations:
* Flink on Hadoop YARN 2.7.2 (both Scala 2.10, 2.11)
* Flink local cluster (both Scala 2.10, 2.11)




[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-04-04 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-205274226
  
I prefer `senv` and `benv` because they are shorter than `streamEnv` and 
`batchEnv`. In a shell, shorter variable names are better even though there is 
auto-complete support.




[GitHub] flink pull request: [FLINK-3678] Make Flink logs directory configu...

2016-03-30 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1837#issuecomment-203323834
  
I tested this PR in a local cluster. Looks good to me. +1




[GitHub] flink pull request: [hotfix][docs] Fix a couple of typos in YARN d...

2016-03-27 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1836#issuecomment-202022859
  
Nice catch. I'll merge this.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-03-24 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1500#issuecomment-200804302
  
Oh, thanks for pointing that out, @mxm. Then I'll merge this.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-03-22 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1500#issuecomment-199796720
  
Hi all, I would like to merge this. If there are no objections and Travis 
passes, I'll merge this to master in a few days.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3645] [tests] HDFSCopyUtilitiesTest fai...

2016-03-22 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1825#issuecomment-199775815
  
Merged. Thanks for the review, @aljoscha!




[GitHub] flink pull request: [FLINK-3611] [docs] corrected link in CONTRIBU...

2016-03-22 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1786#issuecomment-199704337
  
+1, Looks good to me.




[GitHub] flink pull request: [FLINK-3645] [tests] HDFSCopyUtilitiesTest fai...

2016-03-22 Thread chiwanpark
GitHub user chiwanpark opened a pull request:

https://github.com/apache/flink/pull/1825

[FLINK-3645] [tests] HDFSCopyUtilitiesTest fails in a Hadoop cluster

Please check [JIRA](https://issues.apache.org/jira/browse/FLINK-3645). 
`HDFSCopyUtilitiesTest` should use the local file system only. This PR forces 
the test to use the local file system.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chiwanpark/flink FLINK-3645

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1825.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1825


commit 019715e842c6e44256bda008bbd697287f681382
Author: Chiwan Park <chiwanp...@apache.org>
Date:   2016-03-22T08:44:35Z

[FLINK-3645] [tests] Force HDFSCOPYUtilitiesTest to use local file system






[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-03-21 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1220#issuecomment-199648335
  
Hi @danielblazevski, thanks for the update! The implementation looks good to 
me. (I will address some minor issues and the rebase myself.)

About the docs, I meant that we need to add a description, examples, and the 
meaning of the parameters to the documentation on our homepage 
(`docs/libs/ml/knn.md`).
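
For illustration only, the exact (brute-force) search that such documentation 
would describe can be sketched in plain Scala. This is not the FlinkML `KNN` 
API: the `BruteForceKnn` object, the `Point` alias, and the `knn` helper are 
hypothetical names chosen for this sketch, and it leaves out Flink `DataSet`s, 
blocking, and the quadtree.

```scala
// Plain-Scala sketch of exact k-nearest neighbors by brute force.
// Assumption: points are in-memory vectors of equal dimension.
object BruteForceKnn {
  type Point = Vector[Double]

  // Squared Euclidean distance; it ranks neighbors in the same order
  // as the Euclidean distance, so the square root can be skipped.
  def squaredDistance(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // For each test point b, return (b, the k training points closest to b).
  def knn(training: Seq[Point], testing: Seq[Point], k: Int): Seq[(Point, Seq[Point])] =
    testing.map { b =>
      (b, training.sortBy(a => squaredDistance(a, b)).take(k))
    }

  def main(args: Array[String]): Unit = {
    val training = Seq(Vector(0.0, 1.0), Vector(5.0, 5.0), Vector(1.0, 0.0), Vector(-4.0, 3.0))
    val testing = Seq(Vector(0.0, 0.0))
    knn(training, testing, k = 2).foreach { case (b, neighbors) =>
      println(s"$b -> $neighbors")
    }
  }
}
```

Every test point is compared against every training point, which is the cost 
the quadtree option in the pull request is meant to reduce.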





[GitHub] flink pull request: [hotfix] Update python.md

2016-03-21 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1814#issuecomment-199640748
  
Merging...



