HuangXingBo commented on a change in pull request #15623:
URL: https://github.com/apache/flink/pull/15623#discussion_r613999180



##########
File path: docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
##########
@@ -240,6 +248,54 @@ object ExampleCountWindowAverage extends App {
 }
 ```
 {{< /tab >}}
+
+{{< tab "Python" >}}
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment, FlatMapFunction, 
RuntimeContext
+from pyflink.datastream.state import ValueStateDescriptor
+
+class CountWindowAverage(FlatMapFunction):
+
+    def __init__(self):
+        self.sum = None
+
+    def open(self, runtime_context: RuntimeContext):
+        descriptor = ValueStateDescriptor(
+            "average",  # the state name
+            Types.TUPLE([Types.LONG(), Types.LONG()])  # type information
+        )
+        self.sum = runtime_context.get_state(descriptor)
+
+    def flat_map(self, value):
+        # access the state value
+        current_sum = self.sum.value()
+        if current_sum is None:
+            current_sum = (0, 0)
+
+        # update the count
+        current_sum = (current_sum[0] + 1, current_sum[1] + value[1])
+
+        # update the state
+        self.sum.update(current_sum)
+
+        # if the count reaches 2, emit the average and clear the state
+        if current_sum[0] >= 2:
+            self.sum.clear()
+            yield value[0], current_sum[1] / current_sum[0]
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.from_collection([(1, 3), (1, 5), (1, 7), (1, 4), (1, 2)]) \
+    .key_by(lambda row: row[0]) \
+    .flat_map(CountWindowAverage()) \
+    .print()
+
+env.execute()
+
+# the printed output will be (1,4) and (1,5)

Review comment:
       ```suggestion
   # the printed output will be (1,4.0) and (1,5.0)
   ```

##########
File path: docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
##########
@@ -92,7 +100,7 @@ have potentially less overhead at runtime.
 ## 使用 Keyed State
 
 keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 `KeyedStream`
-上使用,可以通过 `stream.keyBy(...)` 得到 `KeyedStream`.
+上使用,在Java API上可以通过 `stream.keyBy(...)` 得到 `KeyedStream`,在Python API上可以通过 
`stream.key_by(...)` 得到 `KeyedStream`。

Review comment:
       ```suggestion
   上使用,在Java/Scala API上可以通过 `stream.keyBy(...)` 得到 `KeyedStream`,在Python 
API上可以通过 `stream.key_by(...)` 得到 `KeyedStream`。
   ```

##########
File path: docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
##########
@@ -76,12 +76,20 @@ val words: DataStream[WC] = // [...]
 val keyed = words.keyBy( _.word )
 ```
 {{< /tab >}}
+
+{{< tab "Python" >}}
+```python
+words = # type: DataStream[Row]
+keyed = words.key_by(lambda row: row[0])
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 #### Tuple Keys and Expression Keys
 
 Flink also has two alternative ways of defining keys: tuple keys and expression
-keys. With this you can specify keys using tuple field indices or expressions
+keys in the Java API(still not supported in the Python API). With this you can

Review comment:
       ```suggestion
   keys in the Java/Scala API(still not supported in the Python API). With this 
you can
   ```

##########
File path: docs/content/docs/dev/datastream/fault-tolerance/state.md
##########
@@ -93,7 +101,7 @@ have potentially less overhead at runtime.
 
 The keyed state interfaces provides access to different types of state that 
are all scoped to
 the key of the current input element. This means that this type of state can 
only be used
-on a `KeyedStream`, which can be created via `stream.keyBy(…)`.
+on a `KeyedStream`, which can be created via `stream.keyBy(…)` in Java API or 
`stream.key_by(…)` in Python API.

Review comment:
       ```suggestion
   on a `KeyedStream`, which can be created via `stream.keyBy(…)` in Java/Scala 
API or `stream.key_by(…)` in Python API.
   ```

##########
File path: docs/content/docs/dev/datastream/fault-tolerance/state.md
##########
@@ -260,6 +268,54 @@ object ExampleCountWindowAverage extends App {
 }
 ```
 {{< /tab >}}
+
+{{< tab "Python" >}}
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment, FlatMapFunction, 
RuntimeContext
+from pyflink.datastream.state import ValueStateDescriptor
+
+class CountWindowAverage(FlatMapFunction):
+
+    def __init__(self):
+        self.sum = None
+
+    def open(self, runtime_context: RuntimeContext):
+        descriptor = ValueStateDescriptor(
+            "average",  # the state name
+            Types.TUPLE([Types.LONG(), Types.LONG()])  # type information
+        )
+        self.sum = runtime_context.get_state(descriptor)
+
+    def flat_map(self, value):
+        # access the state value
+        current_sum = self.sum.value()
+        if current_sum is None:
+            current_sum = (0, 0)
+
+        # update the count
+        current_sum = (current_sum[0] + 1, current_sum[1] + value[1])
+
+        # update the state
+        self.sum.update(current_sum)
+
+        # if the count reaches 2, emit the average and clear the state
+        if current_sum[0] >= 2:
+            self.sum.clear()
+            yield value[0], current_sum[1] / current_sum[0]
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.from_collection([(1, 3), (1, 5), (1, 7), (1, 4), (1, 2)]) \
+    .key_by(lambda row: row[0]) \
+    .flat_map(CountWindowAverage()) \
+    .print()
+
+env.execute()
+
+# the printed output will be (1,4) and (1,5)

Review comment:
       ```suggestion
   # the printed output will be (1,4.0) and (1,5.0)
   ```

##########
File path: docs/content/docs/dev/datastream/fault-tolerance/state.md
##########
@@ -76,12 +76,20 @@ val words: DataStream[WC] = // [...]
 val keyed = words.keyBy( _.word )
 ```
 {{< /tab >}}
+
+{{< tab "Python" >}}
+```python
+words = # type: DataStream[Row]
+keyed = words.key_by(lambda row: row[0])
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 #### Tuple Keys and Expression Keys
 
 Flink also has two alternative ways of defining keys: tuple keys and expression
-keys. With this you can specify keys using tuple field indices or expressions
+keys in the Java API(still not supported in the Python API). With this you can

Review comment:
       ```suggestion
   keys in the Java/Scala API(still not supported in the Python API). With this 
you can
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to