Repository: flink
Updated Branches:
  refs/heads/master 0ccbd2779 -> 0ccab95e7


[FLINK-2440][py] Expand Environment feature coverage

This closes #1383


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ccab95e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ccab95e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ccab95e

Branch: refs/heads/master
Commit: 0ccab95e7fbbc53e94dabd1032d903428a47db24
Parents: 0ccbd27
Author: zentol <ches...@apache.org>
Authored: Thu Nov 19 13:30:48 2015 +0100
Committer: zentol <ches...@apache.org>
Committed: Wed Dec 2 14:40:38 2015 +0100

----------------------------------------------------------------------
 .../flink/python/api/PythonPlanBinder.java      |  4 +--
 .../python/api/flink/example/TPCHQuery10.py     |  2 +-
 .../python/api/flink/example/TPCHQuery3.py      |  2 +-
 .../api/flink/example/TriangleEnumeration.py    |  2 +-
 .../python/api/flink/example/WebLogAnalysis.py  |  2 +-
 .../flink/python/api/flink/example/WordCount.py |  2 +-
 .../flink/python/api/flink/plan/Environment.py  | 38 ++++++++++++++------
 .../org/apache/flink/python/api/test_main.py    |  2 +-
 .../org/apache/flink/python/api/test_main2.py   |  3 +-
 9 files changed, 37 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index f4f501a..a27a589 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -278,9 +278,7 @@ public class PythonPlanBinder {
        }
 
        private void receiveParameters() throws IOException {
-               Integer parameterCount = (Integer) receiver.getRecord(true);
-
-               for (int x = 0; x < parameterCount; x++) {
+               for (int x = 0; x < 4; x++) {
                        Tuple value = (Tuple) receiver.getRecord(true);
                        switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) {
                                case DOP:

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py
index 032ef85..cc9e7cf 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py
@@ -110,6 +110,6 @@ if __name__ == "__main__":
 
        result.write_csv(sys.argv[5], '\n', '|', WriteMode.OVERWRITE)
 
-       env.set_degree_of_parallelism(1)
+       env.set_parallelism(1)
 
        env.execute(local=True)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py
index 5fafb01..3eb72c9 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py
@@ -99,7 +99,7 @@ if __name__ == "__main__":
 
     result.write_csv(sys.argv[4], '\n', '|', WriteMode.OVERWRITE)
 
-    env.set_degree_of_parallelism(1)
+    env.set_parallelism(1)
 
     env.execute(local=True)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py
index 2727635..b1b3ef4 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py
@@ -147,6 +147,6 @@ if __name__ == "__main__":
 
     triangles.output()
 
-    env.set_degree_of_parallelism(1)
+    env.set_parallelism(1)
 
     env.execute(local=True)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py
index d571cf9..676043f 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py
@@ -82,6 +82,6 @@ if __name__ == "__main__":
 
     result.write_csv(sys.argv[4], '\n', '|', WriteMode.OVERWRITE)
 
-    env.set_degree_of_parallelism(1)
+    env.set_parallelism(1)
 
     env.execute(local=True)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py
index 8a89a6f..71c2e28 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py
@@ -56,6 +56,6 @@ if __name__ == "__main__":
     else:
         result.output()
 
-    env.set_degree_of_parallelism(1)
+    env.set_parallelism(1)
 
     env.execute(local=True)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index bea6212..169e31b 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -40,7 +40,10 @@ class Environment(object):
         self._counter = 0
 
         #parameters
-        self._parameters = []
+        self._dop = -1
+        self._local_mode = False
+        self._debug_mode = False
+        self._retry = 0
 
         #sets
         self._sources = []
@@ -114,15 +117,28 @@ class Environment(object):
         self._sources.append(child)
         return child_set
 
-    def set_degree_of_parallelism(self, degree):
+    def set_parallelism(self, parallelism):
         """
-        Sets the degree of parallelism (DOP) for operations executed through this environment.
+        Sets the parallelism for operations executed through this environment.
 
         Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with x parallel instances.
 
-        :param degreeOfParallelism: The degree of parallelism
+        :param parallelism: The degree of parallelism
         """
-        self._parameters.append(("dop", degree))
+        self._dop = parallelism
+
+    def get_parallelism(self):
+        """
+        Gets the parallelism with which operation are executed by default.
+        :return The parallelism used by operations.
+        """
+        return self._dop
+
+    def set_number_of_execution_retries(self, count):
+        self._retry = count
+
+    def get_number_of_execution_retries(self):
+        return self._retry
 
     def execute(self, local=False, debug=False):
         """
@@ -132,8 +148,8 @@ class Environment(object):
         """
         if debug:
             local = True
-        self._parameters.append(("mode", local))
-        self._parameters.append(("debug", debug))
+        self._local_mode = local
+        self._debug_mode = debug
         self._optimize_plan()
 
         plan_mode = sys.stdin.readline().rstrip('\n') == "plan"
@@ -243,9 +259,11 @@ class Environment(object):
         self._send_broadcast()
 
     def _send_parameters(self):
-        self._collector.collect(len(self._parameters))
-        for parameter in self._parameters:
-            self._collector.collect(parameter)
+        collect = self._collector.collect
+        collect(("dop", self._dop))
+        collect(("debug", self._debug_mode))
+        collect(("mode", self._local_mode))
+        collect(("retry", self._retry))
 
     def _send_sources(self):
         for source in self._sources:

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
index 9a3a5e4..16e1a8c 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -227,6 +227,6 @@ if __name__ == "__main__":
         .map_partition(Verify([(4.3, 4.4, 1), (4.1, 4.1, 3)], "ChainedSortedGroupReduce"), STRING).output()
 
     #Execution
-    env.set_degree_of_parallelism(1)
+    env.set_parallelism(1)
 
     env.execute(local=True)

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
index 2f30cda..56e3250 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
@@ -1,3 +1,4 @@
+
 # ###############################################################################
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -140,6 +141,6 @@ if __name__ == "__main__":
         .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union"), STRING).output()
 
     #Execution
-    env.set_degree_of_parallelism(1)
+    env.set_parallelism(1)
 
     env.execute(local=True)

Reply via email to