[jira] [Commented] (FLINK-3920) Distributed Linear Algebra: block-based matrix

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444782#comment-15444782 ]

ASF GitHub Bot commented on FLINK-3920:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76552087
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala ---
@@ -159,3 +183,68 @@ case class IndexedRow(rowIndex: MatrixRowIndex, values: Vector) extends Ordered[
 
   override def toString: String = s"($rowIndex, ${values.toString})"
 }
+
+/**
+  * Serializable reduce function used by the toBlockMatrix function.
+  * Takes an ordered list of indexed rows and splits those rows to form blocks.
+  */
+class RowGroupReducer(blockMapper: BlockMapper)
+  extends RichGroupReduceFunction[(Int, Int, Vector), (Int, Block)] {
+
+  override def reduce(values: java.lang.Iterable[(Int, Int, Vector)],
+      out: Collector[(Int, Block)]): Unit = {
+
+    val sortedRows = values.toList.sortBy(_._2)
+    val blockID = sortedRows.head._1
+    val coo = for {
+      (_, rowIndex, vec) <- sortedRows
+      (colIndex, value) <- vec if value != 0
+    } yield (rowIndex, colIndex, value)
+
+    val block: Block = Block(
+      SparseMatrix.fromCOO(
+        blockMapper.rowsPerBlock, blockMapper.colsPerBlock, coo))
+    out.collect((blockID, block))
+  }
+}
+
+class RowSplitter(blockMapper: BlockMapper)
+  extends RichFlatMapFunction[IndexedRow, (Int, Int, Vector)] {
+  override def flatMap(
+      row: IndexedRow, out: Collector[(Int, Int, Vector)]): Unit = {
+    val IndexedRow(rowIndex, vector) = row
+    val splitRow = sliceVector(vector)
+    for ((mappedCol, slice) <- splitRow) {
+      val mappedRow =
+        math.floor(rowIndex * 1.0 / blockMapper.rowsPerBlock).toInt
+      val blockID = blockMapper.getBlockIdByMappedCoord(mappedRow, mappedCol)
+      out.collect((blockID, rowIndex % blockMapper.rowsPerBlock, slice))
+    }
+  }
+
+  def sliceVector(v: Vector): List[(Int, Vector)] = {
+
+    def getSliceByColIndex(index: Int): Int =
--- End diff --

This method is used once. Please remove this method.
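
A minimal sketch of the suggested inlining. The helper's body is cut off in the quoted diff, so the integer division by `blockMapper.colsPerBlock` and the `colIndex` name are assumptions, not the merged code:

```scala
// Instead of the one-use helper
//   def getSliceByColIndex(index: Int): Int = index / blockMapper.colsPerBlock
// the expression can be inlined at its single call site inside sliceVector
// (colIndex is a hypothetical name for the vector entry's column index):
val mappedCol = colIndex / blockMapper.colsPerBlock
```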


> Distributed Linear Algebra: block-based matrix
> --
>
> Key: FLINK-3920
> URL: https://issues.apache.org/jira/browse/FLINK-3920
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Simone Robutti
>Assignee: Simone Robutti
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3920) Distributed Linear Algebra: block-based matrix

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444761#comment-15444761 ]

ASF GitHub Bot commented on FLINK-3920:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551557
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala ---
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTypeInformation, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed Matrix represented as blocks.
+  * A BlockMapper instance is used to track blocks of the matrix.
+  * Every block in a BlockMatrix has an associated ID that also
+  * identifies its position in the BlockMatrix.
+  * @param data
+  * @param blockMapper
+  */
+class BlockMatrix(
+    val data: DataSet[(BlockID, Block)],
+    blockMapper: BlockMapper
+)
+  extends DistributedMatrix {
+
+  val numCols = blockMapper.numCols
+  val numRows = blockMapper.numRows
+
+  val getBlockCols = blockMapper.numBlockCols
+  val getBlockRows = blockMapper.numBlockRows
+
+  val getRowsPerBlock = blockMapper.rowsPerBlock
+  val getColsPerBlock = blockMapper.colsPerBlock
+
+  val getNumBlocks = blockMapper.numBlocks
+
+  /**
+    * Compares the format of two block matrices.
+    * @return
+    */
+  def hasSameFormat(other: BlockMatrix): Boolean =
+    this.numRows == other.numRows && this.numCols == other.numCols &&
+    this.getRowsPerBlock == other.getRowsPerBlock &&
+    this.getColsPerBlock == other.getColsPerBlock
+
+  /**
+    * Performs an operation on pairs of blocks. Pairs are formed by taking
+    * matching blocks from the two matrices that are placed in the same position.
+    * A function is then applied to each pair to return a new block.
+    * These blocks are then composed into a new block matrix.
+    */
+  def blockPairOperation(
+      fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
+    require(hasSameFormat(other))
+
+    /* Full outer join on blocks. The full outer join is required because of
+       the sparse nature of the matrix.
+       Matching blocks may be missing and a block of zeros is used instead. */
+    val processedBlocks =
+      this.data.fullOuterJoin(other.data).where(0).equalTo(0) {
+        (left: (BlockID, Block), right: (BlockID, Block)) =>
+          {
+
+            val (id1, block1) = Option(left) match {
+              case Some((id, block)) => (id, block)
+              case None =>
+                (right._1, Block.zero(right._2.getRows, right._2.getCols))
--- End diff --

Zero-filled matrix can be re-used. Please create the matrix once using the `RichFlatJoinFunction` class.
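
A sketch of the reviewer's suggestion, not the merged code: a `RichFlatJoinFunction` builds the zero-filled block once per parallel task in `open()` and reuses it for every unmatched pair. The class and field names are hypothetical, and it assumes all blocks share the mapper's block size:

```scala
import org.apache.flink.api.common.functions.RichFlatJoinFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class BlockPairJoiner(fun: (Block, Block) => Block,
    rowsPerBlock: Int, colsPerBlock: Int)
  extends RichFlatJoinFunction[(BlockID, Block), (BlockID, Block), (BlockID, Block)] {

  // Created once per task instance instead of once per joined record.
  private var zeroBlock: Block = _

  override def open(parameters: Configuration): Unit = {
    zeroBlock = Block.zero(rowsPerBlock, colsPerBlock)
  }

  override def join(left: (BlockID, Block), right: (BlockID, Block),
      out: Collector[(BlockID, Block)]): Unit = {
    // A full outer join passes null on the unmatched side; substitute the shared zero block.
    val (id, block1) = Option(left).getOrElse((right._1, zeroBlock))
    val (_, block2) = Option(right).getOrElse((left._1, zeroBlock))
    out.collect((id, fun(block1, block2)))
  }
}
```

Such a function can then be handed to the outer join through the `apply` overload that accepts a `FlatJoinFunction`, instead of the inline closure.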


[jira] [Commented] (FLINK-3920) Distributed Linear Algebra: block-based matrix

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444762#comment-15444762 ]

ASF GitHub Bot commented on FLINK-3920:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551568
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala ---
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTypeInformation, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed Matrix represented as blocks.
+  * A BlockMapper instance is used to track blocks of the matrix.
+  * Every block in a BlockMatrix has an associated ID that also
+  * identifies its position in the BlockMatrix.
+  * @param data
+  * @param blockMapper
+  */
+class BlockMatrix(
+    val data: DataSet[(BlockID, Block)],
+    blockMapper: BlockMapper
+)
+  extends DistributedMatrix {
+
+  val numCols = blockMapper.numCols
+  val numRows = blockMapper.numRows
+
+  val getBlockCols = blockMapper.numBlockCols
+  val getBlockRows = blockMapper.numBlockRows
+
+  val getRowsPerBlock = blockMapper.rowsPerBlock
+  val getColsPerBlock = blockMapper.colsPerBlock
+
+  val getNumBlocks = blockMapper.numBlocks
+
+  /**
+    * Compares the format of two block matrices.
+    * @return
+    */
+  def hasSameFormat(other: BlockMatrix): Boolean =
+    this.numRows == other.numRows && this.numCols == other.numCols &&
+    this.getRowsPerBlock == other.getRowsPerBlock &&
+    this.getColsPerBlock == other.getColsPerBlock
+
+  /**
+    * Performs an operation on pairs of blocks. Pairs are formed by taking
+    * matching blocks from the two matrices that are placed in the same position.
+    * A function is then applied to each pair to return a new block.
+    * These blocks are then composed into a new block matrix.
+    */
+  def blockPairOperation(
+      fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
+    require(hasSameFormat(other))
+
+    /* Full outer join on blocks. The full outer join is required because of
+       the sparse nature of the matrix.
+       Matching blocks may be missing and a block of zeros is used instead. */
+    val processedBlocks =
+      this.data.fullOuterJoin(other.data).where(0).equalTo(0) {
+        (left: (BlockID, Block), right: (BlockID, Block)) =>
+          {
+
+            val (id1, block1) = Option(left) match {
+              case Some((id, block)) => (id, block)
+              case None =>
+                (right._1, Block.zero(right._2.getRows, right._2.getCols))
+            }
+
+            val (id2, block2) = Option(right) match {
+              case Some((id, block)) => (id, block)
+              case None =>
+                (left._1, Block.zero(left._2.getRows, left._2.getCols))
--- End diff --

Re-use zero-filled matrix.
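
A lighter variant of the same idea: if the join stays inside a serializable function class, a transient lazy field (the name is hypothetical) also builds the zero block only once per deserialized instance rather than once per record:

```scala
// Built on first access, then reused for every unmatched block pair.
@transient private lazy val zeroBlock: Block =
  Block.zero(blockMapper.rowsPerBlock, blockMapper.colsPerBlock)
```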


[jira] [Commented] (FLINK-3920) Distributed Linear Algebra: block-based matrix

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444745#comment-15444745 ]

ASF GitHub Bot commented on FLINK-3920:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551191
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala ---
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTypeInformation, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed Matrix represented as blocks.
+  * A BlockMapper instance is used to track blocks of the matrix.
+  * Every block in a BlockMatrix has an associated ID that also
+  * identifies its position in the BlockMatrix.
+  * @param data
+  * @param blockMapper
+  */
+class BlockMatrix(
+    val data: DataSet[(BlockID, Block)],
+    blockMapper: BlockMapper
+)
+  extends DistributedMatrix {
+
+  val numCols = blockMapper.numCols
+  val numRows = blockMapper.numRows
+
+  val getBlockCols = blockMapper.numBlockCols
+  val getBlockRows = blockMapper.numBlockRows
+
+  val getRowsPerBlock = blockMapper.rowsPerBlock
+  val getColsPerBlock = blockMapper.colsPerBlock
+
+  val getNumBlocks = blockMapper.numBlocks
--- End diff --

Please avoid the naming convention that starts with `get/set`.
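
The same fields without the `get` prefix, following the property-style naming the reviewer points to (a sketch of the rename, not the merged code):

```scala
val blockCols: Int = blockMapper.numBlockCols
val blockRows: Int = blockMapper.numBlockRows

val rowsPerBlock: Int = blockMapper.rowsPerBlock
val colsPerBlock: Int = blockMapper.colsPerBlock

val numBlocks: Int = blockMapper.numBlocks
```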


[jira] [Commented] (FLINK-3920) Distributed Linear Algebra: block-based matrix

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444743#comment-15444743 ]

ASF GitHub Bot commented on FLINK-3920:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551144
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala ---
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{createTypeInformation, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+  * Distributed Matrix represented as blocks.
+  * A BlockMapper instance is used to track blocks of the matrix.
+  * Every block in a BlockMatrix has an associated ID that also
+  * identifies its position in the BlockMatrix.
+  * @param data
+  * @param blockMapper
+  */
+class BlockMatrix(
+    val data: DataSet[(BlockID, Block)],
+    blockMapper: BlockMapper
+)
+  extends DistributedMatrix {
+
+  val numCols = blockMapper.numCols
+  val numRows = blockMapper.numRows
+
+  val getBlockCols = blockMapper.numBlockCols
+  val getBlockRows = blockMapper.numBlockRows
+
+  val getRowsPerBlock = blockMapper.rowsPerBlock
+  val getColsPerBlock = blockMapper.colsPerBlock
+
+  val getNumBlocks = blockMapper.numBlocks
+
+  /**
+    * Compares the format of two block matrices.
+    * @return
+    */
+  def hasSameFormat(other: BlockMatrix): Boolean =
+    this.numRows == other.numRows && this.numCols == other.numCols &&
+    this.getRowsPerBlock == other.getRowsPerBlock &&
+    this.getColsPerBlock == other.getColsPerBlock
+
+  /**
+    * Performs an operation on pairs of blocks. Pairs are formed by taking
+    * matching blocks from the two matrices that are placed in the same position.
+    * A function is then applied to each pair to return a new block.
+    * These blocks are then composed into a new block matrix.
+    */
+  def blockPairOperation(
+      fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
+    require(hasSameFormat(other))
--- End diff --

Could you add a message to help understand what the problem is?
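
For instance, passing a second argument to `require` turns the bare `IllegalArgumentException` into a self-explanatory one (the message wording here is only a suggestion):

```scala
require(hasSameFormat(other),
  s"Block matrices must have the same dimensions and block sizes, " +
  s"but got ${numRows}x${numCols} (blocks ${getRowsPerBlock}x${getColsPerBlock}) " +
  s"vs ${other.numRows}x${other.numCols} (blocks ${other.getRowsPerBlock}x${other.getColsPerBlock}).")
```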


[jira] [Commented] (FLINK-3920) Distributed Linear Algebra: block-based matrix

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444737#comment-15444737 ]

ASF GitHub Bot commented on FLINK-3920:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551077
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, MatrixRowIndex}
+
+/**
+  * This class is in charge of handling all the spatial logic required by BlockMatrix.
+  * It introduces a new space of zero-indexed coordinates (i,j), called "mapped coordinates".
+  * This is a space where blocks are indexed (starting from zero).
+  *
+  * So every coordinate in the original space can be mapped to this space and to the
+  * corresponding block. A block has a row and a column coordinate that make its
+  * position explicit. Every set of coordinates in the mapped space corresponds to a
+  * square of size rowsPerBlock x colsPerBlock.
+  *
+  */
+case class BlockMapper( //original matrix size
+   numRows: Int,
+   numCols: Int,
+   //block size
+   rowsPerBlock: Int,
+   colsPerBlock: Int) {
+
+  require(numRows >= rowsPerBlock && numCols >= colsPerBlock)
+  val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt
+  val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt
+  val numBlocks = numBlockCols * numBlockRows
+
+  /**
+    * Translates absolute coordinates to the mapped coordinates of the block
+    * these coordinates belong to.
+    * @param i
+    * @param j
+    * @return
+    */
+  def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, Int) =
+    getBlockMappedCoordinates(getBlockIdByCoordinates(i, j))
+
+  /**
+    * Retrieves a block id from original coordinates.
+    * @param i Original row
+    * @param j Original column
+    * @return Block ID
+    */
+  def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = {
+
+    if (i < 0 || j < 0 || i >= numRows || j >= numCols) {
+      throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
+    } else {
+      val mappedRow = i / rowsPerBlock
+      val mappedColumn = j / colsPerBlock
+      val res = mappedRow * numBlockCols + mappedColumn
+
+      assert(res <= numBlocks)
+      res
+    }
+  }
+
+  /**
+    * Retrieves mapped coordinates for a given block.
+    * @param blockId
+    * @return
+    */
+  def getBlockMappedCoordinates(blockId: Int): (Int, Int) = {
+    if (blockId < 0 || blockId > numBlockCols * numBlockRows) {
+      throw new IllegalArgumentException(
+        s"BlockId numeration starts from 0. $blockId is not a valid Id"
+      )
--- End diff --

Please use the `require` function like above.
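
A sketch of `getBlockMappedCoordinates` rewritten with `require`. Note that the original upper bound (`blockId > numBlockCols * numBlockRows`) also looks off by one, since `numBlocks` itself is not a valid id; the sketch uses a strict bound:

```scala
def getBlockMappedCoordinates(blockId: Int): (Int, Int) = {
  require(0 <= blockId && blockId < numBlocks,
    s"BlockId numeration starts from 0. $blockId is not a valid Id")
  (blockId / numBlockCols, blockId % numBlockCols)
}
```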


[jira] [Commented] (FLINK-3920) Distributed Linear Algebra: block-based matrix

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444739#comment-15444739 ]

ASF GitHub Bot commented on FLINK-3920:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551085
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, MatrixRowIndex}
+
+/**
+  * This class is in charge of handling all the spatial logic required by BlockMatrix.
+  * It introduces a new space of zero-indexed coordinates (i,j), called "mapped coordinates".
+  * This is a space where blocks are indexed (starting from zero).
+  *
+  * So every coordinate in the original space can be mapped to this space and to the
+  * corresponding block. A block has a row and a column coordinate that make its
+  * position explicit. Every set of coordinates in the mapped space corresponds to a
+  * square of size rowsPerBlock x colsPerBlock.
+  *
+  */
+case class BlockMapper( //original matrix size
+   numRows: Int,
+   numCols: Int,
+   //block size
+   rowsPerBlock: Int,
+   colsPerBlock: Int) {
+
+  require(numRows >= rowsPerBlock && numCols >= colsPerBlock)
+  val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt
+  val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt
+  val numBlocks = numBlockCols * numBlockRows
+
+  /**
+    * Translates absolute coordinates to the mapped coordinates of the block
+    * these coordinates belong to.
+    * @param i
+    * @param j
+    * @return
+    */
+  def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, Int) =
+    getBlockMappedCoordinates(getBlockIdByCoordinates(i, j))
+
+  /**
+    * Retrieves a block id from original coordinates.
+    * @param i Original row
+    * @param j Original column
+    * @return Block ID
+    */
+  def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = {
+
+    if (i < 0 || j < 0 || i >= numRows || j >= numCols) {
+      throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
+    } else {
+      val mappedRow = i / rowsPerBlock
+      val mappedColumn = j / colsPerBlock
+      val res = mappedRow * numBlockCols + mappedColumn
+
+      assert(res <= numBlocks)
+      res
+    }
+  }
+
+  /**
+    * Retrieves mapped coordinates for a given block.
+    * @param blockId
+    * @return
+    */
+  def getBlockMappedCoordinates(blockId: Int): (Int, Int) = {
+    if (blockId < 0 || blockId > numBlockCols * numBlockRows) {
+      throw new IllegalArgumentException(
+        s"BlockId numeration starts from 0. $blockId is not a valid Id"
+      )
+    } else {
+      val i = blockId / numBlockCols
+      val j = blockId % numBlockCols
+      (i, j)
+    }
+  }
+
+  /**
+    * Retrieves the ID of the block at the given coordinates.
+    * @param i
+    * @param j
+    * @return
+    */
+  def getBlockIdByMappedCoord(i: Int, j: Int): Int = {
+
+    if (i < 0 || j < 0 || i >= numBlockRows || j >= numBlockCols) {
+      throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
--- End diff --

Please use `require`.
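
The same pattern applied here. The quoted diff cuts off after the bounds check, so the method body below is reconstructed as the inverse of `getBlockMappedCoordinates`, an assumption rather than the merged code:

```scala
def getBlockIdByMappedCoord(i: Int, j: Int): Int = {
  require(0 <= i && i < numBlockRows && 0 <= j && j < numBlockCols,
    s"Invalid coordinates ($i,$j).")
  i * numBlockCols + j
}
```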


[jira] [Commented] (FLINK-3920) Distributed Linear Algebra: block-based matrix

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444736#comment-15444736 ]

ASF GitHub Bot commented on FLINK-3920:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551066
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, MatrixRowIndex}
+
+/**
+  * This class is in charge of handling all the spatial logic required by BlockMatrix.
+  * It introduces a new space of zero-indexed coordinates (i,j), called "mapped coordinates".
+  * This is a space where blocks are indexed (starting from zero).
+  *
+  * So every coordinate in the original space can be mapped to this space and to the
+  * corresponding block. A block has a row and a column coordinate that make its
+  * position explicit. Every set of coordinates in the mapped space corresponds to a
+  * square of size rowsPerBlock x colsPerBlock.
+  *
+  */
+case class BlockMapper( //original matrix size
+   numRows: Int,
+   numCols: Int,
+   //block size
+   rowsPerBlock: Int,
+   colsPerBlock: Int) {
+
+  require(numRows >= rowsPerBlock && numCols >= colsPerBlock)
+  val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt
+  val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt
+  val numBlocks = numBlockCols * numBlockRows
+
+  /**
+    * Translates absolute coordinates to the mapped coordinates of the block
+    * these coordinates belong to.
+    * @param i
+    * @param j
+    * @return
+    */
+  def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, Int) =
+    getBlockMappedCoordinates(getBlockIdByCoordinates(i, j))
+
+  /**
+    * Retrieves a block id from original coordinates.
+    * @param i Original row
+    * @param j Original column
+    * @return Block ID
+    */
+  def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = {
+
+    if (i < 0 || j < 0 || i >= numRows || j >= numCols) {
+      throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
--- End diff --

The following block with `require` would be better:

```scala
require(0 <= i && i < numRows && 0 <= j && j < numCols, s"Invalid coordinates ($i, $j).")
```
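
With that suggestion applied, the whole method would read roughly as follows (a sketch, not the merged code):

```scala
def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = {
  require(0 <= i && i < numRows && 0 <= j && j < numCols,
    s"Invalid coordinates ($i, $j).")
  val mappedRow = i / rowsPerBlock
  val mappedColumn = j / colsPerBlock
  val res = mappedRow * numBlockCols + mappedColumn
  assert(res <= numBlocks)
  res
}
```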


> Distributed Linear Algebra: block-based matrix
> --
>
> Key: FLINK-3920
> URL: https://issues.apache.org/jira/browse/FLINK-3920
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Simone Robutti
>Assignee: Simone Robutti
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551077
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import 
org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, 
MatrixRowIndex}
+
+/**
+  * This class is in charge of handling all the spatial logic required by 
BlockMatrix.
+  * It introduces a new space of zero-indexed coordinates (i,j), called 
"mapped coordinates".
+  * This is a space where blocks are indexed (starting from zero).
+  *
+  * So every coordinate in the original space can be mapped to this space 
and to the
+  * corrisponding block.  A block have a row and a column coordinate that 
explicits
+  * its position. Every set of coordinates in the mapped space corresponds 
to a square
+  * of size rowPerBlock x colsPerBlock.
+  *
+  */
+case class BlockMapper( //original matrix size
+   numRows: Int,
+   numCols: Int,
+   //block size
+   rowsPerBlock: Int,
+   colsPerBlock: Int) {
+
+  require(numRows >= rowsPerBlock && numCols >= colsPerBlock)
+  val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt
+  val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt
+  val numBlocks = numBlockCols * numBlockRows
+
+  /**
+* Translates absolute coordinates to the mapped coordinates of the 
block
+* these coordinates belong to.
+* @param i
+* @param j
+* @return
+*/
+  def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, 
Int) =
+getBlockMappedCoordinates(getBlockIdByCoordinates(i, j))
+
+  /**
+* Retrieves a block id from original coordinates
+* @param i Original row
+* @param j Original column
+* @return Block ID
+*/
+  def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = 
{
+
+if (i < 0 || j < 0 || i >= numRows || j >= numCols) {
+  throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
+} else {
+  val mappedRow = i / rowsPerBlock
+  val mappedColumn = j / colsPerBlock
+  val res = mappedRow * numBlockCols + mappedColumn
+
+  assert(res <= numBlocks)
+  res
+}
+  }
+
+  /**
+* Retrieves mapped coordinates for a given block.
+* @param blockId
+* @return
+*/
+  def getBlockMappedCoordinates(blockId: Int): (Int, Int) = {
+if (blockId < 0 || blockId > numBlockCols * numBlockRows) {
+  throw new IllegalArgumentException(
+  s"BlockId numeration starts from 0. $blockId is not a valid Id"
+  )
--- End diff --

Please use `require` function like above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551066
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import 
org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, 
MatrixRowIndex}
+
+/**
+  * This class is in charge of handling all the spatial logic required by 
BlockMatrix.
+  * It introduces a new space of zero-indexed coordinates (i,j), called 
"mapped coordinates".
+  * This is a space where blocks are indexed (starting from zero).
+  *
+  * So every coordinate in the original space can be mapped to this space 
and to the
+  * corrisponding block.  A block have a row and a column coordinate that 
explicits
+  * its position. Every set of coordinates in the mapped space corresponds 
to a square
+  * of size rowPerBlock x colsPerBlock.
+  *
+  */
+case class BlockMapper( //original matrix size
+   numRows: Int,
+   numCols: Int,
+   //block size
+   rowsPerBlock: Int,
+   colsPerBlock: Int) {
+
+  require(numRows >= rowsPerBlock && numCols >= colsPerBlock)
+  val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt
+  val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt
+  val numBlocks = numBlockCols * numBlockRows
+
+  /**
+* Translates absolute coordinates to the mapped coordinates of the 
block
+* these coordinates belong to.
+* @param i
+* @param j
+* @return
+*/
+  def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, 
Int) =
+getBlockMappedCoordinates(getBlockIdByCoordinates(i, j))
+
+  /**
+* Retrieves a block id from original coordinates
+* @param i Original row
+* @param j Original column
+* @return Block ID
+*/
+  def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = 
{
+
+if (i < 0 || j < 0 || i >= numRows || j >= numCols) {
+  throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
--- End diff --

Following block with `require` would be better:

```scala
require(0 <= i && i < numRows && 0 <= j && j < numCols, s"Invalid 
coordinates ($i, $j).")
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76551085
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMapper.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import 
org.apache.flink.ml.math.distributed.DistributedMatrix.{MatrixColIndex, 
MatrixRowIndex}
+
+/**
+  * This class is in charge of handling all the spatial logic required by 
BlockMatrix.
+  * It introduces a new space of zero-indexed coordinates (i,j), called 
"mapped coordinates".
+  * This is a space where blocks are indexed (starting from zero).
+  *
+  * So every coordinate in the original space can be mapped to this space 
and to the
+  * corrisponding block.  A block have a row and a column coordinate that 
explicits
+  * its position. Every set of coordinates in the mapped space corresponds 
to a square
+  * of size rowPerBlock x colsPerBlock.
+  *
+  */
+case class BlockMapper( //original matrix size
+   numRows: Int,
+   numCols: Int,
+   //block size
+   rowsPerBlock: Int,
+   colsPerBlock: Int) {
+
+  require(numRows >= rowsPerBlock && numCols >= colsPerBlock)
+  val numBlockRows: Int = math.ceil(numRows * 1.0 / rowsPerBlock).toInt
+  val numBlockCols: Int = math.ceil(numCols * 1.0 / colsPerBlock).toInt
+  val numBlocks = numBlockCols * numBlockRows
+
+  /**
+* Translates absolute coordinates to the mapped coordinates of the 
block
+* these coordinates belong to.
+* @param i
+* @param j
+* @return
+*/
+  def absCoordToMappedCoord(i: MatrixRowIndex, j: MatrixColIndex): (Int, 
Int) =
+getBlockMappedCoordinates(getBlockIdByCoordinates(i, j))
+
+  /**
+* Retrieves a block id from original coordinates
+* @param i Original row
+* @param j Original column
+* @return Block ID
+*/
+  def getBlockIdByCoordinates(i: MatrixRowIndex, j: MatrixColIndex): Int = 
{
+
+if (i < 0 || j < 0 || i >= numRows || j >= numCols) {
+  throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
+} else {
+  val mappedRow = i / rowsPerBlock
+  val mappedColumn = j / colsPerBlock
+  val res = mappedRow * numBlockCols + mappedColumn
+
+  assert(res <= numBlocks)
+  res
+}
+  }
+
+  /**
+    * Retrieves mapped coordinates for a given block.
+    * @param blockId Zero-indexed block ID
+    * @return Mapped (row, column) coordinates of the block
+    */
+  def getBlockMappedCoordinates(blockId: Int): (Int, Int) = {
+if (blockId < 0 || blockId > numBlockCols * numBlockRows) {
+  throw new IllegalArgumentException(
+  s"BlockId numeration starts from 0. $blockId is not a valid Id"
+  )
+} else {
+  val i = blockId / numBlockCols
+  val j = blockId % numBlockCols
+  (i, j)
+}
+  }
+
+  /**
+    * Retrieves the ID of the block at the given mapped coordinates.
+    * @param i Mapped row coordinate
+    * @param j Mapped column coordinate
+    * @return Block ID
+    */
+  def getBlockIdByMappedCoord(i: Int, j: Int): Int = {
+
+if (i < 0 || j < 0 || i >= numBlockRows || j >= numBlockCols) {
+  throw new IllegalArgumentException(s"Invalid coordinates ($i,$j).")
--- End diff --

Please use `require`.
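
The same change sketched here; since the quoted diff cuts off before the method body, the ID computation below is inferred as the inverse of `getBlockMappedCoordinates` and is an assumption, not the actual code:

```scala
// Inside BlockMapper, reusing numBlockRows and numBlockCols from the diff above.
def getBlockIdByMappedCoord(i: Int, j: Int): Int = {
  require(0 <= i && i < numBlockRows && 0 <= j && j < numBlockCols,
    s"Invalid coordinates ($i,$j).")
  // assumed inverse of getBlockMappedCoordinates: blockId = row * numBlockCols + column
  i * numBlockCols + j
}
```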


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3920) Distributed Linear Algebra: block-based matrix

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444729#comment-15444729
 ] 

ASF GitHub Bot commented on FLINK-3920:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76550871
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix}
+
+class Block() {
--- End diff --

How about changing `Block` to a case class? It would be better than a plain 
class with an `apply` method.
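
A minimal sketch of that change, assuming `FlinkMatrix` is the `Matrix` alias from the imports in this diff:

```scala
import org.apache.flink.ml.math.Breeze._
import org.apache.flink.ml.math.{Matrix => FlinkMatrix}

// The case class synthesizes apply, equals/hashCode and an immutable field,
// so no separate companion apply or setter is needed.
case class Block(blockData: FlinkMatrix) {
  def toBreeze = blockData.asBreeze
}
```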


> Distributed Linear Algebra: block-based matrix
> --
>
> Key: FLINK-3920
> URL: https://issues.apache.org/jira/browse/FLINK-3920
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Simone Robutti
>Assignee: Simone Robutti
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76550871
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix}
+
+class Block() {
--- End diff --

How about changing `Block` to a case class? It would be better than a plain 
class with an `apply` method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3920) Distributed Linear Algebra: block-based matrix

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444726#comment-15444726
 ] 

ASF GitHub Bot commented on FLINK-3920:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76550821
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix}
+
+class Block() {
+
+  var blockData: FlinkMatrix = null
+
+  def setBlockData(flinkMatrix: FlinkMatrix) = blockData = flinkMatrix
+
+  def getBlockData = blockData
+
+  def toBreeze = blockData.asBreeze
+
+  def getCols = blockData.numCols
+
+  def getRows = blockData.numRows
--- End diff --

The get/set naming convention is not idiomatic in Scala. We need to change 
`getCols` and `getRows` to `numCols` and `numRows`.
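
Concretely, the renamed accessors would read (a sketch against the `blockData` field from this diff):

```scala
def numCols: Int = blockData.numCols
def numRows: Int = blockData.numRows
```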


> Distributed Linear Algebra: block-based matrix
> --
>
> Key: FLINK-3920
> URL: https://issues.apache.org/jira/browse/FLINK-3920
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Simone Robutti
>Assignee: Simone Robutti
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76550821
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix}
+
+class Block() {
+
+  var blockData: FlinkMatrix = null
+
+  def setBlockData(flinkMatrix: FlinkMatrix) = blockData = flinkMatrix
+
+  def getBlockData = blockData
+
+  def toBreeze = blockData.asBreeze
+
+  def getCols = blockData.numCols
+
+  def getRows = blockData.numRows
--- End diff --

The get/set naming convention is not idiomatic in Scala. We need to change 
`getCols` and `getRows` to `numCols` and `numRows`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3920) Distributed Linear Algebra: block-based matrix

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444720#comment-15444720
 ] 

ASF GitHub Bot commented on FLINK-3920:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76550602
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix}
+
+class Block() {
+
+  var blockData: FlinkMatrix = null
--- End diff --

Do we need the block data to be mutable?
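
If it does not need to be, a minimal immutable sketch (the matrix fixed at construction time) would be:

```scala
import org.apache.flink.ml.math.{Matrix => FlinkMatrix}

// blockData becomes an immutable val supplied at construction; no setter needed.
class Block(val blockData: FlinkMatrix)
```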


> Distributed Linear Algebra: block-based matrix
> --
>
> Key: FLINK-3920
> URL: https://issues.apache.org/jira/browse/FLINK-3920
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Simone Robutti
>Assignee: Simone Robutti
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-08-28 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/2152#discussion_r76550602
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/Block.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math.distributed
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Matrix => FlinkMatrix, SparseMatrix}
+
+class Block() {
+
+  var blockData: FlinkMatrix = null
--- End diff --

Do we need the block data to be mutable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4375) Introduce rpc protocols implemented by job manager

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1589#comment-1589
 ] 

ASF GitHub Bot commented on FLINK-4375:
---

Github user wenlong88 commented on the issue:

https://github.com/apache/flink/pull/2375
  
Discarding this PR, since we define the RPC interfaces in the specific subtasks, 
such as task executor registration, etc.


> Introduce rpc protocols implemented by job manager
> --
>
> Key: FLINK-4375
> URL: https://issues.apache.org/jira/browse/FLINK-4375
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
>  The job manager RPC server needs to implement a job control protocol, a resource 
> user protocol, and a task control protocol:
>   1. job controller: cancelJob, suspendJob, etc.
>   2. resource user: slotFailed (notify slot failure), slotAvailable (offer slot), etc.
>   3. task controller: updateTaskState, updateResultPartitionInfo, etc.
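
For illustration only, the three protocols sketched as Scala traits (placeholder ID/state types, not real Flink classes; as noted above, the actual interfaces were deferred to the per-subtask PRs):

{code}
trait JobController {
  def cancelJob(jobId: String): Unit
  def suspendJob(jobId: String): Unit
}

trait ResourceUser {
  def slotFailed(slotId: String, cause: Throwable): Unit // notify slot failure
  def slotAvailable(slotId: String): Unit                // offer slot
}

trait TaskController {
  def updateTaskState(taskId: String, state: String): Unit
  def updateResultPartitionInfo(partitionId: String): Unit
}
{code}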



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4375) Introduce rpc protocols implemented by job manager

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1590#comment-1590
 ] 

ASF GitHub Bot commented on FLINK-4375:
---

Github user wenlong88 closed the pull request at:

https://github.com/apache/flink/pull/2375


> Introduce rpc protocols implemented by job manager
> --
>
> Key: FLINK-4375
> URL: https://issues.apache.org/jira/browse/FLINK-4375
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
>  The job manager RPC server needs to implement a job control protocol, a resource 
> user protocol, and a task control protocol:
>   1. job controller: cancelJob, suspendJob, etc.
>   2. resource user: slotFailed (notify slot failure), slotAvailable (offer slot), etc.
>   3. task controller: updateTaskState, updateResultPartitionInfo, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2375: [FLINK-4375][cluster management]Introduce rpc protocols i...

2016-08-28 Thread wenlong88
Github user wenlong88 commented on the issue:

https://github.com/apache/flink/pull/2375
  
Discarding this PR, since we define the RPC interfaces in the specific subtasks, 
such as task executor registration, etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2375: [FLINK-4375][cluster management]Introduce rpc prot...

2016-08-28 Thread wenlong88
Github user wenlong88 closed the pull request at:

https://github.com/apache/flink/pull/2375


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4519) scala maxLineLength increased to 120

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444399#comment-15444399
 ] 

ASF GitHub Bot commented on FLINK-4519:
---

Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2429
  
@rehevkor5 
https://builds.apache.org/job/flink-github-ci/274/org.apache.flink$flink-connector-kafka-0.8_2.10/testReport/org.apache.flink.streaming.connectors.kafka/Kafka08ITCase/org_apache_flink_streaming_connectors_kafka_Kafka08ITCase/

```
Error Message

Failed to bind to: /127.0.0.1:2624
Stacktrace

org.jboss.netty.channel.ChannelException: Failed to bind to: /127.0.0.1:2624
at 
org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at 
akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
at 
akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Success.map(Try.scala:206)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at 
org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
at 
org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```


> scala maxLineLength increased to 120  
> --
>
> Key: FLINK-4519
> URL: https://issues.apache.org/jira/browse/FLINK-4519
> Project: Flink
>  Issue Type: Improvement
>Reporter: shijinkui
>
> `tools/maven/scalastyle-config.xml`



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2429: [FLINK-4519] scala maxLineLength increased to 120

2016-08-28 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2429
  
@rehevkor5 
https://builds.apache.org/job/flink-github-ci/274/org.apache.flink$flink-connector-kafka-0.8_2.10/testReport/org.apache.flink.streaming.connectors.kafka/Kafka08ITCase/org_apache_flink_streaming_connectors_kafka_Kafka08ITCase/

```
Error Message

Failed to bind to: /127.0.0.1:2624
Stacktrace

org.jboss.netty.channel.ChannelException: Failed to bind to: /127.0.0.1:2624
at 
org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at 
akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
at 
akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Success.map(Try.scala:206)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at 
org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
at 
org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-08-28 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444180#comment-15444180
 ] 

Vasia Kalavri commented on FLINK-2254:
--

Hi [~ivan.mushketyk],

what you are describing in "we may consider that two actors are connected if 
they have a movie in common" is called a projection. That is, you take one of 
the modes of a bipartite graph and project it into a new graph. It would be 
nice to support projections for bipartite graphs, but maybe that's something we 
can do after we have basic support for bipartite graphs.

By basic support, I mean that we first need to provide a way to represent and 
define bipartite graphs, e.g. each vertex mode can be a separate dataset with 
different id and value types. I would start by thinking how it is best to 
represent bipartite graphs and sketch the API for creating them. Then, we 
should go over the existing graph methods and algorithms and see if/how they 
need to be adapted for bipartite graphs, e.g. what should {{getVertices()}} 
return?

Regarding the support of efficient operations, the idea is to exploit the 
knowledge that there exist no edges between vertices of the same mode. This might 
let us perform some operations more efficiently, by e.g. filtering out edges / 
vertices, using certain join strategies, etc.
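
To make the representation and projection ideas concrete, a purely illustrative sketch (not an actual Gelly API; names and types are placeholders) with two vertex datasets of independent id/value types:

{code}
import org.apache.flink.api.scala._

// Hypothetical shape: actors and movies as the two modes, edges linking them.
case class BipartiteGraph(
    actors: DataSet[(Long, String)],    // (actorId, name)
    movies: DataSet[(String, String)],  // (movieId, title)
    edges: DataSet[(Long, String)]) {   // (actorId, movieId)

  // Projection onto the actor mode: two actors are connected
  // if they have a movie in common.
  def actorProjection: DataSet[(Long, Long)] =
    edges.join(edges).where(1).equalTo(1) { (e1, e2) => (e1._1, e2._1) }
      .filter(p => p._1 < p._2) // drop self-pairs and duplicated directions
      .distinct()
}
{code}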

> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge with a source vertex in the 
> first set has a target vertex in the second set. We would like to 
> support efficient operations for this type of graphs along with a set of 
> metrics(http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444046#comment-15444046
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2425
  
@mxm - The patch is available for your review. Please take a look.


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15444044#comment-15444044
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
> 
YARNSessionFIFOSecuredITCase gives me the following:
17:49:58,097 INFO SecurityLogger.org.apache.hadoop.ipc.Server - Auth 
successful for appattempt_1471880990715_0001_01 (auth:SIMPLE)
It is not using Kerberos it seems. We should check that security is really 
enabled and fail the test if not.

@mxm I am not sure why the log statements from IPC layers are using 
auth:SIMPLE but I have verified the same messages (NM/RM logs) on a running HDP 
(secure) cluster too. I would imagine this is the default implementation and we 
can ignore those messages. However, while investigating this issue, I found an 
interesting problem with YarnMiniCluster: the containers created do not have the 
Yarn Configuration that we pass through the test code. The KRB5 file is also not 
visible, and hence the UGI/security context that we create was missing the proper 
Hadoop configurations. I have fixed the issue and patched it.

I have also disabled the RollingSinkSecure IT test case, since a secure MiniDFS 
cluster requires privileged ports. We can enable the test case once the patch 
(HDFS-9213) is merged into the mainstream.

Please take a look and let me know if you can deploy and run the code.


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...

2016-08-28 Thread vijikarthi
Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2425
  
@mxm - The patch is available for your review. Please take a look.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-08-28 Thread vijikarthi
Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
> 
YARNSessionFIFOSecuredITCase gives me the following:
17:49:58,097 INFO SecurityLogger.org.apache.hadoop.ipc.Server - Auth 
successful for appattempt_1471880990715_0001_01 (auth:SIMPLE)
It is not using Kerberos it seems. We should check that security is really 
enabled and fail the test if not.

@mxm I am not sure why the log statements from IPC layers are using 
auth:SIMPLE but I have verified the same messages (NM/RM logs) on a running HDP 
(secure) cluster too. I would imagine this is the default implementation and we 
can ignore those messages. However, while investigating this issue, I found an 
interesting problem with YarnMiniCluster: the containers created do not have the 
Yarn Configuration that we pass through the test code. The KRB5 file is also not 
visible, and hence the UGI/security context that we create was missing the proper 
Hadoop configurations. I have fixed the issue and patched it.

I have also disabled the RollingSinkSecure IT test case, since a secure MiniDFS 
cluster requires privileged ports. We can enable the test case once the patch 
(HDFS-9213) is merged into the mainstream.

Please take a look and let me know if you can deploy and run the code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-4194) Implement isEndOfStream() for KinesisDeserializationSchema

2016-08-28 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15443628#comment-15443628
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4194 at 8/28/16 3:39 PM:
-

I'll implement the proposed {{isEndOfShard()}} behaviour as an initial PR, so 
we can start discussing this from there.


was (Author: tzulitai):
I'll implement {{isEndOfShard()}} as an initial PR, so we can start discussing 
this from there.

> Implement isEndOfStream() for KinesisDeserializationSchema
> --
>
> Key: FLINK-4194
> URL: https://issues.apache.org/jira/browse/FLINK-4194
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> **Original JIRA title: KinesisDeserializationSchema.isEndOfStream() is never 
> called. The corresponding part of the code has been commented out with 
> reference to this JIRA.**
> The Kinesis connector does not respect the 
> {{KinesisDeserializationSchema.isEndOfStream()}} method.
> The purpose of this method is to stop consuming from a source, based on input 
> data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4194) Implement isEndOfStream() for KinesisDeserializationSchema

2016-08-28 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4194:
---
Affects Version/s: (was: 1.1.0)

> Implement isEndOfStream() for KinesisDeserializationSchema
> --
>
> Key: FLINK-4194
> URL: https://issues.apache.org/jira/browse/FLINK-4194
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> **Original JIRA title: KinesisDeserializationSchema.isEndOfStream() is never 
> called. The corresponding part of the code has been commented out with 
> reference to this JIRA.**
> The Kinesis connector does not respect the 
> {{KinesisDeserializationSchema.isEndOfStream()}} method.
> The purpose of this method is to stop consuming from a source, based on input 
> data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4194) Implement isEndOfStream() for KinesisDeserializationSchema

2016-08-28 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4194:
---
Fix Version/s: 1.2.0

> Implement isEndOfStream() for KinesisDeserializationSchema
> --
>
> Key: FLINK-4194
> URL: https://issues.apache.org/jira/browse/FLINK-4194
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> **Original JIRA title: KinesisDeserializationSchema.isEndOfStream() is never 
> called. The corresponding part of the code has been commented out with 
> reference to this JIRA.**
> The Kinesis connector does not respect the 
> {{KinesisDeserializationSchema.isEndOfStream()}} method.
> The purpose of this method is to stop consuming from a source, based on input 
> data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4194) Implement isEndOfStream() for KinesisDeserializationSchema

2016-08-28 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15443628#comment-15443628
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4194:


I'll implement {{isEndOfShard()}} as an initial PR, so we can start discussing 
this from there.

> Implement isEndOfStream() for KinesisDeserializationSchema
> --
>
> Key: FLINK-4194
> URL: https://issues.apache.org/jira/browse/FLINK-4194
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
>
> **Original JIRA title: KinesisDeserializationSchema.isEndOfStream() is never 
> called. The corresponding part of the code has been commented out with 
> reference to this JIRA.**
> The Kinesis connector does not respect the 
> {{KinesisDeserializationSchema.isEndOfStream()}} method.
> The purpose of this method is to stop consuming from a source, based on input 
> data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-08-28 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4523:
---
Summary: Allow Kinesis Consumer to start from specific timestamp / Date  
(was: Allow Kinesis Consumer to start from specific timestamp)

> Allow Kinesis Consumer to start from specific timestamp / Date
> --
>
> Key: FLINK-4523
> URL: https://issues.apache.org/jira/browse/FLINK-4523
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> We had a Kinesis user requesting this feature on an offline chat.
> To be specific, we let all initial Kinesis shards be iterated starting from 
> records at the given timestamp.
> The AWS Java SDK we're using already provides API for this, so we can add 
> this functionality with fairly low overhead: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp

2016-08-28 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4523:
---
Description: 
We had a Kinesis user requesting this feature on an offline chat.

To be specific, we let all initial Kinesis shards be iterated starting from 
records at the given timestamp.
The AWS Java SDK we're using already provides API for this, so we can add this 
functionality with fairly low overhead: 
http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-

  was:
We had a Kinesis user requesting this feature on an offline chat.

To be specific, we let all initial Kinesis shards be iterated starting from 
records at the given timestamp.
The AWS Java SDK we're using already provides API for this, so we can add this 
functionality with fairly low overhead: 
http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html


> Allow Kinesis Consumer to start from specific timestamp
> ---
>
> Key: FLINK-4523
> URL: https://issues.apache.org/jira/browse/FLINK-4523
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> We had a Kinesis user requesting this feature on an offline chat.
> To be specific, we let all initial Kinesis shards be iterated starting from 
> records at the given timestamp.
> The AWS Java SDK we're using already provides API for this, so we can add 
> this functionality with fairly low overhead: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp

2016-08-28 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-4523:
--

 Summary: Allow Kinesis Consumer to start from specific timestamp
 Key: FLINK-4523
 URL: https://issues.apache.org/jira/browse/FLINK-4523
 Project: Flink
  Issue Type: New Feature
  Components: Kinesis Connector
Reporter: Tzu-Li (Gordon) Tai
 Fix For: 1.2.0


We had a Kinesis user requesting this feature on an offline chat.

To be specific, we let all initial Kinesis shards be iterated starting from 
records at the given timestamp.
The AWS Java SDK we're using already provides API for this, so we can add this 
functionality with fairly low overhead: 
http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html
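
As a hedged sketch of the SDK call involved (method names per the linked Javadoc; the wrapper function and its parameters are illustrative, not connector code):

{code}
import java.util.Date
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.{GetShardIteratorRequest, ShardIteratorType}

def shardIteratorAt(kinesis: AmazonKinesisClient,
                    stream: String,
                    shardId: String,
                    startTime: Date): String = {
  val request = new GetShardIteratorRequest()
    .withStreamName(stream)
    .withShardId(shardId)
    .withShardIteratorType(ShardIteratorType.AT_TIMESTAMP) // iterate from the timestamp
    .withTimestamp(startTime)
  kinesis.getShardIterator(request).getShardIterator
}
{code}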



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3967) Provide RethinkDB Sink for Flink

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15443457#comment-15443457
 ] 

ASF GitHub Bot commented on FLINK-3967:
---

Github user mans2singh commented on the issue:

https://github.com/apache/flink/pull/2031
  
Hi @rmetzger - 
Thanks for the update.  
Can you please let me know the steps for contributing to Bahir? Can I 
just fork it and add another project to the root folder?
Thanks


> Provide RethinkDB Sink for Flink
> 
>
> Key: FLINK-3967
> URL: https://issues.apache.org/jira/browse/FLINK-3967
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 1.0.3
> Environment: All
>Reporter: Mans Singh
>Assignee: Mans Singh
>Priority: Minor
>  Labels: features
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Provide Sink to stream data from flink to rethink db.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2031: FLINK-3967 - Flink Sink for Rethink Db

2016-08-28 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/flink/pull/2031
  
Hi @rmetzger - 
Thanks for the update.  
Can you please let me know the steps for contributing to Bahir? Can I 
just fork it and add another project to the root folder?
Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1707) Add an Affinity Propagation Library Method

2016-08-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15443415#comment-15443415
 ] 

Josep Rubió commented on FLINK-1707:


Hi [~vkalavri],

Yeps, I'll do that. I thought you said you did not want it that way :(

Thanks!

> Add an Affinity Propagation Library Method
> --
>
> Key: FLINK-1707
> URL: https://issues.apache.org/jira/browse/FLINK-1707
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Josep Rubió
>Priority: Minor
>  Labels: requires-design-doc
> Attachments: Binary_Affinity_Propagation_in_Flink_design_doc.pdf
>
>
> This issue proposes adding an implementation of the Affinity Propagation 
> algorithm as a Gelly library method and a corresponding example.
> The algorithm is described in paper [1] and a description of a vertex-centric 
> implementation can be found in [2].
> [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf
> [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf
> Design doc:
> https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing
> Example spreadsheet:
> https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4521) Fix "Submit new Job" panel in development mode

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15443338#comment-15443338
 ] 

ASF GitHub Bot commented on FLINK-4521:
---

GitHub user mushketyk opened a pull request:

https://github.com/apache/flink/pull/2431

[FLINK-4521] Fix "Submit new Job" panel in development mode

The Submit panel was completely empty in development mode. This happened 
because client-side code sent AJAX requests to the development HTTP server 
(`localhost:3000`) and not to the JobServer (`localhost:8081`), and the 
development server rejected all AJAX requests.

To fix the issue I did the following:

- Updated `server.js` to pass both GET and POST requests to the JobServer 
(`localhost:8081`)
- Changed all AJAX requests to use the development HTTP server
- Added a helper function to build a correct URL for both dev and prod modes

Also, it seems that the CoffeeScript and JavaScript code were out of sync on 
`master`, so this PR contains a significant number of changes to `index.js`.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mushketyk/flink fix-submit

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2431.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2431


commit 8b4ac3453bdae08c9f59594dfaf529026fa7e1d5
Author: Ivan Mushketyk 
Date:   2016-08-28T11:04:12Z

[FLINK-4521]  Fix Submit panel in development mode

commit 406512b219738a721911f7709e532111c35e67fa
Author: Ivan Mushketyk 
Date:   2016-08-28T11:36:00Z

[FLINK-4521] Add utils.jobServerUrl helper method




> Fix "Submit new Job" panel in development mode
> --
>
> Key: FLINK-4521
> URL: https://issues.apache.org/jira/browse/FLINK-4521
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> If web frontend is started in the development mode, "Submit new Job" panel is 
> empty.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2431: [FLINK-4521] Fix "Submit new Job" panel in develop...

2016-08-28 Thread mushketyk
GitHub user mushketyk opened a pull request:

https://github.com/apache/flink/pull/2431

[FLINK-4521] Fix "Submit new Job" panel in development mode

The Submit panel was completely empty in development mode. This happened 
because client-side code sent AJAX requests to the development HTTP server 
(`localhost:3000`) and not to the JobServer (`localhost:8081`), and the 
development server rejected all AJAX requests.

To fix the issue I did the following:

- Updated `server.js` to pass both GET and POST requests to the JobServer 
(`localhost:8081`)
- Changed all AJAX requests to use the development HTTP server
- Added a helper function to build a correct URL for both dev and prod modes

Also, it seems that the CoffeeScript and JavaScript code were out of sync on 
`master`, so this PR contains a significant number of changes to `index.js`.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mushketyk/flink fix-submit

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2431.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2431


commit 8b4ac3453bdae08c9f59594dfaf529026fa7e1d5
Author: Ivan Mushketyk 
Date:   2016-08-28T11:04:12Z

[FLINK-4521]  Fix Submit panel in development mode

commit 406512b219738a721911f7709e532111c35e67fa
Author: Ivan Mushketyk 
Date:   2016-08-28T11:36:00Z

[FLINK-4521] Add utils.jobServerUrl helper method




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1707) Add an Affinity Propagation Library Method

2016-08-28 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15443318#comment-15443318
 ] 

Vasia Kalavri commented on FLINK-1707:
--

Hi [~joseprupi],

thanks for the update. Regarding the initialization, I'm sorry that nobody 
replied to your email. Just a thought: couldn't you simply read the similarity 
matrix line-by-line and create the graph edges with their weights from it?
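
For instance (a sketch, assuming one whitespace-separated "source target similarity" triple per line; the file path is a placeholder):

{code}
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala.Graph

val env = ExecutionEnvironment.getExecutionEnvironment

val edges = env.readTextFile("hdfs:///path/to/similarity-matrix.txt")
  .map { line =>
    val Array(src, dst, sim) = line.split("\\s+")
    new Edge[Long, Double](src.toLong, dst.toLong, sim.toDouble)
  }

val graph = Graph.fromDataSet(edges, env)
{code}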

> Add an Affinity Propagation Library Method
> --
>
> Key: FLINK-1707
> URL: https://issues.apache.org/jira/browse/FLINK-1707
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Josep Rubió
>Priority: Minor
>  Labels: requires-design-doc
> Attachments: Binary_Affinity_Propagation_in_Flink_design_doc.pdf
>
>
> This issue proposes adding an implementation of the Affinity Propagation 
> algorithm as a Gelly library method and a corresponding example.
> The algorithm is described in paper [1] and a description of a vertex-centric 
> implementation can be found in [2].
> [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf
> [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf
> Design doc:
> https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing
> Example spreadsheet:
> https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4522) Gelly link broken in homepage

2016-08-28 Thread Vasia Kalavri (JIRA)
Vasia Kalavri created FLINK-4522:


 Summary: Gelly link broken in homepage
 Key: FLINK-4522
 URL: https://issues.apache.org/jira/browse/FLINK-4522
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Gelly
Affects Versions: 1.1.0, 1.1.1
Reporter: Vasia Kalavri


The link to the Gelly documentation is broken in the Flink homepage. The link 
points to "docs/apis/batch/libs/gelly.md" which has been removed. Since this 
link might be present in other places as well, e.g. slides, trainings, etc., we 
should re-direct to the new location of the Gelly docs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4521) Fix "Submit new Job" panel in development mode

2016-08-28 Thread Ivan Mushketyk (JIRA)
Ivan Mushketyk created FLINK-4521:
-

 Summary: Fix "Submit new Job" panel in development mode
 Key: FLINK-4521
 URL: https://issues.apache.org/jira/browse/FLINK-4521
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Reporter: Ivan Mushketyk
Assignee: Ivan Mushketyk


If web frontend is started in the development mode, "Submit new Job" panel is 
empty.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4341) Kinesis connector does not emit maximum watermark properly

2016-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15443011#comment-15443011
 ] 

ASF GitHub Bot commented on FLINK-4341:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2414
  
@rmetzger, @aljoscha the changes are ready for another review now, thanks!


> Kinesis connector does not emit maximum watermark properly
> --
>
> Key: FLINK-4341
> URL: https://issues.apache.org/jira/browse/FLINK-4341
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Scott Kidder
>Assignee: Robert Metzger
>Priority: Blocker
> Fix For: 1.2.0, 1.1.2
>
>
> **Previously reported as "Checkpoint state size grows unbounded when task 
> parallelism not uniform"**
> This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I 
> was previously using a 1.1.0 snapshot (commit 18995c8) which performed as 
> expected.  This issue was introduced somewhere between those commits.
> I've got a Flink application that uses the Kinesis Stream Consumer to read 
> from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots 
> each, providing a total of 4 slots.  When running the application with a 
> parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) 
> and 4 slots for subsequent tasks that process the Kinesis stream data. I use 
> an in-memory store for checkpoint data.
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint 
> states were growing unbounded when running with a parallelism of 4, 
> checkpoint interval of 10 seconds:
> {code}
> ID  State Size
> 1   11.3 MB
> 2   20.9 MB
> 3   30.6 MB
> 4   41.4 MB
> 5   52.6 MB
> 6   62.5 MB
> 7   71.5 MB
> 8   83.3 MB
> 9   93.5 MB
> {code}
> The first 4 checkpoints generally succeed, but then fail with an exception 
> like the following:
> {code}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of 
> receiving checkpoint barrier at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum 
> permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider 
> using a different state backend, like the File System State backend.
>   at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
>   at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
>   at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
>   at 
> org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
>   ... 8 more
> {code}
> Or:
> {code}
> 2016-08-09 17:44:43,626 INFO  
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Restoring 
> checkpointed state to task Fold: property_id, player -> 10-minute 
> Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter- 
> Transient association error (association remains live) 
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
> Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max 
> allowed size 10485760 bytes, actual size of encoded class 
> org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 
> 10891825 bytes.
> {code}
> This can be fixed by simply submitting the job with a parallelism of 2. I 
> suspect there was a regression introduced 

[GitHub] flink issue #2414: [FLINK-4341] Let idle consumer subtasks emit max value wa...

2016-08-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2414
  
@rmetzger, @aljoscha the changes are ready for another review now, thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---