godfreyhe commented on a change in pull request #15231:
URL: https://github.com/apache/flink/pull/15231#discussion_r596514471



##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test PartitionSpec json ser/de. */

Review comment:
       PartitionSpec -> SortSpec

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/VariableRankRange.java
##########
@@ -18,18 +18,30 @@
 
 package org.apache.flink.table.runtime.operators.rank;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.util.List;
 
 /** changing rank limit depends on input. */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class VariableRankRange implements RankRange {
 
+    public static final String FIELD_NAME_END_INDEX = "endIndex";
+
     private static final long serialVersionUID = 5579785886506433955L;
+
+    @JsonProperty(FIELD_NAME_END_INDEX)
     private int rankEndIndex;

Review comment:
       nit: mark this as `final`

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSortLimit.java
##########
@@ -44,18 +53,41 @@ public StreamExecSortLimit(
             InputProperty inputProperty,
             RowType outputType,
             String description) {
+        this(
+                sortSpec,
+                new ConstantRankRange(limitStart + 1, limitEnd),
+                rankStrategy,
+                generateUpdateBefore,
+                getNewNodeId(),
+                Collections.singletonList(inputProperty),
+                outputType,
+                description);
+    }
+
+    @JsonCreator
+    public StreamExecSortLimit(
+            @JsonProperty(FIELD_NAME_SORT_SPEC) SortSpec sortSpec,
+            @JsonProperty(FIELD_NAME_RANK_RANG) ConstantRankRange rankRange,
+            @JsonProperty(FIELD_NAME_RANK_STRATEGY) RankProcessStrategy 
rankStrategy,
+            @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean 
generateUpdateBefore,
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperty,

Review comment:
       inputProperties

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
##########
@@ -89,7 +116,34 @@ public StreamExecRank(
             InputProperty inputProperty,
             RowType outputType,
             String description) {
-        super(Collections.singletonList(inputProperty), outputType, 
description);
+        this(
+                rankType,
+                partitionSpec,
+                sortSpec,
+                rankRange,
+                rankStrategy,
+                outputRankNumber,
+                generateUpdateBefore,
+                getNewNodeId(),
+                Collections.singletonList(inputProperty),
+                outputType,
+                description);
+    }
+
+    @JsonCreator
+    public StreamExecRank(
+            @JsonProperty(FIELD_NAME_RANK_TYPE) RankType rankType,
+            @JsonProperty(FIELD_NAME_PARTITION_SPEC) PartitionSpec 
partitionSpec,
+            @JsonProperty(FIELD_NAME_SORT_SPEC) SortSpec sortSpec,
+            @JsonProperty(FIELD_NAME_RANK_RANG) RankRange rankRange,
+            @JsonProperty(FIELD_NAME_RANK_STRATEGY) RankProcessStrategy 
rankStrategy,
+            @JsonProperty(FIELD_NAME_OUTPUT_RANK_NUMBER) boolean 
outputRankNumber,
+            @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean 
generateUpdateBefore,
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperty,

Review comment:
       inputProperties

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/ConstantRankRange.java
##########
@@ -18,24 +18,42 @@
 
 package org.apache.flink.table.runtime.operators.rank;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.util.List;
 
 /** rankStart and rankEnd are inclusive, rankStart always start from one. */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class ConstantRankRange implements RankRange {
 
+    public static final String FIELD_NAME_START = "start";
+    public static final String FIELD_NAME_END = "end";
+
     private static final long serialVersionUID = 9062345289888078376L;
+
+    @JsonProperty(FIELD_NAME_START)
     private long rankStart;

Review comment:
       nit: mark this as `final`

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
##########
@@ -89,7 +116,34 @@ public StreamExecRank(
             InputProperty inputProperty,
             RowType outputType,
             String description) {
-        super(Collections.singletonList(inputProperty), outputType, 
description);
+        this(
+                rankType,
+                partitionSpec,
+                sortSpec,
+                rankRange,
+                rankStrategy,
+                outputRankNumber,
+                generateUpdateBefore,
+                getNewNodeId(),
+                Collections.singletonList(inputProperty),
+                outputType,
+                description);
+    }
+
+    @JsonCreator
+    public StreamExecRank(
+            @JsonProperty(FIELD_NAME_RANK_TYPE) RankType rankType,
+            @JsonProperty(FIELD_NAME_PARTITION_SPEC) PartitionSpec 
partitionSpec,
+            @JsonProperty(FIELD_NAME_SORT_SPEC) SortSpec sortSpec,
+            @JsonProperty(FIELD_NAME_RANK_RANG) RankRange rankRange,
+            @JsonProperty(FIELD_NAME_RANK_STRATEGY) RankProcessStrategy 
rankStrategy,
+            @JsonProperty(FIELD_NAME_OUTPUT_RANK_NUMBER) boolean 
outputRankNumber,
+            @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean 
generateUpdateBefore,
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperty,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(id, inputProperty, outputType, description);

Review comment:
       nit: it's better we can add some validation for the given arguments, 
because it's created from json(may be changed by user), and the arguments may 
be illegal. such as check whether the argument is null, the length 
inputProperties is 1

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.jsonplan;
+
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.JsonPlanTestBase;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/** Test for sort limit JsonPlan ser/de. */

Review comment:
       update the java doc

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test RankType json ser/de. */

Review comment:
       update the java doc

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/ConstantRankRange.java
##########
@@ -18,24 +18,42 @@
 
 package org.apache.flink.table.runtime.operators.rank;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.util.List;
 
 /** rankStart and rankEnd are inclusive, rankStart always start from one. */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class ConstantRankRange implements RankRange {
 
+    public static final String FIELD_NAME_START = "start";
+    public static final String FIELD_NAME_END = "end";
+
     private static final long serialVersionUID = 9062345289888078376L;
+
+    @JsonProperty(FIELD_NAME_START)
     private long rankStart;
+
+    @JsonProperty(FIELD_NAME_END)
     private long rankEnd;

Review comment:
       ditto

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RankProcessStrategy.java
##########
@@ -37,8 +44,26 @@
 import java.util.Set;
 
 /** Base class of Strategy to choose different rank process function. */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "type")
+@JsonSubTypes(
+        value = {
+            @JsonSubTypes.Type(
+                    value = RankProcessStrategy.UndefinedStrategy.class,
+                    name = "Undefined"),
+            @JsonSubTypes.Type(
+                    value = RankProcessStrategy.AppendFastStrategy.class,
+                    name = "AppendFast"),
+            @JsonSubTypes.Type(value = 
RankProcessStrategy.RetractStrategy.class, name = "Retract"),
+            @JsonSubTypes.Type(
+                    value = RankProcessStrategy.UpdateFastStrategy.class,
+                    name = "UpdateFast"),
+        })
 public interface RankProcessStrategy {
 
+    String FIELD_NAME_TYPE = "type";
+
+    String getType();

Review comment:
       we do not need to add this method, you can define `@JsonTypeName` at the 
specific sub-class, please refer to `PlannerWindowProperty` and 
`PlannerProctimeAttribute`

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
+import 
org.apache.flink.table.runtime.operators.rank.ConstantRankRangeWithoutEnd;
+import org.apache.flink.table.runtime.operators.rank.RankRange;
+import org.apache.flink.table.runtime.operators.rank.VariableRankRange;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test RankType json ser/de. */

Review comment:
       update the java doc

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/RankRange.java
##########
@@ -18,14 +18,30 @@
 
 package org.apache.flink.table.runtime.operators.rank;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+
 import java.io.Serializable;
 import java.util.List;
 
 /**
  * RankRange for Rank, including following 3 types : ConstantRankRange, 
ConstantRankRangeWithoutEnd,
  * VariableRankRange.
  */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "type")
+@JsonSubTypes(
+        value = {
+            @JsonSubTypes.Type(value = ConstantRankRange.class, name = 
"Constant"),
+            @JsonSubTypes.Type(
+                    value = ConstantRankRangeWithoutEnd.class,
+                    name = "ConstantWithoutEnd"),
+            @JsonSubTypes.Type(value = VariableRankRange.class, name = 
"Variable"),
+        })
 public interface RankRange extends Serializable {
 
+    String FIELD_NAME_TYPE = "type";
+
     String toString(List<String> inputFieldNames);
+
+    String getType();

Review comment:
       similar with RankProcessStrategy

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLimit.java
##########
@@ -44,18 +53,38 @@ public StreamExecLimit(
             InputProperty inputProperty,
             RowType outputType,
             String description) {
+        this(
+                new ConstantRankRange(limitStart + 1, limitEnd),
+                getRankStrategy(needRetraction),
+                generateUpdateBefore,
+                getNewNodeId(),
+                Collections.singletonList(inputProperty),
+                outputType,
+                description);
+    }
+
+    @JsonCreator
+    public StreamExecLimit(
+            @JsonProperty(FIELD_NAME_RANK_RANG) ConstantRankRange rankRange,
+            @JsonProperty(FIELD_NAME_RANK_STRATEGY) RankProcessStrategy 
rankStrategy,
+            @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean 
generateUpdateBefore,
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperty,

Review comment:
       inputProperties




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to