[GitHub] flink pull request: [FLINK-1442] Reduce memory consumption of arch...

2015-01-26 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/344

[FLINK-1442] Reduce memory consumption of archived execution graph



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink flink-1442

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/344.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #344


commit baa04e386b70d3c928ceb07e78e50016f20520f0
Author: Max m...@posteo.de
Date:   2015-01-26T18:31:47Z

[FLINK-1442] Reduce memory consumption of archived execution graph




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1442] Reduce memory consumption of arch...

2015-02-04 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/344#discussion_r24093420
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---
@@ -25,48 +25,82 @@ import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
 
+import scala.collection.mutable.LinkedHashMap
+import scala.ref.SoftReference
+
 class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with
   ActorLogging {
   /**
    * Map of execution graphs belonging to recently started jobs with the time stamp of the last
-   * received job event.
+   * received job event. The insert order is preserved through a LinkedHashMap.
    */
-  val graphs = collection.mutable.HashMap[JobID, ExecutionGraph]()
-  val lru = collection.mutable.Queue[JobID]()
+  val graphs = LinkedHashMap[JobID, SoftReference[ExecutionGraph]]()
 
   override def receiveWithLogMessages: Receive = {
+    /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) => {
-      graphs.update(jobID, graph)
+      // wrap graph inside a soft reference
+      graphs.update(jobID, new SoftReference(graph))
+
+      // clear all execution edges of the graph
+      val iter = graph.getAllExecutionVertices().iterator()
+      while (iter.hasNext) {
+        iter.next().clearExecutionEdges()
+      }
+
       cleanup(jobID)
     }
 
     case RequestArchivedJobs => {
-      sender ! ArchivedJobs(graphs.values)
+      sender ! ArchivedJobs(getAllGraphs())
     }
 
     case RequestJob(jobID) => {
-      graphs.get(jobID) match {
-        case Some(graph) => sender ! JobFound(jobID, graph)
-        case None => sender ! JobNotFound(jobID)
+      getGraph(jobID) match {
--- End diff --

Nice read. I'll try to avoid null the next time.
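
For illustration, a minimal null-free sketch of the lookup discussed here, using stand-in types rather than the actual Flink classes: returning `Option[ExecutionGraph]` lets a missing key and a garbage-collected reference collapse into the same `None` case.

    import scala.collection.mutable.LinkedHashMap
    import scala.ref.SoftReference

    object NullFreeArchive {
      // Stand-ins for the Flink types, for illustration only.
      case class JobID(id: String)
      case class ExecutionGraph(name: String)

      val graphs = LinkedHashMap[JobID, SoftReference[ExecutionGraph]]()

      // Option-based lookup: a missing key and a cleared reference
      // both become None, so callers never see a null.
      def getGraph(jobID: JobID): Option[ExecutionGraph] =
        graphs.get(jobID).flatMap(_.get)
    }

A `RequestJob` handler can then match on `Some(graph)` and `None` instead of checking for `null`.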




[GitHub] flink pull request: [FLINK-1442] Reduce memory consumption of arch...

2015-02-04 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/344#discussion_r24093445
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---
@@ -25,48 +25,82 @@ import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
 
+import scala.collection.mutable.LinkedHashMap
+import scala.ref.SoftReference
+
 class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with
   ActorLogging {
   /**
    * Map of execution graphs belonging to recently started jobs with the time stamp of the last
-   * received job event.
+   * received job event. The insert order is preserved through a LinkedHashMap.
    */
-  val graphs = collection.mutable.HashMap[JobID, ExecutionGraph]()
-  val lru = collection.mutable.Queue[JobID]()
+  val graphs = LinkedHashMap[JobID, SoftReference[ExecutionGraph]]()
 
   override def receiveWithLogMessages: Receive = {
+    /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) => {
-      graphs.update(jobID, graph)
+      // wrap graph inside a soft reference
+      graphs.update(jobID, new SoftReference(graph))
+
+      // clear all execution edges of the graph
+      val iter = graph.getAllExecutionVertices().iterator()
+      while (iter.hasNext) {
+        iter.next().clearExecutionEdges()
+      }
+
       cleanup(jobID)
     }
 
     case RequestArchivedJobs => {
-      sender ! ArchivedJobs(graphs.values)
+      sender ! ArchivedJobs(getAllGraphs())
    }
 
     case RequestJob(jobID) => {
-      graphs.get(jobID) match {
-        case Some(graph) => sender ! JobFound(jobID, graph)
-        case None => sender ! JobNotFound(jobID)
+      getGraph(jobID) match {
+        case graph: ExecutionGraph => sender ! JobFound(jobID, graph)
+        case _ => sender ! JobNotFound(jobID)
       }
     }
 
     case RequestJobStatus(jobID) => {
-      graphs.get(jobID) match {
-        case Some(eg) => sender ! CurrentJobStatus(jobID, eg.getState)
-        case None => sender ! JobNotFound(jobID)
+      getGraph(jobID) match {
+        case graph: ExecutionGraph => sender ! CurrentJobStatus(jobID, graph.getState)
+        case _ => sender ! JobNotFound(jobID)
       }
     }
   }
 
-  def cleanup(jobID: JobID): Unit = {
-    if (!lru.contains(jobID)) {
-      lru.enqueue(jobID)
+  /**
+   * Gets all graphs that have not been garbage collected.
+   * @return An iterable with all valid ExecutionGraphs
+   */
+  def getAllGraphs() = graphs.values.flatMap(ref => ref.get match {
+    case Some(graph) => Seq(graph)
+    case _ => Seq()
+  })
--- End diff --

I wanted to ignore the null value results for garbage collected items in 
`graphs`.
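
As a minimal, self-contained illustration of that filtering (plain types rather than the Flink classes): `SoftReference.get` returns an `Option`, so `flatMap` silently drops entries the garbage collector has already cleared.

    import scala.collection.mutable.LinkedHashMap
    import scala.ref.SoftReference

    object SoftRefFilter extends App {
      val graphs = LinkedHashMap[Int, SoftReference[String]]()
      graphs.update(1, new SoftReference("graph-1"))
      graphs.update(2, new SoftReference("graph-2"))

      // Same idea as getAllGraphs() above: collected entries
      // produce None and vanish from the result.
      val live = graphs.values.flatMap(_.get)
      println(live.mkString(", "))  // graph-1, graph-2 (unless cleared)
    }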




[GitHub] flink pull request: [FLINK-1442] Reduce memory consumption of arch...

2015-02-04 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/344#discussion_r24093429
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---
@@ -25,48 +25,82 @@ import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
 
+import scala.collection.mutable.LinkedHashMap
+import scala.ref.SoftReference
+
 class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with
   ActorLogging {
   /**
    * Map of execution graphs belonging to recently started jobs with the time stamp of the last
-   * received job event.
+   * received job event. The insert order is preserved through a LinkedHashMap.
    */
-  val graphs = collection.mutable.HashMap[JobID, ExecutionGraph]()
-  val lru = collection.mutable.Queue[JobID]()
+  val graphs = LinkedHashMap[JobID, SoftReference[ExecutionGraph]]()
 
   override def receiveWithLogMessages: Receive = {
+    /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) => {
-      graphs.update(jobID, graph)
+      // wrap graph inside a soft reference
+      graphs.update(jobID, new SoftReference(graph))
+
+      // clear all execution edges of the graph
+      val iter = graph.getAllExecutionVertices().iterator()
+      while (iter.hasNext) {
+        iter.next().clearExecutionEdges()
+      }
+
       cleanup(jobID)
     }
 
     case RequestArchivedJobs => {
-      sender ! ArchivedJobs(graphs.values)
+      sender ! ArchivedJobs(getAllGraphs())
     }
 
     case RequestJob(jobID) => {
-      graphs.get(jobID) match {
-        case Some(graph) => sender ! JobFound(jobID, graph)
-        case None => sender ! JobNotFound(jobID)
+      getGraph(jobID) match {
+        case graph: ExecutionGraph => sender ! JobFound(jobID, graph)
+        case _ => sender ! JobNotFound(jobID)
       }
     }
 
     case RequestJobStatus(jobID) => {
-      graphs.get(jobID) match {
-        case Some(eg) => sender ! CurrentJobStatus(jobID, eg.getState)
-        case None => sender ! JobNotFound(jobID)
+      getGraph(jobID) match {
+        case graph: ExecutionGraph => sender ! CurrentJobStatus(jobID, graph.getState)
+        case _ => sender ! JobNotFound(jobID)
       }
     }
   }
 
-  def cleanup(jobID: JobID): Unit = {
-    if (!lru.contains(jobID)) {
-      lru.enqueue(jobID)
+  /**
+   * Gets all graphs that have not been garbage collected.
+   * @return An iterable with all valid ExecutionGraphs
+   */
+  def getAllGraphs() = graphs.values.flatMap(ref => ref.get match {
+    case Some(graph) => Seq(graph)
+    case _ => Seq()
+  })
+
+  /**
+   * Gets a graph with a jobID if it has not been garbage collected.
+   * @param jobID
+   * @return ExecutionGraph or null
+   */
+  def getGraph(jobID: JobID) = graphs.get(jobID) match {
+    case Some(softRef) => softRef.get match {
+      case Some(graph) => graph
+      case None => null
     }
--- End diff --

Right you are.




[GitHub] flink pull request: [FLINK-1442] Reduce memory consumption of arch...

2015-02-04 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/344#discussion_r24093436
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---
@@ -25,48 +25,82 @@ import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
 
+import scala.collection.mutable.LinkedHashMap
+import scala.ref.SoftReference
+
 class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with
   ActorLogging {
   /**
    * Map of execution graphs belonging to recently started jobs with the time stamp of the last
-   * received job event.
+   * received job event. The insert order is preserved through a LinkedHashMap.
    */
-  val graphs = collection.mutable.HashMap[JobID, ExecutionGraph]()
-  val lru = collection.mutable.Queue[JobID]()
+  val graphs = LinkedHashMap[JobID, SoftReference[ExecutionGraph]]()
 
   override def receiveWithLogMessages: Receive = {
+    /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) => {
-      graphs.update(jobID, graph)
+      // wrap graph inside a soft reference
+      graphs.update(jobID, new SoftReference(graph))
+
+      // clear all execution edges of the graph
+      val iter = graph.getAllExecutionVertices().iterator()
+      while (iter.hasNext) {
+        iter.next().clearExecutionEdges()
+      }
--- End diff --

Much nicer! Thanks.




[GitHub] flink pull request: [FLINK-592] Add support for Kerberos secured Y...

2015-02-04 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/358#issuecomment-72870536
  
@warneke Thanks for reporting. If the above issues have been resolved, I suggest merging the changes.




[GitHub] flink pull request: [FLINK-592] Add support for Kerberos secured Y...

2015-02-05 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/358#issuecomment-73013486
  
@warneke Thank you for your help!




[GitHub] flink pull request: [FLINK-1166] Add qa-check.sh tool

2015-02-05 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/366#issuecomment-73083755
  
The script builds Flink twice and therefore takes a while. I don't know if 
we can force users to execute it before making pull requests. However, it would 
be good to get an automatic report for open pull requests using this script.




[GitHub] flink pull request: [FLINK-1166] Add qa-check.sh tool

2015-02-05 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/366#discussion_r24178788
  
--- Diff: tools/qa-check.sh ---
@@ -0,0 +1,175 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+#
+# QA check your changes.
+# Possible options:
+# BRANCH set another branch as the check reference
+#
+#
+# Use the tool like this: "BRANCH=release-0.8 ./tools/qa-check.sh"
+#
+
+
+BRANCH=${BRANCH:-origin/master}
+
+
+
+here=`dirname "$0"`          # relative
+here=`( cd "$here" && pwd )` # absolutized and normalized
+if [ -z "$here" ] ; then
+   # error; for some reason, the path is not accessible
+   # to the script (e.g. permissions re-evaled after suid)
+   exit 1  # fail
+fi
+flink_home=`dirname "$here"`
+
+echo "flink_home=$flink_home here=$here"
+cd "$here"
+
+if [ ! -d "_qa_workdir" ] ; then
+   echo "_qa_workdir doesn't exist. Creating it"
+   mkdir _qa_workdir
+fi
+# attention, it overwrites
+echo "_qa_workdir" > .gitignore
+
+cd _qa_workdir
+
+if [ ! -d "flink" ] ; then
+   echo "There is no flink copy in the workdir. Cloning flink"
+   git clone https://github.com/apache/flink.git flink
+   cd flink
--- End diff --

This `cd flink` is not necessary.




[GitHub] flink pull request: [FLINK-1486] add print method for prefixing a ...

2015-02-06 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/372

[FLINK-1486] add print method for prefixing a user defined string

- extend API to include a `print(String message)` method
- change `PrintingOutputformat` to include a message

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink flink-1486

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/372.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #372


commit 62c7520d821f8a0718fe7fc3daca6fea09546c2b
Author: Max m...@posteo.de
Date:   2015-02-06T15:55:29Z

[FLINK-1486] add print method for prefixing a user defined string

- extend API to include a print(String message) method
- change PrintingOutputformat to include a message






[GitHub] flink pull request: [FLINK-1486] add print method for prefixing a ...

2015-02-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/372#issuecomment-73410628
  
Good idea. So we would print `$taskId> $outputValue` if the user did not supply a string and `$string:$taskId> $outputValue` otherwise. If the parallelization degree is 1, we would just print `$string> $outputValue` if a string was supplied.
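
A small sketch of that formatting logic (a hypothetical helper; the names and the `>` separator follow the discussion above):

    object PrintPrefix {
      // Hypothetical helper illustrating the three cases discussed above.
      def formatOutput(prefix: Option[String], taskId: Int, parallelism: Int, value: Any): String =
        (prefix, parallelism) match {
          case (Some(p), 1) => s"$p> $value"          // prefix given, parallelism 1
          case (Some(p), _) => s"$p:$taskId> $value"  // prefix given, parallel tasks
          case (None, _)    => s"$taskId> $value"     // no prefix supplied
        }
    }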




[GitHub] flink pull request: [FLINK-1406] update Flink compatibility notice

2015-01-16 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/314

[FLINK-1406] update Flink compatibility notice



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink flink_1406

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/314.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #314


commit b09ab9f644bb21737babddf387045ff953f72135
Author: Max m...@posteo.de
Date:   2015-01-16T10:02:47Z

[FLINK-1406] update Flink compatibility notice






[GitHub] flink pull request: Implement the convenience methods count and co...

2015-01-20 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/210#issuecomment-70718732
  
I've implemented count and collect in the Scala API. There is still a problem with the `ListAccumulator` for non-primitive objects (e.g., not Integer or Long), probably due to object reuse.
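
For context, a minimal sketch of the object-reuse pitfall suspected here (generic types, not the actual accumulator code): when the runtime reuses one mutable instance, collecting the reference instead of a copy keeps only the last value.

    import scala.collection.mutable.ListBuffer

    object ReusePitfall extends App {
      class Record(var value: Int)

      val collected = ListBuffer[Record]()
      val reused = new Record(0)
      for (i <- 1 to 3) {
        reused.value = i     // the runtime mutates the same instance...
        collected += reused  // ...so the buffer holds three aliases of it
      }
      println(collected.map(_.value))  // ListBuffer(3, 3, 3), not 1, 2, 3
    }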




[GitHub] flink pull request: support for secure HDFS access using kerberos

2015-02-11 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/383

support for secure HDFS access using kerberos



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink kerberos_hdfs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/383.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #383


commit 5473f341592869e3c2659e1f77d3e65db4a755f0
Author: Max m...@posteo.de
Date:   2015-02-10T12:54:04Z

support for secure HDFS access using kerberos






[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-11 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-73931484
  
Why should the resulting address be different from the one Java uses? What 
does

import socket
socket.getaddrinfo(socket.gethostname(), None, socket.AF_INET, 
socket.SOCK_DGRAM)

return in your case?




[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-11 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-73926790
  
No, same result. I tried including my hostname in /etc/hosts which got me 
past the above exception. However, now I get


02/11/2015 18:41:00  Job execution switched to status RUNNING.
02/11/2015 18:41:00  DataSource (ValueSource)(1/1) switched to SCHEDULED
02/11/2015 18:41:00  DataSource (ValueSource)(1/1) switched to DEPLOYING
02/11/2015 18:41:01  DataSource (ValueSource)(1/1) switched to RUNNING
02/11/2015 18:41:01  MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to SCHEDULED
02/11/2015 18:41:01  MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to DEPLOYING
02/11/2015 18:41:01  DataSource (ValueSource)(1/1) switched to FINISHED
02/11/2015 18:41:01  MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to RUNNING
02/11/2015 18:41:13  MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to FAILED
java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonFlatMap -> PythonCombine) stopped responding.
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:491)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonFlatMap -> PythonCombine) stopped responding.
    at org.apache.flink.languagebinding.api.java.common.streaming.Streamer.open(Streamer.java:72)
    at org.apache.flink.languagebinding.api.java.python.functions.PythonMapPartition.open(PythonMapPartition.java:48)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:33)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:487)
    ... 3 more




[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-10 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-73726322
  
With the current version of the pull request, I get the following when 
executing

/pyflink2.sh ../resources/python/flink/example/WordCount.py

Job execution switched to status RUNNING.
DataSource (ValueSource)(1/1) switched to SCHEDULED
DataSource (ValueSource)(1/1) switched to DEPLOYING
DataSource (ValueSource)(1/1) switched to RUNNING
MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to SCHEDULED
MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to DEPLOYING
DataSource (ValueSource)(1/1) switched to FINISHED
MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to RUNNING

This hangs forever. If I abort, no Exception is thrown. The job manager log 
states:

org.apache.flink.runtime.jobmanager.JobManager$$anonfun$main$1$$anon$1 - Received job 920dca962d0ba21e3be8d3997c0940f1 (Flink Java Job at Tue Feb 10 16:54:56 CET 2015).
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$main$1$$anon$1 - Scheduling job Flink Java Job at Tue Feb 10 16:54:56 CET 2015.
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying DataSource (ValueSource) (1/1) (attempt #0) to localhost
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$main$1$$anon$1 - Status of job 920dca962d0ba21e3be8d3997c0940f1 (Flink Java Job at Tue Feb 10 16:54:56 CET 2015) changed to RUNNING.
org.apache.flink.runtime.taskmanager.TaskManager - There is no profiling enabled for the task manager.
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying MapPartition (PythonFlatMap -> PythonCombine) (1/1) (attempt #0) to localhost
org.apache.flink.runtime.taskmanager.Task - DataSource (ValueSource) (1/1) switched to FINISHED




[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-10 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-73736669
  
Now I'm getting:

02/10/2015 17:47:06  Job execution switched to status RUNNING.
02/10/2015 17:47:06  DataSource (ValueSource)(1/1) switched to SCHEDULED
02/10/2015 17:47:06  DataSource (ValueSource)(1/1) switched to DEPLOYING
02/10/2015 17:47:06  DataSource (ValueSource)(1/1) switched to RUNNING
02/10/2015 17:47:06  MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to SCHEDULED
02/10/2015 17:47:06  MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to DEPLOYING
02/10/2015 17:47:06  DataSource (ValueSource)(1/1) switched to FINISHED
02/10/2015 17:47:06  MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to RUNNING
02/10/2015 17:47:16  MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to FAILED
java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonFlatMap -> PythonCombine) stopped responding.
Traceback (most recent call last):
  File "/var/folders/p0/153pkhvn2vn79b2yszvb64thgn/T/tmp_d688cd6ddf2347ab1d713a847b73d234/flink/executor.py", line 55, in <module>
    s.bind((socket.gethostname(), 0))
  File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
socket.gaierror: [Errno 8] nodename nor servname provided, or not known
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:491)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonFlatMap -> PythonCombine) stopped responding.
Traceback (most recent call last):
  File "/var/folders/p0/153pkhvn2vn79b2yszvb64thgn/T/tmp_d688cd6ddf2347ab1d713a847b73d234/flink/executor.py", line 55, in <module>
    s.bind((socket.gethostname(), 0))
  File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
socket.gaierror: [Errno 8] nodename nor servname provided, or not known
    at org.apache.flink.languagebinding.api.java.common.streaming.Streamer.open(Streamer.java:72)
    at org.apache.flink.languagebinding.api.java.python.functions.PythonMapPartition.open(PythonMapPartition.java:48)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:33)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:487)
    ... 3 more

02/10/2015 17:47:16  Job execution switched to status FAILING.
02/10/2015 17:47:16  GroupReduce (PythonGroupReducePreStep)(1/1) switched to CANCELED
02/10/2015 17:47:16  MapPartition (PythonGroupReduce)(1/1) switched to CANCELED
02/10/2015 17:47:16  DataSink(PrintSink)(1/1) switched to CANCELED
02/10/2015 17:47:16  Job execution switched to status FAILED.
Error: The program execution failed: java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonFlatMap -> PythonCombine) stopped responding.
Traceback (most recent call last):
  File "/var/folders/p0/153pkhvn2vn79b2yszvb64thgn/T/tmp_d688cd6ddf2347ab1d713a847b73d234/flink/executor.py", line 55, in <module>
    s.bind((socket.gethostname(), 0))
  File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
socket.gaierror: [Errno 8] nodename nor servname provided, or not known
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:491)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonFlatMap -> PythonCombine) stopped responding.
Traceback (most recent call last):
  File "/var/folders/p0/153pkhvn2vn79b2yszvb64thgn/T/tmp_d688cd6ddf2347ab1d713a847b73d234/flink/executor.py", line 55, in <module>
    s.bind((socket.gethostname(), 0

[GitHub] flink pull request: [FLINK-947] Add a declarative expression API

2015-02-18 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/405#issuecomment-74845397
  
To me, `flink-expressions` sounds much better than `linq`, and it mitigates the risk of lawsuits :)




[GitHub] flink pull request: Remove extra space after open parenthesis in I...

2015-02-19 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/416#issuecomment-75019775
  
:+1: Good to merge :)




[GitHub] flink pull request: Add auto-parallelism to Jobs (0.8 branch)

2015-02-17 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/410#discussion_r24804242
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java ---
@@ -374,6 +375,8 @@ public JobSubmissionResult submitJob(JobGraph job) throws IOException {
 			LOG.debug(String.format("Running master initialization of job %s (%s)", job.getJobID(), job.getName()));
 		}
 
+		final int numSlots = scheduler.getTotalNumberOfSlots();
--- End diff --

Shouldn't this be set to `getNumberOfAvailableSlots()` for the 
PARALLELISM_AUTO_MAX case?




[GitHub] flink pull request: Add auto-parallelism to Jobs (0.8 branch)

2015-02-17 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/410#issuecomment-74642763
  
Right now, the user has to set the parallelism to `ExecutionConfig.PARALLELISM_AUTO_MAX`. Why not use all available task slots by default? I understand that we shouldn't simply grab all resources, but the auto parallelism will only take resources that were already granted to Flink.
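
For illustration, setting it explicitly looks roughly like this (a sketch: the `PARALLELISM_AUTO_MAX` constant is taken from the comment above, while the setter name is an assumption):

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.scala._

    object AutoMaxParallelism {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // Setter name assumed; only the constant comes from the comment above.
        env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX)
      }
    }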




[GitHub] flink pull request: [Flink-1436] refactor CLiFrontend to provide m...

2015-01-27 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/331#issuecomment-71649409
  
Thanks for your feedback, @rmetzger. The error message is at the bottom because that way it is most easily identifiable by the user (no scrolling necessary). Before, we printed the error and then the help, which let the help text shadow the error message.

I changed the error reporting in case the user didn't specify an action.

Concerning the printing of the help message, you're probably right. Let's just print the help if the user asks for it. Now it prints:

    "./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar" is not a valid action.
    Valid actions are "run", "list", "info", or "cancel".

Additionally, let's 
* change `info` to print the plan by default
* change `cancel` to accept the job id as a parameter instead of an option
* change `list` to print scheduled and running jobs by default




[GitHub] flink pull request: [FLINK-1442] Reduce memory consumption of arch...

2015-01-27 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/344#issuecomment-71616952
  
@StephanEwen Yes, that was on purpose. The previous two data structures (`HashMap` and `Queue`) are now replaced by the `LinkedHashMap`, which provides the same functionality. It might not be obvious, but the `LinkedHashMap` preserves the order of the inserted items. From `scala.collection.mutable.LinkedHashMap`:

> This class implements mutable maps using a hashtable.
> The iterator and all traversal methods of this class visit elements in the order they were inserted.

That's why `graphs.iterator.next()` always returns the least recently inserted item.
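
A minimal, self-contained demonstration of that property (plain types for illustration):

    import scala.collection.mutable.LinkedHashMap

    object InsertionOrder extends App {
      val graphs = LinkedHashMap[String, Int]()
      graphs.update("job-a", 1)
      graphs.update("job-b", 2)
      graphs.update("job-c", 3)

      // Iteration follows insertion order, so the head is the oldest entry,
      // exactly what an LRU-style cleanup wants to evict first.
      println(graphs.iterator.next())      // (job-a,1)
      graphs.remove(graphs.head._1)        // evict the least recently inserted
      println(graphs.keys.mkString(", "))  // job-b, job-c
    }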





[GitHub] flink pull request: [FLINK-1330] [build] Build creates a link in t...

2015-01-27 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/333#issuecomment-71633168
  
So either we somehow change the execution order of the clean goal, or we fix this in the junction plugin.




[GitHub] flink pull request: [FLINK-1330] [build] Build creates a link in t...

2015-01-25 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/333#issuecomment-71366786
  
This looks good.




[GitHub] flink pull request: [Flink-1436] refactor CLiFrontend to provide m...

2015-01-26 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/331#issuecomment-71446044
  
@rmetzger @StephanEwen I squashed the commits, merged the recent changes in 
the CliFrontend on the master, and addressed 
[FLINK-1424](https://issues.apache.org/jira/browse/FLINK-1424).




[GitHub] flink pull request: [FLINK-1320] Add an off-heap variant of the ma...

2015-01-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/290#issuecomment-69361811
  
@rmetzger I added some documentation for the config parameter.




[GitHub] flink pull request: [FLINK-1486] add print method for prefixing a ...

2015-02-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/372#issuecomment-73504724
  
To make it a bit more explicit what the sink identifier is and what the task identifier is (especially when just one of the two is printed), I prefixed the sink identifier with "sinkId" and the task identifier with "taskId".




[GitHub] flink pull request: [FLINK-1436] refactor CLiFrontend to provide m...

2015-02-12 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/331#issuecomment-74088065
  
@rmetzger Thanks, but I think it should be "This closes #331" :)




[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a GroupC...

2015-03-18 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/466#issuecomment-82879313
  
Any further comments?




[GitHub] flink pull request: [FLINK-1679] rename degree of parallelism to p...

2015-03-18 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/488#issuecomment-83095571
  
Any objections against merging this?




[GitHub] flink pull request: [FLINK-1688] Socket client sink added

2015-03-16 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/484#discussion_r26483463
  
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.socket;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
+    private final String hostName;
+    private final int port;
+    private final SerializationSchema<IN, byte[]> scheme;
+    private transient Socket client;
+    private transient DataOutputStream dataOutputStream;
+
+    public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+        this.hostName = hostName;
+        this.port = port;
+        this.scheme = schema;
+    }
+
+    /**
+     * Initializes the connection to Socket.
+     */
+    public void initialize() {
+        OutputStream outputStream;
+        try {
+            client = new Socket(hostName, port);
+            outputStream = client.getOutputStream();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        dataOutputStream = new DataOutputStream(outputStream);
+    }
+
+    /**
+     * Called when new data arrives to the sink, and forwards it to Socket.
+     *
+     * @param value
+     *            The incoming data
+     */
+    @Override
+    public void invoke(IN value) {
+        byte[] msg = scheme.serialize(value);
+        try {
+            dataOutputStream.write(msg);
+        } catch (IOException e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send message to socket server at {}:{}", hostName, port);
+            }
+        }
+    }
+
+    /**
+     * Closes the connection.
+     */
+    private void closeConnection(){
--- End diff --

really? :)




[GitHub] flink pull request: [FLINK-1688] Socket client sink added

2015-03-16 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/484#issuecomment-81665515
  
Thanks for the pull request. Could you add some javadoc to the class and 
add an entry in the Flink website docs [1]?

[1] 
http://ci.apache.org/projects/flink/flink-docs-master/programming_guide.html#data-sinks




[GitHub] flink pull request: [FLINK-1688] Socket client sink added

2015-03-16 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/484#discussion_r26483484
  
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.socket;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SocketClientSink<IN> extends RichSinkFunction<IN> {
--- End diff --

Can you add a docstring for the class to explain its purpose?




[GitHub] flink pull request: [FLINK-1688] Socket client sink added

2015-03-16 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/484#discussion_r26483479
  
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.socket;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
+    private final String hostName;
+    private final int port;
+    private final SerializationSchema<IN, byte[]> scheme;
+    private transient Socket client;
+    private transient DataOutputStream dataOutputStream;
+
+    public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+        this.hostName = hostName;
+        this.port = port;
+        this.scheme = schema;
+    }
+
+    /**
+     * Initializes the connection to Socket.
+     */
+    public void initialize() {
--- End diff --

really? :) You could call this method `initializeConnection` like its counterpart `closeConnection`.




[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a GroupC...

2015-03-16 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/466#discussion_r26471022
  
--- Diff: flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupCombineProperties.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.compiler.dag.SingleInputNode;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.PartitioningProperty;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public final class GroupCombineProperties extends OperatorDescriptorSingle {
--- End diff --

Totally agree with you. We should not accept undocumented classes. I added 
some doc, also for the base class.




[GitHub] flink pull request: [FLINK-1679] rename degree of parallelism to p...

2015-03-16 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/488

[FLINK-1679] rename degree of parallelism to parallelism & extend documentation about parallelism



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink parallelism

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #488


commit 9acf5057820ef2e9feece3da216cbd40dcb4cfd9
Author: Maximilian Michels m...@apache.org
Date:   2015-03-16T15:52:21Z

[FLINK-1679] rename degree of parallelism to parallelism

* [Dd]egree[ -]of[ -]parallelism -> [pP]arallelism
* DOP -> [pP]arallelism
* paraDegree -> parallelism
* DEGREE_OF_PARALLELISM -> PARALLELISM
* parallelization.degree.default -> parallelism.default

commit b136659524ddcac9bf47b229b8bf0fff29bd171c
Author: Maximilian Michels m...@apache.org
Date:   2015-03-16T17:01:33Z

[FLINK-1679] extend faq and programming guide to clarify parallelism






[GitHub] flink pull request: [FLINK-1679] rename degree of parallelism to p...

2015-03-17 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/488#issuecomment-82251340
  
@uce @StephanEwen I agree with you two that it's better to keep the old API 
methods and mark them as deprecated while introducing the new ones. When the 
API has been stabilized and 1.0 comes up, we can remove the old methods.




[GitHub] flink pull request: [FLINK-1679] rename degree of parallelism to p...

2015-03-20 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/488#issuecomment-84058237
  
Any other opinions on the API breaking nature of these changes?




[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...

2015-03-09 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/466

[FLINK-1622][java-api][scala-api] add a partial GroupReduce operator

The partial GroupReduce operator acts like a regular GroupReduce
operator but does not perform a full group reduce. Instead, it performs
the GroupReduce only on the individual partitions. This may lead to a
partial GroupReduce result.

The operator can be used to pre-combine elements into an intermediate
output format before applying a proper groupReduce to produce the final
output format.
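
To make the idea concrete, here is a hedged sketch that emulates the pre-combine step with the existing `mapPartition` operator, followed by the full group reduce; it illustrates the intended two-phase pattern, not the new operator itself:

    import org.apache.flink.api.scala._

    object PartialReduceIdea {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val words = env.fromElements(("a", 1), ("b", 1), ("a", 1), ("b", 1))

        // Phase 1: combine within each partition only; equal keys may still
        // live in different partitions, so results can be partial.
        val preCombined = words.mapPartition {
          (it: Iterator[(String, Int)]) =>
            it.toSeq.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
        }

        // Phase 2: the full group reduce produces the final counts.
        val counts = preCombined.groupBy(0).sum(1)
        counts.print()
      }
    }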

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink reducePartialOperator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/466.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #466








[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...

2015-03-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/466#discussion_r26120661
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
 ---
@@ -156,6 +156,23 @@ public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order order) {
		return new GroupReduceOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
	}
 
+	/**
+	 * Applies a partial GroupReduce transformation on a grouped and sorted {@link DataSet}.
+	 *
+	 * In contrast to the reduceGroup transformation, the GroupReduce function is only called on each partition. Thus,
+	 * partial solutions are likely to occur.
+	 * @param reducer The ReduceFunction that is applied on the DataSet.
+	 * @return A GroupReducePartial operator which represents the partially reduced DataSet.
+	 */
+	public <R> GroupReducePartialOperator<T, R> reduceGroupPartially(GroupReduceFunction<T, R> reducer) {
+		if (reducer == null) {
+			throw new NullPointerException("GroupReduce function must not be null.");
+		}
+		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType());
+
+		return new GroupReducePartialOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
+	}
+
--- End diff --

We need a grouping to combine the correct values, don't we? Otherwise, we 
would perform an AllReduce.




[GitHub] flink pull request: Kick off of Flink's machine learning library

2015-03-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/479#discussion_r26215888
  
--- Diff: docs/_includes/navbar.html ---
@@ -24,15 +24,15 @@
     We might be on an externally hosted documentation site.
     Please keep the site.FLINK_WEBSITE_URL below to ensure a link back to the Flink website.
     {% endcomment %}
-    <a href="{{ site.FLINK_WEBSITE_URL }}index.html" title="Home">
+    <a href="{{ site.FLINK_WEBSITE_URL }}/index.html" title="Home">
      <img class="hidden-xs hidden-sm img-responsive"
-      src="{{ site.baseurl }}img/logo.png" alt="Apache Flink Logo">
+      src="{{ site.baseurl }}/img/logo.png" alt="Apache Flink Logo">
    </a>
    <div class="row visible-xs">
      <div class="col-xs-3">
-      <a href="{{ site.baseurl }}index.html" title="Home">
+      <a href="{{ site.baseurl }}/index.html" title="Home">
--- End diff --

Yes, we can set it in the _config.yml. I would not change it now because it 
has some other implications. For example, local testing of the website would become 
more complicated because all links would point to the online version. We can 
open a separate issue.




[GitHub] flink pull request: Kick off of Flink's machine learning library

2015-03-12 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/479#issuecomment-78300593
  
Nice solution. Thanks.




[GitHub] flink pull request: Kick off of Flink's machine learning library

2015-03-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/479#discussion_r26219828
  
--- Diff: docs/_layouts/default.html ---
@@ -23,16 +23,25 @@
     <meta http-equiv="X-UA-Compatible" content="IE=edge">
     <meta name="viewport" content="width=device-width, initial-scale=1">
     <title>Apache Flink: {{ page.title }}</title>
-    <link rel="shortcut icon" href="{{ site.baseurl }}favicon.ico" type="image/x-icon">
-    <link rel="icon" href="{{ site.baseurl }}favicon.ico" type="image/x-icon">
-    <link rel="stylesheet" href="{{ site.baseurl }}css/bootstrap.css">
-    <link rel="stylesheet" href="{{ site.baseurl }}css/bootstrap-lumen-custom.css">
-    <link rel="stylesheet" href="{{ site.baseurl }}css/syntax.css">
-    <link rel="stylesheet" href="{{ site.baseurl }}css/custom.css">
-    <link href="{{ site.baseurl }}css/main/main.css" rel="stylesheet">
+    <link rel="shortcut icon" href="{{ site.baseurl }}/favicon.ico" type="image/x-icon">
+    <link rel="icon" href="{{ site.baseurl }}/favicon.ico" type="image/x-icon">
+    <link rel="stylesheet" href="{{ site.baseurl }}/css/bootstrap.css">
+    <link rel="stylesheet" href="{{ site.baseurl }}/css/bootstrap-lumen-custom.css">
+    <link rel="stylesheet" href="{{ site.baseurl }}/css/syntax.css">
+    <link rel="stylesheet" href="{{ site.baseurl }}/css/custom.css">
+    <link href="{{ site.baseurl }}/css/main/main.css" rel="stylesheet">
--- End diff --

Local testing should work out of the box. We could set `{{site.baseurl}}` 
to http://ci.apache.org/projects/flink/flink-docs-master/ and then change our 
build_docs script to serve the website for local testing with the `--baseurl 
http://127.0.0.1/` parameter. If you want, you can fix this here, or we can open a 
JIRA for that.

http://jekyllrb.com/docs/configuration/#serve-command-options





[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...

2015-03-10 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/466#issuecomment-78093403
  
@aljoscha @fhueske For a general combine, the operator can be used without 
grouping. When we want to combine elements before performing a proper 
groupReduce with a groupBy, we also need to use groupBy for the combine. 
Otherwise, we wouldn't know in the combiner which keys belong together. 
However, there are cases where a combiner without a groupBy would be 
appropriate. That's why the new operator is exposed in `DataSet`, 
`GroupedDataSet`, and `SortedDataSet`. We currently have the same behavior for a 
normal `GroupReduce`, despite the fact that the combine of the GroupReduce 
cannot change the input type.
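
To illustrate the two call patterns, a small self-contained sketch with hypothetical data and a key-agnostic combiner:

```
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class CombinerScopes {
    // A key-agnostic combiner: counts how many elements it was handed.
    static class CountCombiner
            implements GroupReduceFunction<Tuple2<String, Integer>, Integer> {
        @Override
        public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) {
            int count = 0;
            for (Tuple2<String, Integer> ignored : values) {
                count++;
            }
            out.collect(count);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> input = env.fromElements(
                new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3));

        // With groupBy: the combiner is called once per key group within a
        // partition, so elements with the same key stay together.
        input.groupBy(0).reduceGroupPartially(new CountCombiner()).print();

        // Without groupBy: the combiner sees whatever its partition holds,
        // which is sufficient for key-agnostic pre-aggregation.
        input.reduceGroupPartially(new CountCombiner()).print();
    }
}
```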






[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...

2015-03-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/466#discussion_r26139203
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReducePartialOperator.java
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import 
org.apache.flink.api.common.operators.base.GroupReducePartialOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
+import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupPartialOperator;
+import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduceGroupPartialOperator;
+import 
org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+/**
+ * This operator behaves like the GroupReduceOperator with Combine but 
only runs the Combine part which reduces all data
+ * locally in their partitions. The combine part can return an arbitrary 
data type. This is useful to pre-combine values 
+ * into an intermediate representation before applying a proper reduce 
operation.
+ *
--- End diff --

That is correct.




[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...

2015-03-10 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/466#issuecomment-78076027
  
@aljoscha Thanks for the comments. I agree, the tests are a bit shady 
because they test the operator by first performing a partial, then a full 
reduce. Using a custom partitioner would make more sense.

@hsaputra Yes, we absolutely have to add docs apart from the Java/Scala 
docs.

What do you think about the name? @fhueske suggested exposing the operator 
as `combine` because it is essentially a user-accessible combiner.




[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...

2015-03-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/466#discussion_r26130234
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReducePartialOperatorBase.java
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ *
--- End diff --

Thanks for spotting the missing doc in this class. I'll add some.




[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...

2015-03-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/466#discussion_r26130403
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -355,6 +355,63 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+   * Partial variant of the reduceGroup transformation which operates only on the individual
+   * partitions. This may lead to partially reduced results.
+   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
+   * of elements to the group reduce function. The function must output one element. The
+   * concatenation of those will form the resulting [[DataSet]].
+   */
+  def reduceGroupPartially[R: TypeInformation: ClassTag](
+      fun: (Iterator[T]) => R): DataSet[R] = {
+    Validate.notNull(fun, "Group reduce function must not be null.")
+    val reducer = new GroupReduceFunction[T, R] {
+      val cleanFun = set.clean(fun)
+      def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
+        out.collect(cleanFun(in.iterator().asScala))
+      }
+    }
+    wrap(
+      new GroupReducePartialOperator[T, R](maybeCreateSortedGrouping(),
+        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
+  }
+
+  /**
+   * Partial variant of the reduceGroup transformation which operates only on the individual
+   * partitions. This may lead to partially reduced results.
+   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
+   * of elements to the group reduce function. The function can output zero or more elements using
+   * the [[Collector]]. The concatenation of the emitted values will form the resulting [[DataSet]].
+   */
+  def reduceGroupPartially[R: TypeInformation: ClassTag](
+      fun: (Iterator[T], Collector[R]) => Unit): DataSet[R] = {
+    Validate.notNull(fun, "Group reduce function must not be null.")
+    val reducer = new GroupReduceFunction[T, R] {
+      val cleanFun = set.clean(fun)
+      def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
+        cleanFun(in.iterator().asScala, out)
+      }
+    }
+    wrap(
+      new GroupReducePartialOperator[T, R](maybeCreateSortedGrouping(),
+        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
+  }
+
+  /**
+   * Partial variant of the reduceGroup transformation which operates only on the individual
+   * partitions. This may lead to partially reduced results.
+   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
+   * of elements to the [[GroupReduceFunction]]. The function can output zero or more elements. The
+   * concatenation of the emitted values will form the resulting [[DataSet]].
+   */
+  def reduceGroupPartially[R: TypeInformation: ClassTag](
+      reducer: GroupReduceFunction[T, R]): DataSet[R] = {
+    Validate.notNull(reducer, "GroupReduce function must not be null.")
+    wrap(
+      new GroupReducePartialOperator[T, R](maybeCreateSortedGrouping(),
+        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
+  }
+
+  /**
--- End diff --

See my answer above.




[GitHub] flink pull request: [FLINK-1629][FLINK-1630][FLINK-1547] Rework Fl...

2015-03-11 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/468#issuecomment-78247421
  
Nice work. Looks good to me apart from the missing documentation.




[GitHub] flink pull request: Kick off of Flink's machine learning library

2015-03-11 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/479#issuecomment-78257904
  
Really nice work, Till! Very nicely documented. Just made a few comments 
concerning the paths for the documentation.




[GitHub] flink pull request: Kick off of Flink's machine learning library

2015-03-11 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/479#discussion_r26205970
  
--- Diff: docs/_includes/navbar.html ---
@@ -24,15 +24,15 @@
     We might be on an externally hosted documentation site.
     Please keep the site.FLINK_WEBSITE_URL below to ensure a link back to the Flink website.
     {% endcomment %}
-    <a href="{{ site.FLINK_WEBSITE_URL }}index.html" title="Home">
+    <a href="{{ site.FLINK_WEBSITE_URL }}/index.html" title="Home">
      <img class="hidden-xs hidden-sm img-responsive"
-      src="{{ site.baseurl }}img/logo.png" alt="Apache Flink Logo">
+      src="{{ site.baseurl }}/img/logo.png" alt="Apache Flink Logo">
    </a>
    <div class="row visible-xs">
      <div class="col-xs-3">
-      <a href="{{ site.baseurl }}index.html" title="Home">
+      <a href="{{ site.baseurl }}/index.html" title="Home">
--- End diff --

This will point the Home button to http://ci.apache.org instead of the 
documentation. {{ site.baseurl }} is actually not set in the config. We could 
set it to the current docs or just keep the relative links, which worked fine.




[GitHub] flink pull request: Kick off of Flink's machine learning library

2015-03-11 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/479#discussion_r26205980
  
--- Diff: docs/_includes/sidenav.html ---
@@ -17,51 +17,51 @@
 under the License.
 -->
 <ul id="flink-doc-sidenav">
-  <li><div class="sidenav-category"><a href="faq.html">FAQ</a></div></li>
+  <li><div class="sidenav-category"><a href="/faq.html">FAQ</a></div></li>
--- End diff --

Same as above.




[GitHub] flink pull request: Kick off of Flink's machine learning library

2015-03-11 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/479#discussion_r26205978
  
--- Diff: docs/_includes/navbar.html ---
@@ -49,15 +49,15 @@
       <ul class="nav navbar-nav">
 
        <li>
-         <a href="index.html" class="{% if page.url contains 'index.html' %}active{% endif %}">Documentation</a>
+         <a href="/index.html" class="{% if page.url contains 'index.html' %}active{% endif %}">Documentation</a>
        </li>
 
        <li>
-         <a href="api/java/index.html">Javadoc</a>
+         <a href="/api/java/index.html">Javadoc</a>
        </li>
 
        <li>
-         <a href="api/scala/index.html#org.apache.flink.api.scala.package">Scaladoc</a>
+         <a href="/api/scala/index.html#org.apache.flink.api.scala.package">Scaladoc</a>
--- End diff --

Same as above.




[GitHub] flink pull request: Kick off of Flink's machine learning library

2015-03-11 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/479#discussion_r26205983
  
--- Diff: docs/_layouts/default.html ---
@@ -23,16 +23,25 @@
     <meta http-equiv="X-UA-Compatible" content="IE=edge">
     <meta name="viewport" content="width=device-width, initial-scale=1">
     <title>Apache Flink: {{ page.title }}</title>
-    <link rel="shortcut icon" href="{{ site.baseurl }}favicon.ico" type="image/x-icon">
-    <link rel="icon" href="{{ site.baseurl }}favicon.ico" type="image/x-icon">
-    <link rel="stylesheet" href="{{ site.baseurl }}css/bootstrap.css">
-    <link rel="stylesheet" href="{{ site.baseurl }}css/bootstrap-lumen-custom.css">
-    <link rel="stylesheet" href="{{ site.baseurl }}css/syntax.css">
-    <link rel="stylesheet" href="{{ site.baseurl }}css/custom.css">
-    <link href="{{ site.baseurl }}css/main/main.css" rel="stylesheet">
+    <link rel="shortcut icon" href="{{ site.baseurl }}/favicon.ico" type="image/x-icon">
+    <link rel="icon" href="{{ site.baseurl }}/favicon.ico" type="image/x-icon">
+    <link rel="stylesheet" href="{{ site.baseurl }}/css/bootstrap.css">
+    <link rel="stylesheet" href="{{ site.baseurl }}/css/bootstrap-lumen-custom.css">
+    <link rel="stylesheet" href="{{ site.baseurl }}/css/syntax.css">
+    <link rel="stylesheet" href="{{ site.baseurl }}/css/custom.css">
+    <link href="{{ site.baseurl }}/css/main/main.css" rel="stylesheet">
--- End diff --

This will break the loading of stylesheets on our website. {{ site.baseurl }} 
is actually not set in the config. Thus, the path /css will resolve against 
http://ci.apache.org.




[GitHub] flink pull request: Kick off of Flink's machine learning library

2015-03-11 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/479#discussion_r26205976
  
--- Diff: docs/_includes/navbar.html ---
@@ -49,15 +49,15 @@
       <ul class="nav navbar-nav">
 
        <li>
-         <a href="index.html" class="{% if page.url contains 'index.html' %}active{% endif %}">Documentation</a>
+         <a href="/index.html" class="{% if page.url contains 'index.html' %}active{% endif %}">Documentation</a>
        </li>
 
        <li>
-         <a href="api/java/index.html">Javadoc</a>
+         <a href="/api/java/index.html">Javadoc</a>
--- End diff --

Same as above.




[GitHub] flink pull request: Kick off of Flink's machine learning library

2015-03-11 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/479#discussion_r26205975
  
--- Diff: docs/_includes/navbar.html ---
@@ -49,15 +49,15 @@
       <ul class="nav navbar-nav">
 
        <li>
-         <a href="index.html" class="{% if page.url contains 'index.html' %}active{% endif %}">Documentation</a>
+         <a href="/index.html" class="{% if page.url contains 'index.html' %}active{% endif %}">Documentation</a>
--- End diff --

Please use relative paths. This will fail with our current docs setup 
because the base url is http://ci.apache.org/projects/flink/




[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...

2015-03-13 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/466#issuecomment-79096322
  
@aljoscha @fhueske @hsaputra Thanks for the feedback. Some people suggested 
that the name is confusing and that my pull request involved too much code 
duplication. I propose to call the new operator `combineGroup` because it is a 
combiner that works on Groups, just like the existing combiners in 
`GroupReduceCombineDriver` and `AllGroupReduceDriver`. I refactored the code to 
reuse Flink's existing logic for combiners. 

@hsaputra I added documentation for the operator in the code and the 
official documentation. Some Scala tests for the API were added as well.

When merging this pull request, I would squash the two commits and keep the 
commit message of the latest one. I simply wanted to keep the first one to show 
the development process.
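
A sketch of what a `combineGroup` call could look like (hypothetical data; note that, unlike the combiner of a GroupReduce, the combine output type may differ from the input type):

```
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class CombineGroupSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> words = env.fromElements("flink", "flink", "hadoop");

        // The group combine may change the type: Strings in, partition-local
        // element counts out. The result is partial by design.
        DataSet<Long> partialCounts = words.combineGroup(
                new GroupCombineFunction<String, Long>() {
                    @Override
                    public void combine(Iterable<String> values, Collector<Long> out) {
                        long count = 0;
                        for (String ignored : values) {
                            count++;
                        }
                        out.collect(count);
                    }
                });

        // Summing the partial counts yields the total number of elements.
        partialCounts.reduce(new ReduceFunction<Long>() {
            @Override
            public Long reduce(Long a, Long b) {
                return a + b;
            }
        }).print();
    }
}
```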




[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-03-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/202#discussion_r26132770
  
--- Diff: 
flink-addons/flink-language-binding/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
 ---
@@ -0,0 +1,247 @@

+
--- End diff --

The main problem here is that you prepended the BSD-licensed Dill code with 
the Apache license. Just remove the Apache license header and clearly separate 
your code from the Dill library. Then it should be no problem having the Apache 
and BSD license side by side. After all, they are very similar, except for 
additional patent grants that the Apache license provides.

Alternatively, I'd suggest using some sort of package management (e.g. 
`pip`) to install the library. Then we don't have to package it.




[GitHub] flink pull request: Kick off of Flink's machine learning library

2015-03-11 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/479#discussion_r26215901
  
--- Diff: docs/_includes/navbar.html ---
@@ -49,15 +49,15 @@
       <ul class="nav navbar-nav">
 
        <li>
-         <a href="index.html" class="{% if page.url contains 'index.html' %}active{% endif %}">Documentation</a>
+         <a href="/index.html" class="{% if page.url contains 'index.html' %}active{% endif %}">Documentation</a>
--- End diff --

We could set `<base href="http://ci.apache.org/projects/flink/...">` 
in the header. That way, we would be able to use relative links. However, 
hard-coding the link to the docs in the template variables is also not very 
nice.




[GitHub] flink pull request: Flink-1780: Rename FlatCombineFunction to Grou...

2015-03-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/530#discussion_r27104700
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -1425,11 +1425,9 @@ public static boolean isClassType(Type t) {
 	}
 
 	private static boolean sameTypeVars(Type t1, Type t2) {
-		if (!(t1 instanceof TypeVariable) || !(t2 instanceof TypeVariable)) {
-			return false;
-		}
-		return ((TypeVariable<?>) t1).getName().equals(((TypeVariable<?>) t2).getName())
-			&& ((TypeVariable<?>) t1).getGenericDeclaration().equals(((TypeVariable<?>) t2).getGenericDeclaration());
+		return !(!(t1 instanceof TypeVariable) || !(t2 instanceof TypeVariable))
+			&& ((TypeVariable<?>) t1).getName().equals(((TypeVariable<?>) t2).getName())
+			&& ((TypeVariable<?>) t1).getGenericDeclaration().equals(((TypeVariable<?>) t2).getGenericDeclaration());
--- End diff --

Not sure if this change improves the readability of the code.
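For comparison, the pre-change variant with the early return, reconstructed from the removed lines above:

```
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

class TypeVarCompare {
    // The instanceof guard stays separate from the two equality checks,
    // which arguably reads more clearly than folding it into one expression.
    private static boolean sameTypeVars(Type t1, Type t2) {
        if (!(t1 instanceof TypeVariable) || !(t2 instanceof TypeVariable)) {
            return false;
        }
        return ((TypeVariable<?>) t1).getName().equals(((TypeVariable<?>) t2).getName())
            && ((TypeVariable<?>) t1).getGenericDeclaration().equals(((TypeVariable<?>) t2).getGenericDeclaration());
    }
}
```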




[GitHub] flink pull request: Flink-1780: Rename FlatCombineFunction to Grou...

2015-03-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/530#discussion_r27104945
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
 ---
@@ -1465,7 +1467,9 @@ public static void cancelChainedTasks(List<ChainedDriver<?, ?>> tasks) {
 		for (int i = 0; i < tasks.size(); i++) {
 			try {
 				tasks.get(i).cancelTask();
-			} catch (Throwable t) {}
+			} catch (Throwable t) {
+				// do nothing
+			}
 		}
--- End diff --

Same as above.




[GitHub] flink pull request: Flink-1780: Rename FlatCombineFunction to Grou...

2015-03-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/530#discussion_r27104851
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
 ---
@@ -525,7 +525,9 @@ protected void run() throws Exception {
try {
FunctionUtils.closeFunction(this.stub);
}
-   catch (Throwable t) {}
+   catch (Throwable t) {
+   // do nothing
+   }
}
--- End diff --

A comment could be useful here to explain why we do a catch-all.




[GitHub] flink pull request: Flink-1780: Rename FlatCombineFunction to Grou...

2015-03-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/530#discussion_r27104741
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
 ---
@@ -35,20 +35,20 @@
 * Like {@link org.apache.flink.runtime.operators.GroupCombineDriver} but without grouping and sorting. May emit partially
 * reduced results.
 *
-* @see org.apache.flink.api.common.functions.FlatCombineFunction
+* @see GroupCombineFunction
--- End diff --

Path to class missing here.




[GitHub] flink pull request: Flink-1780: Rename FlatCombineFunction to Grou...

2015-03-25 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/530#issuecomment-85954079
  
+1 This rename makes a lot of sense. Looks good to me apart from the minor 
comments.




[GitHub] flink pull request: Flink-1780: Rename FlatCombineFunction to Grou...

2015-03-25 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/530#issuecomment-86090016
  
The documentation also needs to be adapted to use the new interface name.

@smarthi If you don't mind, I will merge your commit with my remarks 
addressed.




[GitHub] flink pull request: [FLINK-1769] Fix deploy bug caused by ScalaDoc...

2015-03-26 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/535#issuecomment-86449530
  
+1 for merge




[GitHub] flink pull request: [FLINK-1589] Add option to pass configuration ...

2015-04-01 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/427#discussion_r27564491
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ---
@@ -103,6 +110,9 @@ public void start() throws Exception {

// create the embedded runtime
Configuration configuration = 
getConfiguration(this);
+   if(this.configuration != null) {
+   
configuration.addAll(this.configuration);
+   }
--- End diff --

Wouldn't it be better to move the null check to the constructor and create 
the default `Configuration` there via a call to `getConfiguration`?
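
Roughly like this (just a sketch; the class is reduced to the relevant parts, and the default-config creation is simplified):

```
import org.apache.flink.configuration.Configuration;

public class LocalExecutor {
    private final Configuration configuration;

    public LocalExecutor() {
        // Default configuration created once, here (simplified; the real code
        // would obtain it via getConfiguration).
        this(new Configuration());
    }

    public LocalExecutor(Configuration conf) {
        // The null check happens in one place, so start() can call addAll()
        // on a field that is guaranteed to be non-null.
        this.configuration = (conf != null) ? conf : new Configuration();
    }
}
```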




[GitHub] flink pull request: [FLINK-1589] Add option to pass configuration ...

2015-04-01 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/427#discussion_r27564493
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ---
@@ -70,6 +72,11 @@ public LocalExecutor() {
}
}
 
+   public LocalExecutor(Configuration conf) {
+   super();
+   this.configuration = conf;
+   }
--- End diff --

Shouldn't `super()` actually be `this()`?




[GitHub] flink pull request: [FLINK-1589] Add option to pass configuration ...

2015-04-01 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/427#discussion_r27569381
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
@@ -1058,6 +1059,20 @@ public static LocalEnvironment 
createLocalEnvironment(int parallelism) {
lee.setParallelism(parallelism);
return lee;
}
+
+   /**
+* Creates a {@link LocalEnvironment}. The local execution environment 
will run the program in a
+* multi-threaded fashion in the same JVM as the environment was 
created in. It will use the
+* degree of parallelism specified in the parameter.
--- End diff --

+1 please change it to parallelism :)




[GitHub] flink pull request: [FLINK-1790]Remove the redundant import code

2015-03-27 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/538#issuecomment-86930773
  
:+1:




[GitHub] flink pull request: Add auto-parallelism to Jobs (0.8 branch)

2015-03-03 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/410#issuecomment-76978687
  
@rmetzger I don't see a reason why this should not go to master as well. 
After all, it's optional and quite useful if you want to run a job on the full 
cluster with as many available slots as possible.




[GitHub] flink pull request: [FLINK-1624] fix failing build due to maven gi...

2015-03-02 Thread mxm
Github user mxm closed the pull request at:

https://github.com/apache/flink/pull/445




[GitHub] flink pull request: Implement the convenience methods count and co...

2015-02-27 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/210#issuecomment-76385275
  
I moved the `writeInto(ByteBuf buf)` method to the subclasses of 
`AbstractID` used by `NettyMessage`. This is the only context where the method 
is being used and the subclasses already have another `fromByteBuf(ByteBuf 
buf)` method.
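
Roughly the resulting shape, as a self-contained sketch (the `ChannelID` class and the two-long ID layout are stand-ins, not the actual Flink classes):

```
import io.netty.buffer.ByteBuf;

// Stand-in for Flink's AbstractID (two 64-bit parts), just to make the
// sketch self-contained.
class AbstractIDSketch {
    protected final long lowerPart;
    protected final long upperPart;

    protected AbstractIDSketch(long lowerPart, long upperPart) {
        this.lowerPart = lowerPart;
        this.upperPart = upperPart;
    }
}

// Hypothetical subclass after the refactoring: (de)serialization lives next
// to its only consumer, NettyMessage, instead of in the ID base class.
public class ChannelID extends AbstractIDSketch {

    public ChannelID(long lowerPart, long upperPart) {
        super(lowerPart, upperPart);
    }

    public void writeInto(ByteBuf buf) {
        buf.writeLong(lowerPart);
        buf.writeLong(upperPart);
    }

    public static ChannelID fromByteBuf(ByteBuf buf) {
        return new ChannelID(buf.readLong(), buf.readLong());
    }
}
```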




[GitHub] flink pull request: [FLINK-1501] Add metrics library for monitorin...

2015-02-23 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/421#issuecomment-75566621
  
@fhueske The user could do so by selecting "Show all task managers" and 
then identifying the struggling task manager. For large cluster setups, it makes 
sense to sample just from a few task managers.




[GitHub] flink pull request: [FLINK-1501] Add metrics library for monitorin...

2015-02-23 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/421#issuecomment-75554187
  
Looks really nice and informative :+1: 

Some suggestions:

- It would be great if one could specify the number of task managers to 
see. If the number of task managers shown is smaller than the total number of 
task managers, there should be a shuffle button to show a random selection of 
task managers.

- Add some information about the different memory statistics. The labels 
might not be intuitive for the average user.




[GitHub] flink pull request: [FLINK-1501] Add metrics library for monitorin...

2015-02-23 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/421#issuecomment-75600898
  
I hadn't fully understood that you wanted one chart containing all 
task managers' load. That's a good thing. If it is only one chart, the overhead 
of updating it should not be as high as creating a chart for every task manager 
(as it is now).




[GitHub] flink pull request: Implement the convenience methods count and co...

2015-02-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/210#discussion_r25426681
  
--- Diff: flink-core/pom.xml ---
@@ -58,6 +58,12 @@ under the License.
 			<version>0.5.1</version>
 		</dependency>
 
+		<dependency>
--- End diff --

Simply because AbstractID depends on `io.netty.buffer.ByteBuf`.




[GitHub] flink pull request: Implement the convenience methods count and co...

2015-02-26 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/210#issuecomment-76151146
  
I squashed the commits and rebased to the current master. Any objections 
against merging this?




[GitHub] flink pull request: [Flink-1436] refactor CLiFrontend to provide m...

2015-01-23 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/331

[Flink-1436]  refactor CLiFrontend to provide more identifiable and 
meaningful error messages



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink flink-1436

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/331.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #331


commit 1bb326c347ce26b0458a4ed437e418a227820ed4
Author: Max m...@posteo.de
Date:   2015-01-23T13:05:31Z

[FLINK-1436] more meaningful error messages

commit 5daf545384b1a2dfbed5273c691804ed45e211c1
Author: Max m...@posteo.de
Date:   2015-01-23T13:05:55Z

[FLINK-1436] remove verbose flag and default to verbose output

commit 4cc1f691df57a0f411419283522952ba13a4d713
Author: Max m...@posteo.de
Date:   2015-01-23T14:31:07Z

[FLINK-1436] rework error message logic using checked exceptions

in particular, error messages are printed after usage information which
provides information about the error immediately (no more scrolling)

commit ab0bfa0fbd854facfbee6f74a601be975ea268b3
Author: Max m...@posteo.de
Date:   2015-01-23T14:39:40Z

[FLINK-1436] adapt tests for the new error handling of CliFrontend






[GitHub] flink pull request: [FLINK-1679] rename degree of parallelism to p...

2015-03-23 Thread mxm
Github user mxm closed the pull request at:

https://github.com/apache/flink/pull/488




[GitHub] flink pull request: [FLINK-1679] rename degree of parallelism to p...

2015-03-23 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/488#issuecomment-84875935
  
Merged in master with 126f9f799071688fe80955a7e7cfa991f53c95af




[GitHub] flink pull request: [FLINK-1679] rename degree of parallelism to p...

2015-03-23 Thread mxm
GitHub user mxm reopened a pull request:

https://github.com/apache/flink/pull/488

[FLINK-1679] rename degree of parallelism to parallelism & extend 
documentation about parallelism

https://issues.apache.org/jira/browse/FLINK-1679

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink parallelism

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #488


commit 17097fdf51f41445bad6da2186868185a6bf947b
Author: Maximilian Michels m...@apache.org
Date:   2015-03-18T09:44:42Z

[FLINK-1679] deprecate API methods to set the parallelism

commit f6ba8c07cc9a153b1ac1e213f9749155c42ae3c3
Author: Maximilian Michels m...@apache.org
Date:   2015-03-18T09:44:43Z

[FLINK-1679] use a consistent name for parallelism

* rename occurrences of degree of parallelism to parallelism

* [Dd]egree[ -]of[ -]parallelism -> [pP]arallelism
* (DOP|dop) -> [pP]arallelism
* paraDegree -> parallelism
* degree-of-parallelism -> parallelism
* DEGREE_OF_PARALLELISM -> PARALLELISM

commit 658bb1166aa907677e06cf011e5a0fdaf58ab15f
Author: Maximilian Michels m...@apache.org
Date:   2015-03-18T09:44:44Z

[FLINK-1679] deprecate old parallelism config entry

old config parameter can still be used

OLD
parallelization.degree.default

NEW
parallelism.default

commit 412ac54df0fde12666afbc1414df5fd919ba1607
Author: Maximilian Michels m...@apache.org
Date:   2015-03-18T09:44:45Z

[FLINK-1679] extend faq and programming guide to clarify parallelism






[GitHub] flink pull request: [FLINK-1768]Fix the bug of BlobServerConnectio...

2015-03-23 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/514#issuecomment-84933938
  
Thanks for spotting this. We will merge your change.




[GitHub] flink pull request: Stream graph + internal refactor

2015-04-14 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/594#issuecomment-92780564
  
This is a huge PR, but from what I've seen, you made very sensible changes.




[GitHub] flink pull request: [scheduling] implement backtracking with inter...

2015-04-14 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/595#discussion_r28319765
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -339,6 +339,12 @@ extends Actor with ActorLogMessages with ActorLogging {
   }
 }
 
+  /**
+   * Always deny locking of ResultPartitions.
+   */
+  case LockResultPartition(partitionID: IntermediateResultPartitionID) =>
--- End diff --

The intent here is to lock any available ResultPartition for an 
IntermediateResultPartitionID, regardless of the ExecutionAttemptID.




[GitHub] flink pull request: [FLINK-1887]Fix the message in runtime excepti...

2015-04-14 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/599#issuecomment-92775321
  
Good to merge




[GitHub] flink pull request: [scheduling] implement backtracking with inter...

2015-04-13 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/595

[scheduling] implement backtracking with intermediate result checking

- backtracks from the sinks of an ExecutionGraph
- checks the availability of IntermediatePartitionResults
- marks ExecutionVertex to be scheduled

This first version of backtracking does not support resume/recovery from
intermediate results yet. It lays the foundation for integrating the
remaining changes.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink backtracking-scheduling

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/595.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #595


commit 3ac0b9e2bc724b91a0be7c6a9c97c937f3d9aa23
Author: Maximilian Michels m...@apache.org
Date:   2015-04-13T17:32:40Z

[scheduling] implement backtracking with intermediate result checking

- backtracks from the sinks of an ExecutionGraph
- checks the availability of IntermediatePartitionResults
- marks ExecutionVertex to be scheduled

This first version of backtracking does not support resume/recovery from
intermediate results yet. It lays the foundation for integrating the
remaining changes.






[GitHub] flink pull request: [FLINK-1908] JobManager startup delay isn't co...

2015-04-20 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/609#issuecomment-94401080
  
Thanks for the pull request. Seems to work fine. I was wondering, though: shouldn't 
the task managers repeatedly try to establish a connection to the job manager? 
To me, that seems a nicer way to solve this problem. That way, the 
startup script doesn't need to be aware of the job manager's RPC port.
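
As a generic sketch of that retry idea (not actual Flink code; the timeout and interval values are made up):

```
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class JobManagerRetry {
    /** Polls the given address until it accepts connections or the deadline passes. */
    static boolean waitForJobManager(String host, int port, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 1000);
                return true; // the job manager is reachable
            } catch (IOException e) {
                Thread.sleep(500); // not up yet; retry until the deadline
            }
        }
        return false;
    }
}
```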




[GitHub] flink pull request: [FLINK-1486] add print method for prefixing a ...

2015-04-21 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/372#issuecomment-94703910
  
I've added documentation for the new print method. Will merge later on.




[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-21 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-94798580
  
Congratulations! :)




[GitHub] flink pull request: [FLINK-1486] add print method for prefixing a ...

2015-04-24 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/372#issuecomment-95880258
  
Yes, it should be simple for the user. It makes sense to have one print 
method which just prints the output on the client. In addition, we could have 
another *advanced* print method which prints a prefix and optionally the task 
id.

- `print()`
- `print(String prefix, boolean includeParallelID)`
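
Sketched as Java signatures, the two variants could look like the interface
below. This is only an illustration of the proposed API surface; the names are
not final and the interface itself is an assumption for the example.

```java
// Sketch of the proposed API surface; names are illustrative, not final.
interface PrintableDataSet<T> {
    /** Prints all elements on the client, without any prefix. */
    void print();

    /** Prints all elements with the given prefix, optionally including the
     *  parallel task id of the producing subtask. */
    void print(String prefix, boolean includeParallelID);
}
```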


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1486] add print method for prefixing a ...

2015-04-24 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/372#issuecomment-95855310
  
Just saying that a prefix helps to identify output, even if everything is 
printed on the client. Additionally, including the task id in the output can be 
useful for debugging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-938] Automatically configure the jobmana...

2015-04-24 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/248#issuecomment-95990793
  
This will work for most use cases, where the JobManager is started within 
the same cluster as the TaskManagers. It will certainly fail if the JobManager 
resides in a different network environment where its hostname cannot be 
resolved by the TaskManagers.

I think it's ok to merge if we print a big warning when the job manager 
address is configured automatically.
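
For illustration, such a warning could be emitted after a simple resolution
check at startup. This is only a sketch with assumed names and message text,
not the actual patch; note that resolving the name locally does not prove the
TaskManagers can resolve it too.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostnameCheckSketch {
    /** Warns if the auto-configured JobManager hostname does not resolve (sketch). */
    static void warnIfUnresolvable(String jobManagerHostname) {
        try {
            InetAddress.getByName(jobManagerHostname); // throws if not resolvable
        } catch (UnknownHostException e) {
            System.err.println("WARNING: JobManager hostname '" + jobManagerHostname
                    + "' could not be resolved. TaskManagers in other networks"
                    + " may not be able to connect.");
        }
    }
}
```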


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1486] add print method for prefixing a ...

2015-04-23 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/372#issuecomment-95488183
  
Do we want to break backwards compatibility or include a new method for 
printing on the client? After all, printing on the workers is a useful tool to 
debug the dataflow of a program.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1486] add print method for prefixing a ...

2015-04-20 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/372#issuecomment-94502825
  
I've updated the pull request. I decided to implement the concise method:
```
sinkId:taskId output - sink id provided, parallelism > 1
sinkId output - sink id provided, parallelism == 1
taskId output - no sink id provided, parallelism > 1
output - no sink id provided, parallelism == 1
```
If there are no objections, I will merge this tomorrow.
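
For clarity, the four cases above could be produced by formatting logic along
these lines. This is only a sketch mirroring the scheme; the parameter names
are assumptions, not the actual implementation.

```java
public class PrintPrefixSketch {
    /** Builds the output line for one record according to the scheme above (sketch). */
    static String format(String sinkId, int taskId, int parallelism, String output) {
        if (sinkId != null && parallelism > 1) {
            return sinkId + ":" + taskId + " " + output; // sink id provided, parallel
        } else if (sinkId != null) {
            return sinkId + " " + output;                // sink id provided, single task
        } else if (parallelism > 1) {
            return taskId + " " + output;                // no sink id, parallel
        } else {
            return output;                               // no sink id, single task
        }
    }
}
```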


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework

2015-04-29 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/638#issuecomment-97353102
  
Wow! Great to see that we're getting rid of the only Python-side dependency. 
It was a bit unclear under which terms we could ship that library anyway. Have 
you done any measurements of how this affects performance? IMO the performance 
impact should be near zero; it might even be faster now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1843] remove SoftReferences on archived...

2015-04-29 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/639

[FLINK-1843] remove SoftReferences on archived ExecutionGraphs

The previously introduced SoftReferences for storing archived execution graphs 
cleared old graphs in a non-transparent order. This pull request removes the 
SoftReferences and reverts to keeping a fixed-sized list of old execution 
graphs.
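
The fixed-size behaviour can be obtained from a LinkedHashMap that evicts its
eldest entry. Below is a minimal sketch of that idea, using generic placeholder
types rather than Flink's actual JobID/ExecutionGraph classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FixedSizeArchiveSketch<K, V> {
    private final Map<K, V> graphs;

    public FixedSizeArchiveSketch(final int maxEntries) {
        // Insertion-ordered map that drops the oldest entry once the cap is hit.
        this.graphs = new LinkedHashMap<K, V>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    public void archive(K jobId, V graph) {
        graphs.put(jobId, graph);
    }
}
```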

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-1843

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #639


commit a580c8973ccb3579c79ebd0dc860c1f754eb87fd
Author: Maximilian Michels m...@apache.org
Date:   2015-04-29T10:34:31Z

[FLINK-1843] remove SoftReferences on archived ExecutionGraphs

The previously introduced SoftReferences to store archived
ExecutionGraphs cleared old graphs in a non-transparent order.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/640#discussion_r29432849
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import akka.actor.ActorRef;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import com.google.common.base.Preconditions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Backtracking is a mechanism to schedule only those Execution Vertices of an Execution Graph which
+ * do not have an intermediate result available. This is in contrast to the simple way of scheduling
+ * a job, where all Execution Vertices are executed starting from the source. The Backtracking starts
+ * from the sinks and traverses the Execution Graph to the sources. It only reaches the sources if
+ * no intermediate result could be found on the way.
+ *
+ * @see ExecutionGraph
+ * @see ExecutionVertex
+ * @see Execution
+ */
+public class Backtracking {
+
+   private static final Logger LOG = LoggerFactory.getLogger(Backtracking.class);
+
+   private final Deque<TaskRequirement> taskRequirements = new ArrayDeque<TaskRequirement>();
+
+   private final Map<IntermediateResultPartitionID, Boolean> visitedPartitions = new HashMap<IntermediateResultPartitionID, Boolean>();
+
+   private ScheduleAction scheduleAction;
+   private Runnable postBacktrackingHook;
+
+   public Backtracking(Collection<ExecutionJobVertex> vertices) {
+      Preconditions.checkNotNull(vertices);
+
+      // add all sinks found to the stack
+      for (ExecutionJobVertex ejv : vertices) {
+         if (ejv.getJobVertex().isOutputVertex()) {
+            for (ExecutionVertex ev : ejv.getTaskVertices()) {
+               if (ev.getExecutionState() == ExecutionState.CREATED) {
+                  taskRequirements.add(new TaskRequirement(ev));
+               }
+            }
+         }
+      }
+
+      this.scheduleAction = new ScheduleAction() {
+         @Override
+         public void schedule(ExecutionVertex ev) {}
+      };
+
+      this.postBacktrackingHook = new Runnable() {
+         @Override
+         public void run() {}
+      };
+   }
+
+   /**
+    * Scheduling to be performed when an ExecutionVertex is encountered that cannot be resumed
+    */
+   public interface ScheduleAction {
+      void schedule(ExecutionVertex ev);
+   }
+
+   /**
+    * A TaskRequirement encapsulates an ExecutionVertex and its IntermediateResultPartitions which
+    * are required for execution.
+    */
+   private class TaskRequirement {
+
+      private final ExecutionVertex executionVertex;
+      private final Deque<IntermediateResultPartition> pendingInputs = new ArrayDeque<IntermediateResultPartition>();
+      private final int numInputs;
+
+      private int availableInputs = 0;
+
+      public TaskRequirement(ExecutionVertex executionVertex) {
+         this.executionVertex = executionVertex;
+         this.pendingInputs.addAll(executionVertex.getInputs