[jira] [Updated] (FLINK-34544) Tiered result partition should be released with lock

2024-02-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34544:
---
Labels: pull-request-available  (was: )

> Tiered result partition should be released with lock
> 
>
> Key: FLINK-34544
> URL: https://issues.apache.org/jira/browse/FLINK-34544
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.20.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> When releasing `TieredResultPartition`, it is not protected by a lock, so it 
> may occasionally throw an illegal state exception. We should fix it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34544][network] Add lock to release tiered result partition [flink]

2024-02-28 Thread via GitHub


TanYuxin-tyx opened a new pull request, #24412:
URL: https://github.com/apache/flink/pull/24412

   
   
   ## What is the purpose of the change
   
   *Add lock to release tiered result partition*
   
   
   ## Brief change log
   
   *(for example:)*
 - *Add lock to release tiered result partition*
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*TieredResultPartitionTest*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34544) Tiered result partition should be released with lock

2024-02-28 Thread Yuxin Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuxin Tan updated FLINK-34544:
--
Description: When releasing `TieredResultPartition`, it is not protected by a 
lock, so it may occasionally throw an illegal state exception. We should fix 
it.  (was: When releasing `TieredResultPartition`, it is not protected by a 
lock, then it may thrown iIlegal state exception occasionally. We should fix 
it.)

> Tiered result partition should be released with lock
> 
>
> Key: FLINK-34544
> URL: https://issues.apache.org/jira/browse/FLINK-34544
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.20.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>
> When releasing `TieredResultPartition`, it is not protected by a lock, so it 
> may occasionally throw an illegal state exception. We should fix it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34544) Tiered result partition should be released with lock

2024-02-28 Thread Yuxin Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuxin Tan updated FLINK-34544:
--
Description: When releasing `TieredResultPartition`, it is not protected by a 
lock, so it may occasionally throw an illegal state exception. We should fix 
it.  (was: The `TieredStorageMemoryManagerImpl` has a `numRequestedBuffers` 
check when releasing resources. However, this check is performed in the task 
thread, while the buffer recycle may occur in the Netty thread. As a result, it 
may incorrectly throw an exception when the release is too quick for the 
vertex, which has almost no data.
We should fix it.)

> Tiered result partition should be released with lock
> 
>
> Key: FLINK-34544
> URL: https://issues.apache.org/jira/browse/FLINK-34544
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.20.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>
> When releasing `TieredResultPartition`, it is not protected by a lock, so it 
> may occasionally throw an illegal state exception. We should fix it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34544) Tiered result partition should be released with lock

2024-02-28 Thread Yuxin Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuxin Tan updated FLINK-34544:
--
Summary: Tiered result partition should be released with lock  (was: The 
release check bug in tiered memory manager of hybrid shuffle)

> Tiered result partition should be released with lock
> 
>
> Key: FLINK-34544
> URL: https://issues.apache.org/jira/browse/FLINK-34544
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.20.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>
> The `TieredStorageMemoryManagerImpl` has a `numRequestedBuffers` check when 
> releasing resources. However, this check is performed in the task thread, while 
> buffers may be recycled in the Netty thread. As a result, the check may 
> incorrectly throw an exception when the release happens too quickly for a vertex 
> that has almost no data.
> We should fix it.
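
A minimal, self-contained sketch of the race described above and of the kind of 
synchronization the fix adds: the release-time check and the buffer recycling run 
in different threads, so both paths need to take the same lock. All names below are 
illustrative; this is not the actual Flink implementation, and the real fix in the 
linked PR may differ in detail.

{code:java}
// Illustrative stand-in for the tiered-storage memory accounting; not Flink code.
public class GuardedBufferAccounting {

    private final Object lock = new Object();
    private int numRequestedBuffers;
    private boolean released;

    /** Called from the task thread when a buffer is handed out. */
    public void onBufferRequested() {
        synchronized (lock) {
            if (released) {
                throw new IllegalStateException("Already released");
            }
            numRequestedBuffers++;
        }
    }

    /** Called from the Netty thread when a buffer is recycled. */
    public void onBufferRecycled() {
        synchronized (lock) {
            numRequestedBuffers--;
        }
    }

    /**
     * Called from the task thread when the result partition is released. Without
     * taking the same lock as onBufferRecycled(), the check below could observe a
     * stale numRequestedBuffers value and fail spuriously.
     */
    public void release() {
        synchronized (lock) {
            if (numRequestedBuffers != 0) {
                throw new IllegalStateException(
                        "Released with " + numRequestedBuffers + " buffers still requested");
            }
            released = true;
        }
    }
}
{code}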



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32261][table] Add built-in MAP_UNION function. [flink]

2024-02-28 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1507136138


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java:
##
@@ -368,6 +369,14 @@ public static InputTypeStrategy 
commonMultipleArrayType(int minCount) {
 /** @see ItemAtIndexArgumentTypeStrategy */
 public static final ArgumentTypeStrategy ITEM_AT_INDEX = new 
ItemAtIndexArgumentTypeStrategy();
 
+/**
+ * An {@link InputTypeStrategy} that expects {@code count} arguments that 
have a common map
+ * type.
+ */
+public static InputTypeStrategy commonMapType(int count) {

Review Comment:
   Change it to `minCount`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34469][table] Implement TableDistribution toString [flink]

2024-02-28 Thread via GitHub


jeyhunkarimov commented on code in PR #24338:
URL: https://github.com/apache/flink/pull/24338#discussion_r1507135704


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableDistribution.java:
##
@@ -142,4 +142,9 @@ && getBucketCount().get() != 0) {
 sb.append("\n");
 return sb.toString();
 }
+
+@Override
+public String toString() {
+return asSerializableString();

Review Comment:
   Done. But IMO `asSerializableString()` should be public, as it is everywhere 
else in the codebase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32261][table] Add built-in MAP_UNION function. [flink]

2024-02-28 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1507132219


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+private final SpecializedFunction.ExpressionEvaluator keyEqualityEvaluator;
+private transient MethodHandle keyEqualityHandle;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+final DataType keyDataType = outputType.getKeyDataType();
+final DataType valueDataType = outputType.getValueDataType();
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+keyEqualityEvaluator =
+context.createEvaluator(
+$("element1").isEqual($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
keyDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
keyDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+keyEqualityHandle = keyEqualityEvaluator.open(context);
+}
+
+public @Nullable MapData eval(@Nullable MapData... maps) {
+try {
+if (maps == null || maps.length == 0) {
+return null;
+}
+if (maps.length == 1) {
+return maps[0];
+}
+MapData result = maps[0];
+for (int i = 1; i < maps.length; ++i) {
+MapData map = maps[i];
+if (map != null && map.size() > 0) {
+result = new MapDataForMapUnion(result, map);
+}
+}
+return result;
+} catch (Throwable t) {
+throw new FlinkRuntimeException(t);
+}
+}
+
+private class MapDataForMapUnion implements MapData {
+private final GenericArrayData keysArray;
+private final GenericArrayData valuesArray;
+
+public MapDataForMapUnion(MapData map1, MapData map2) throws Throwable 
{
+List keysList = new ArrayList<>();
+List valuesList = new ArrayList<>();
+boolean isKeyNullExist = false;
+for (int i = 0; i < map2.size(); i++) {
+Object key = 
keyElementGetter.getElementOrNull(map2.keyArray(), i);
+if (key == null) {
+isKeyNullExist = true;
+}
+keysList.add(key);
+
valuesList.add(valueElementGetter.getElementOrNull(map2.valueArray(), i));
+}
+
+ 

Re: [PR] [FLINK-32261][table] Add built-in MAP_UNION function. [flink]

2024-02-28 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1507129381


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+private final SpecializedFunction.ExpressionEvaluator keyEqualityEvaluator;
+private transient MethodHandle keyEqualityHandle;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+final DataType keyDataType = outputType.getKeyDataType();
+final DataType valueDataType = outputType.getValueDataType();
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+keyEqualityEvaluator =
+context.createEvaluator(
+$("element1").isEqual($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
keyDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
keyDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+keyEqualityHandle = keyEqualityEvaluator.open(context);
+}
+
+public @Nullable MapData eval(@Nullable MapData... maps) {
+try {
+if (maps == null || maps.length == 0) {
+return null;
+}
+if (maps.length == 1) {
+return maps[0];
+}
+MapData result = maps[0];
+for (int i = 1; i < maps.length; ++i) {
+MapData map = maps[i];
+if (map != null && map.size() > 0) {
+result = new MapDataForMapUnion(result, map);
+}
+}
+return result;
+} catch (Throwable t) {
+throw new FlinkRuntimeException(t);
+}
+}
+
+private class MapDataForMapUnion implements MapData {
+private final GenericArrayData keysArray;
+private final GenericArrayData valuesArray;
+
+public MapDataForMapUnion(MapData map1, MapData map2) throws Throwable 
{
+List keysList = new ArrayList<>();
+List valuesList = new ArrayList<>();
+boolean isKeyNullExist = false;
+for (int i = 0; i < map2.size(); i++) {
+Object key = 
keyElementGetter.getElementOrNull(map2.keyArray(), i);
+if (key == null) {
+isKeyNullExist = true;
+}
+keysList.add(key);
+
valuesList.add(valueElementGetter.getElementOrNull(map2.valueArray(), i));
+}
+
+ 

Re: [PR] [FLINK-32261][table] Add built-in MAP_UNION function. [flink]

2024-02-28 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1507128713


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+private final SpecializedFunction.ExpressionEvaluator keyEqualityEvaluator;
+private transient MethodHandle keyEqualityHandle;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+final DataType keyDataType = outputType.getKeyDataType();
+final DataType valueDataType = outputType.getValueDataType();
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+keyEqualityEvaluator =
+context.createEvaluator(
+$("element1").isEqual($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
keyDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
keyDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+keyEqualityHandle = keyEqualityEvaluator.open(context);
+}
+
+public @Nullable MapData eval(@Nullable MapData... maps) {
+try {
+if (maps == null || maps.length == 0) {
+return null;
+}
+if (maps.length == 1) {
+return maps[0];
+}
+MapData result = maps[0];
+for (int i = 1; i < maps.length; ++i) {
+MapData map = maps[i];
+if (map != null && map.size() > 0) {
+result = new MapDataForMapUnion(result, map);
+}
+}
+return result;
+} catch (Throwable t) {
+throw new FlinkRuntimeException(t);
+}
+}
+
+private class MapDataForMapUnion implements MapData {
+private final GenericArrayData keysArray;
+private final GenericArrayData valuesArray;
+
+public MapDataForMapUnion(MapData map1, MapData map2) throws Throwable 
{
+List keysList = new ArrayList<>();
+List valuesList = new ArrayList<>();
+boolean isKeyNullExist = false;
+for (int i = 0; i < map2.size(); i++) {
+Object key = 
keyElementGetter.getElementOrNull(map2.keyArray(), i);

Review Comment:
   Ok, I will extract it.
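
   A possible shape of that extraction, purely as a sketch: the helper name and
signature below are hypothetical and are not taken from the PR.

```java
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;

import java.util.List;

/** Hypothetical helper sketching the extraction discussed above; not PR code. */
final class MapEntryCopier {

    /**
     * Copies the entries of {@code map} into the given key/value lists and
     * returns whether a null key was encountered.
     */
    static boolean copyEntries(
            MapData map,
            ArrayData.ElementGetter keyGetter,
            ArrayData.ElementGetter valueGetter,
            List<Object> keys,
            List<Object> values) {
        boolean sawNullKey = false;
        for (int i = 0; i < map.size(); i++) {
            final Object key = keyGetter.getElementOrNull(map.keyArray(), i);
            if (key == null) {
                sawNullKey = true;
            }
            keys.add(key);
            values.add(valueGetter.getElementOrNull(map.valueArray(), i));
        }
        return sawNullKey;
    }

    private MapEntryCopier() {}
}
```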



##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software 

[jira] [Updated] (FLINK-34544) The release check bug in tiered memory manager of hybrid shuffle

2024-02-28 Thread Yuxin Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuxin Tan updated FLINK-34544:
--
Affects Version/s: 1.20.0
   (was: 1.19.0)

> The release check bug in tiered memory manager of hybrid shuffle
> 
>
> Key: FLINK-34544
> URL: https://issues.apache.org/jira/browse/FLINK-34544
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.20.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>
> The `TieredStorageMemoryManagerImpl` has a `numRequestedBuffers` check when 
> releasing resources. However, this check is performed in the task thread, while 
> buffers may be recycled in the Netty thread. As a result, the check may 
> incorrectly throw an exception when the release happens too quickly for a vertex 
> that has almost no data.
> We should fix it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34535] Support JobPlanInfo for the explain result [flink]

2024-02-28 Thread via GitHub


flinkbot commented on PR #24411:
URL: https://github.com/apache/flink/pull/24411#issuecomment-1970559712

   
   ## CI report:
   
   * 267b77b225bf82d8aa69a93a99627e337638ea66 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34535) Support JobPlanInfo for the explain result

2024-02-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34535:
---
Labels: pull-request-available  (was: )

> Support JobPlanInfo for the explain result
> --
>
> Key: FLINK-34535
> URL: https://issues.apache.org/jira/browse/FLINK-34535
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: yuanfenghu
>Priority: Major
>  Labels: pull-request-available
>
> In the Flink SQL EXPLAIN syntax, we can set ExplainDetail to output the 
> JSON_EXECUTION_PLAN, but we cannot output the JobPlanInfo. If the explain result 
> could also include the JobPlanInfo, I could combine it with the 
> `pipeline.jobvertex-parallelism-overrides` parameter to set up my task 
> parallelism.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34535] Support JobPlanInfo for the explain result [flink]

2024-02-28 Thread via GitHub


huyuanfeng2018 opened a new pull request, #24411:
URL: https://github.com/apache/flink/pull/24411

   
   
   ## What is the purpose of the change
   
   This PR displays the job's JSON plan information in the result of a Flink SQL 
explain.
   
   ## Brief change log
   
   *(for example:)*
 - Add an enum JSON_JOB_PLAN to ExplainDetail
 - Update all implementations of Planner#explain including streaming and 
batch
 - Append the job plan in JSON format to the result when Planner#explain is 
executed with the parameter ExplainDetail.JSON_JOB_PLAN (see the usage sketch 
below).
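
   For reference, a hedged usage sketch: `ExplainDetail.JSON_EXECUTION_PLAN` already 
exists in the Table API, while `JSON_JOB_PLAN` is only proposed by this PR, so the 
commented lines are an assumption about the final API.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainJobPlanExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE src (id INT, name STRING) WITH ('connector' = 'datagen')");

        // Existing detail: appends the JSON execution plan to the explain output.
        System.out.println(
                tEnv.explainSql("SELECT id, name FROM src", ExplainDetail.JSON_EXECUTION_PLAN));

        // Proposed by this PR (name assumed): would also append the job plan JSON,
        // which could then drive pipeline.jobvertex-parallelism-overrides.
        // System.out.println(
        //         tEnv.explainSql("SELECT id, name FROM src", ExplainDetail.JSON_JOB_PLAN));
    }
}
```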

   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch

2024-02-28 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821972#comment-17821972
 ] 

Matthias Pohl commented on FLINK-29114:
---

* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57940=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=11858
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57956=logs=a9db68b9-a7e0-54b6-0f98-010e0aff39e2=cdd32e0b-6047-565b-c58f-14054472f1be=11598
* 
https://github.com/apache/flink/actions/runs/8079627963/job/22074788185#step:10:11525
* 
https://github.com/apache/flink/actions/runs/8081916042/job/22082067075#step:10:11603
* 
https://github.com/apache/flink/actions/runs/8089966279/job/22107054311#step:10:11704

> TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with 
> result mismatch 
> --
>
> Key: FLINK-29114
> URL: https://issues.apache.org/jira/browse/FLINK-29114
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Affects Versions: 1.15.0, 1.19.0, 1.20.0
>Reporter: Sergey Nuyanzin
>Assignee: Jane Chan
>Priority: Major
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
> Attachments: FLINK-29114.log, image-2024-02-27-15-23-49-494.png, 
> image-2024-02-27-15-26-07-657.png, image-2024-02-27-15-32-48-317.png
>
>
> It could be reproduced locally by repeating tests. Usually about 100 
> iterations are enough to have several failed tests
> {noformat}
> [ERROR] Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 
> 1.664 s <<< FAILURE! - in 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase
> [ERROR] 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse
>   Time elapsed: 0.108 s  <<< FAILURE!
> java.lang.AssertionError: expected: 3,2,Hello world, 3,2,Hello world, 3,2,Hello world)> but was: 2,2,Hello, 2,2,Hello, 3,2,Hello world, 3,2,Hello world)>
>     at org.junit.Assert.fail(Assert.java:89)
>     at org.junit.Assert.failNotEquals(Assert.java:835)
>     at org.junit.Assert.assertEquals(Assert.java:120)
>     at org.junit.Assert.assertEquals(Assert.java:146)
>     at 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse(TableSourceITCase.scala:428)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>     at 
> 

[jira] [Updated] (FLINK-26515) RetryingExecutorTest. testDiscardOnTimeout failed on azure

2024-02-28 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-26515:
--
Priority: Critical  (was: Minor)

> RetryingExecutorTest. testDiscardOnTimeout failed on azure
> --
>
> Key: FLINK-26515
> URL: https://issues.apache.org/jira/browse/FLINK-26515
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.3, 1.17.0, 1.16.1, 1.18.0, 1.19.0
>Reporter: Yun Gao
>Priority: Critical
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
>
> {code:java}
> Mar 06 01:20:29 [ERROR] Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 1.941 s <<< FAILURE! - in 
> org.apache.flink.changelog.fs.RetryingExecutorTest
> Mar 06 01:20:29 [ERROR] testTimeout  Time elapsed: 1.934 s  <<< FAILURE!
> Mar 06 01:20:29 java.lang.AssertionError: expected:<500.0> but 
> was:<1922.869766>
> Mar 06 01:20:29   at org.junit.Assert.fail(Assert.java:89)
> Mar 06 01:20:29   at org.junit.Assert.failNotEquals(Assert.java:835)
> Mar 06 01:20:29   at org.junit.Assert.assertEquals(Assert.java:555)
> Mar 06 01:20:29   at org.junit.Assert.assertEquals(Assert.java:685)
> Mar 06 01:20:29   at 
> org.apache.flink.changelog.fs.RetryingExecutorTest.testTimeout(RetryingExecutorTest.java:145)
> Mar 06 01:20:29   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 06 01:20:29   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 06 01:20:29   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 06 01:20:29   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 06 01:20:29   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Mar 06 01:20:29   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Mar 06 01:20:29   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Mar 06 01:20:29   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> Mar 06 01:20:29   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> Mar 06 01:20:29   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
> Mar 06 01:20:29   at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
> Mar 06 01:20:29   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> Mar 06 01:20:29   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> Mar 06 01:20:29   at 
> java.util.Iterator.forEachRemaining(Iterator.java:116)
> Mar 06 01:20:29   at 
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> Mar 06 01:20:29   at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> Mar 06 01:20:29   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>  {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32569=logs=f450c1a5-64b1-5955-e215-49cb1ad5ec88=cc452273-9efa-565d-9db8-ef62a38a0c10=22554



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26515) RetryingExecutorTest. testDiscardOnTimeout failed on azure

2024-02-28 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821971#comment-17821971
 ] 

Matthias Pohl commented on FLINK-26515:
---

https://github.com/apache/flink/actions/runs/8089966419/job/22106994368#step:10:10565

> RetryingExecutorTest. testDiscardOnTimeout failed on azure
> --
>
> Key: FLINK-26515
> URL: https://issues.apache.org/jira/browse/FLINK-26515
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.3, 1.17.0, 1.16.1, 1.18.0, 1.19.0
>Reporter: Yun Gao
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
>
> {code:java}
> Mar 06 01:20:29 [ERROR] Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 1.941 s <<< FAILURE! - in 
> org.apache.flink.changelog.fs.RetryingExecutorTest
> Mar 06 01:20:29 [ERROR] testTimeout  Time elapsed: 1.934 s  <<< FAILURE!
> Mar 06 01:20:29 java.lang.AssertionError: expected:<500.0> but 
> was:<1922.869766>
> Mar 06 01:20:29   at org.junit.Assert.fail(Assert.java:89)
> Mar 06 01:20:29   at org.junit.Assert.failNotEquals(Assert.java:835)
> Mar 06 01:20:29   at org.junit.Assert.assertEquals(Assert.java:555)
> Mar 06 01:20:29   at org.junit.Assert.assertEquals(Assert.java:685)
> Mar 06 01:20:29   at 
> org.apache.flink.changelog.fs.RetryingExecutorTest.testTimeout(RetryingExecutorTest.java:145)
> Mar 06 01:20:29   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 06 01:20:29   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 06 01:20:29   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 06 01:20:29   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 06 01:20:29   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Mar 06 01:20:29   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Mar 06 01:20:29   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Mar 06 01:20:29   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> Mar 06 01:20:29   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> Mar 06 01:20:29   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
> Mar 06 01:20:29   at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
> Mar 06 01:20:29   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> Mar 06 01:20:29   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> Mar 06 01:20:29   at 
> java.util.Iterator.forEachRemaining(Iterator.java:116)
> Mar 06 01:20:29   at 
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> Mar 06 01:20:29   at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> Mar 06 01:20:29   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>  {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32569=logs=f450c1a5-64b1-5955-e215-49cb1ad5ec88=cc452273-9efa-565d-9db8-ef62a38a0c10=22554



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31472) AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread

2024-02-28 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821970#comment-17821970
 ] 

Matthias Pohl commented on FLINK-31472:
---

* 
https://github.com/apache/flink/actions/runs/8079627963/job/22074788571#step:10:10480
* 
https://github.com/apache/flink/actions/runs/8089966279/job/22106940120#step:10:10483

> AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread
> 
>
> Key: FLINK-31472
> URL: https://issues.apache.org/jira/browse/FLINK-31472
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.16.1, 1.18.0, 1.19.0, 1.20.0
>Reporter: Ran Tao
>Assignee: Ahmed Hamdy
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> when run mvn clean test, this case failed occasionally.
> {noformat}
> [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.955 
> s <<< FAILURE! - in 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest
> [ERROR] 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize
>   Time elapsed: 0.492 s  <<< ERROR!
> java.lang.IllegalStateException: Illegal thread detected. This method must be 
> called from inside the mailbox thread!
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkIsMailboxThread(TaskMailboxImpl.java:262)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:137)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:84)
>         at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.flush(AsyncSinkWriter.java:367)
>         at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.lambda$registerCallback$3(AsyncSinkWriter.java:315)
>         at 
> org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$CallbackTask.onProcessingTime(TestProcessingTimeService.java:199)
>         at 
> org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService.setCurrentTime(TestProcessingTimeService.java:76)
>         at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize(AsyncSinkWriterThrottlingTest.java:64)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>         at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>         at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>         at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>         at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>         at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>         at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>         at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>         at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>         at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>         at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>         at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>         at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>         at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>         at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>         at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>         at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>         at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
>         at 
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
>         at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
>         at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
>         at 
> 

[jira] [Commented] (FLINK-26644) python StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies failed on azure

2024-02-28 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821968#comment-17821968
 ] 

Matthias Pohl commented on FLINK-26644:
---

https://github.com/apache/flink/actions/runs/8081916042/job/22082066739#step:10:24325

> python 
> StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies 
> failed on azure
> ---
>
> Key: FLINK-26644
> URL: https://issues.apache.org/jira/browse/FLINK-26644
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.14.4, 1.15.0, 1.16.0, 1.19.0
>Reporter: Yun Gao
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> {code:java}
> 2022-03-14T18:50:24.6842853Z Mar 14 18:50:24 
> === FAILURES 
> ===
> 2022-03-14T18:50:24.6844089Z Mar 14 18:50:24 _ 
> StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies _
> 2022-03-14T18:50:24.6844846Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6846063Z Mar 14 18:50:24 self = 
>   testMethod=test_generate_stream_graph_with_dependencies>
> 2022-03-14T18:50:24.6847104Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6847766Z Mar 14 18:50:24 def 
> test_generate_stream_graph_with_dependencies(self):
> 2022-03-14T18:50:24.6848677Z Mar 14 18:50:24 python_file_dir = 
> os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
> 2022-03-14T18:50:24.6849833Z Mar 14 18:50:24 os.mkdir(python_file_dir)
> 2022-03-14T18:50:24.6850729Z Mar 14 18:50:24 python_file_path = 
> os.path.join(python_file_dir, "test_stream_dependency_manage_lib.py")
> 2022-03-14T18:50:24.6852679Z Mar 14 18:50:24 with 
> open(python_file_path, 'w') as f:
> 2022-03-14T18:50:24.6853646Z Mar 14 18:50:24 f.write("def 
> add_two(a):\nreturn a + 2")
> 2022-03-14T18:50:24.6854394Z Mar 14 18:50:24 env = self.env
> 2022-03-14T18:50:24.6855019Z Mar 14 18:50:24 
> env.add_python_file(python_file_path)
> 2022-03-14T18:50:24.6855519Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6856254Z Mar 14 18:50:24 def plus_two_map(value):
> 2022-03-14T18:50:24.6857045Z Mar 14 18:50:24 from 
> test_stream_dependency_manage_lib import add_two
> 2022-03-14T18:50:24.6857865Z Mar 14 18:50:24 return value[0], 
> add_two(value[1])
> 2022-03-14T18:50:24.6858466Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6858924Z Mar 14 18:50:24 def add_from_file(i):
> 2022-03-14T18:50:24.6859806Z Mar 14 18:50:24 with 
> open("data/data.txt", 'r') as f:
> 2022-03-14T18:50:24.6860266Z Mar 14 18:50:24 return i[0], 
> i[1] + int(f.read())
> 2022-03-14T18:50:24.6860879Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6862022Z Mar 14 18:50:24 from_collection_source = 
> env.from_collection([('a', 0), ('b', 0), ('c', 1), ('d', 1),
> 2022-03-14T18:50:24.6863259Z Mar 14 18:50:24  
>  ('e', 2)],
> 2022-03-14T18:50:24.6864057Z Mar 14 18:50:24  
> type_info=Types.ROW([Types.STRING(),
> 2022-03-14T18:50:24.6864651Z Mar 14 18:50:24  
>  Types.INT()]))
> 2022-03-14T18:50:24.6865150Z Mar 14 18:50:24 
> from_collection_source.name("From Collection")
> 2022-03-14T18:50:24.6866212Z Mar 14 18:50:24 keyed_stream = 
> from_collection_source.key_by(lambda x: x[1], key_type=Types.INT())
> 2022-03-14T18:50:24.6867083Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6867793Z Mar 14 18:50:24 plus_two_map_stream = 
> keyed_stream.map(plus_two_map).name("Plus Two Map").set_parallelism(3)
> 2022-03-14T18:50:24.6868620Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6869412Z Mar 14 18:50:24 add_from_file_map = 
> plus_two_map_stream.map(add_from_file).name("Add From File Map")
> 2022-03-14T18:50:24.6870239Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6870883Z Mar 14 18:50:24 test_stream_sink = 
> add_from_file_map.add_sink(self.test_sink).name("Test Sink")
> 2022-03-14T18:50:24.6871803Z Mar 14 18:50:24 
> test_stream_sink.set_parallelism(4)
> 2022-03-14T18:50:24.6872291Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6872756Z Mar 14 18:50:24 archive_dir_path = 
> os.path.join(self.tempdir, "archive_" + str(uuid.uuid4()))
> 2022-03-14T18:50:24.6873557Z Mar 14 18:50:24 
> os.mkdir(archive_dir_path)
> 2022-03-14T18:50:24.6874817Z Mar 14 18:50:24 with 
> open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
> 2022-03-14T18:50:24.6875414Z Mar 14 18:50:24 f.write("3")
> 2022-03-14T18:50:24.6875906Z Mar 14 18:50:24 archive_file_path = \

[jira] [Updated] (FLINK-34449) Flink build took too long

2024-02-28 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-34449:
--
Affects Version/s: 1.18.1

> Flink build took too long
> -
>
> Key: FLINK-34449
> URL: https://issues.apache.org/jira/browse/FLINK-34449
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Test Infrastructure
>Affects Versions: 1.17.2, 1.18.1
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> We saw a timeout when building Flink in e2e1 stage. No logs are available to 
> investigate the issue:
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57551=logs=bbb1e2a2-a43c-55c8-fb48-5cfe7a8a0ca6
> {code}
> Nothing to show. Final logs are missing. This can happen when the job is 
> cancelled or times out.
> {code}
> I'd consider this an infrastructure issue but created the Jira issue for 
> documentation purposes. Let's see whether that pops up again.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34449) Flink build took too long

2024-02-28 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821967#comment-17821967
 ] 

Matthias Pohl commented on FLINK-34449:
---

This time, it happened in the caching step:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57957=logs=87489130-75dc-54e4-1f45-80c30aa367a3=5f5e3bcf-c82b-57ca-7f80-f293d0ad4448=1

> Flink build took too long
> -
>
> Key: FLINK-34449
> URL: https://issues.apache.org/jira/browse/FLINK-34449
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Test Infrastructure
>Affects Versions: 1.17.2
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> We saw a timeout when building Flink in e2e1 stage. No logs are available to 
> investigate the issue:
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57551=logs=bbb1e2a2-a43c-55c8-fb48-5cfe7a8a0ca6
> {code}
> Nothing to show. Final logs are missing. This can happen when the job is 
> cancelled or times out.
> {code}
> I'd consider this an infrastructure issue but created the Jira issue for 
> documentation purposes. Let's see whether that pops up again.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34499) Configuration#toString should hide sensitive values

2024-02-28 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl resolved FLINK-34499.
---
Fix Version/s: 1.19.0
   1.17.3
   1.18.2
   (was: 1.20.0)
   Resolution: Fixed

master: 
[9e81177e44d63501b360b1a246a16ea3faecb548|https://github.com/apache/flink/commit/9e81177e44d63501b360b1a246a16ea3faecb548]
1.19: 
[5016325b6ec930a3f5cd3b186c185e004ede8691|https://github.com/apache/flink/commit/5016325b6ec930a3f5cd3b186c185e004ede8691]
1.18: 
[e770cef5d1df282fe28deb5f1309873b8342d046|https://github.com/apache/flink/commit/e770cef5d1df282fe28deb5f1309873b8342d046]
1.17: 
[72fbc89777286d9cda46a80231b2db11c21ced0d|https://github.com/apache/flink/commit/72fbc89777286d9cda46a80231b2db11c21ced0d]

> Configuration#toString should hide sensitive values
> ---
>
> Key: FLINK-34499
> URL: https://issues.apache.org/jira/browse/FLINK-34499
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.17.2, 1.19.0, 1.18.1, 1.20.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0, 1.17.3, 1.18.2
>
>
> Time and time again people log the entire Flink configuration for no reason, 
> risking that sensitive values are logged in plain text.
> We should make this harder by changing {{Configuration#toString}} to 
> automatically hide sensitive values, for example like this:
> {code}
> @Override
> public String toString() {
> return ConfigurationUtils
>         .hideSensitiveValues(this.confData.entrySet().stream().collect(
>                 Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().toString())))
>         .toString();
> }
> {code}
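
A hedged sketch of the intended effect, assuming the masking above is wired into 
{{Configuration#toString}}; the exact placeholder used for masked values is an 
assumption.

{code:java}
import org.apache.flink.configuration.Configuration;

public class MaskedConfigToStringExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.numberOfTaskSlots", "4");
        conf.setString("s3.secret-key", "super-secret");

        // With the proposed change, sensitive entries such as "s3.secret-key"
        // would be printed masked (e.g. "******") rather than in plain text.
        System.out.println(conf);
    }
}
{code}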



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-1.17][FLINK-34499] Configuration#toString hides sensitive values [flink]

2024-02-28 Thread via GitHub


XComp merged PR #24406:
URL: https://github.com/apache/flink/pull/24406


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.18][FLINK-34499] Configuration#toString hides sensitive values [flink]

2024-02-28 Thread via GitHub


XComp merged PR #24405:
URL: https://github.com/apache/flink/pull/24405


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.19][FLINK-34499] Configuration#toString hides sensitive values [flink]

2024-02-28 Thread via GitHub


XComp merged PR #24404:
URL: https://github.com/apache/flink/pull/24404


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34499] Configuration#toString hides sensitive values [flink]

2024-02-28 Thread via GitHub


XComp merged PR #24370:
URL: https://github.com/apache/flink/pull/24370


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34498) GSFileSystemFactory logs full Flink config

2024-02-28 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-34498:
--
Fix Version/s: (was: 1.20.0)

> GSFileSystemFactory logs full Flink config
> --
>
> Key: FLINK-34498
> URL: https://issues.apache.org/jira/browse/FLINK-34498
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.18.1
>Reporter: Chesnay Schepler
>Assignee: Jeyhun Karimov
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> This can cause secrets from the config to be logged.
> {code}
> @Override
> public void configure(Configuration flinkConfig) {
>     LOGGER.info("Configuring GSFileSystemFactory with Flink configuration {}", flinkConfig);
> {code}
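
A minimal sketch of the safer direction, assuming the factory only needs the configuration internally; the log message, field, and method body below are illustrative, not the actual fix:

{code}
// Illustrative direction only: keep the Configuration out of the log call so
// any secrets it contains cannot end up in the logs in plain text.
@Override
public void configure(Configuration flinkConfig) {
    LOGGER.info("Configuring GSFileSystemFactory.");
    this.flinkConfig = flinkConfig; // hypothetical field; use the config without logging it
}
{code}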



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34498) GSFileSystemFactory logs full Flink config

2024-02-28 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl resolved FLINK-34498.
---
Fix Version/s: (was: 1.18.2)
 Assignee: Jeyhun Karimov
   Resolution: Fixed

master: 
[8bb4457b6cf13d85639e914a8d959e39c2257d66|https://github.com/apache/flink/commit/8bb4457b6cf13d85639e914a8d959e39c2257d66]
1.19: 
[628ae78ab304a7543a8d5566a7dd6cbcb336d61b|https://github.com/apache/flink/commit/628ae78ab304a7543a8d5566a7dd6cbcb336d61b]

> GSFileSystemFactory logs full Flink config
> --
>
> Key: FLINK-34498
> URL: https://issues.apache.org/jira/browse/FLINK-34498
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.18.1
>Reporter: Chesnay Schepler
>Assignee: Jeyhun Karimov
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.19.0, 1.20.0
>
>
> This can cause secrets from the config to be logged.
> {code}
> @Override
> public void configure(Configuration flinkConfig) {
>     LOGGER.info("Configuring GSFileSystemFactory with Flink configuration {}", flinkConfig);
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-1.19][FLINK-34498] GSFileSystemFactory should not log full Flink config [flink]

2024-02-28 Thread via GitHub


XComp merged PR #24408:
URL: https://github.com/apache/flink/pull/24408


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34498][filesystem] GSFileSystemFactory should not log full Flink log [flink]

2024-02-28 Thread via GitHub


XComp merged PR #24372:
URL: https://github.com/apache/flink/pull/24372


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34544) The release check bug in tiered memory manager of hybrid shuffle

2024-02-28 Thread Yuxin Tan (Jira)
Yuxin Tan created FLINK-34544:
-

 Summary: The release check bug in tiered memory manager of hybrid 
shuffle
 Key: FLINK-34544
 URL: https://issues.apache.org/jira/browse/FLINK-34544
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.19.0
Reporter: Yuxin Tan
Assignee: Yuxin Tan


The `TieredStorageMemoryManagerImpl` performs a `numRequestedBuffers` check when 
releasing resources. However, this check runs in the task thread, while buffer 
recycling may happen in the Netty thread. As a result, the check may incorrectly 
throw an exception when the release happens very quickly for a vertex that has 
almost no data.
We should fix it.
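
A minimal sketch of one way to synchronize that bookkeeping, assuming a single guard object shared by the task and Netty threads; the class, field, and method names below are illustrative, not the actual Flink code:

{code}
// Illustrative sketch only: protect the requested-buffer accounting and the
// release-time check with one shared lock so the task thread and the Netty
// thread cannot interleave mid-update.
final class GuardedBufferAccounting {
    private final Object lock = new Object();
    private int numRequestedBuffers;

    void onBufferRequested() {            // task thread
        synchronized (lock) {
            numRequestedBuffers++;
        }
    }

    void onBufferRecycled() {             // may run in the Netty thread
        synchronized (lock) {
            numRequestedBuffers--;
        }
    }

    void release() {                      // task thread
        synchronized (lock) {
            if (numRequestedBuffers != 0) {
                throw new IllegalStateException(
                        "Leaking buffers: " + numRequestedBuffers + " still requested.");
            }
        }
    }
}
{code}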



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34529) Projection cannot be pushed down through rank operator.

2024-02-28 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821949#comment-17821949
 ] 

Benchao Li commented on FLINK-34529:


[~nilerzhou] Thanks for reporting this; it sounds good to me.

In the description, the expected plan already seems to push the {{Calc}} 
through {{Rank}} (but not through {{Join}}), why is that?

Besides, I noticed that there is already a {{CalcRankTransposeRule}} in Flink, 
why does it not work as expected?

> Projection cannot be pushed down through rank operator.
> ---
>
> Key: FLINK-34529
> URL: https://issues.apache.org/jira/browse/FLINK-34529
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: yisha zhou
>Priority: Major
>
> When there is a rank/deduplicate operator, the projection based on output of 
> this operator cannot be pushed down to the input of it.
> The following code can help reproducing the issue:
> {code:java}
> val util = streamTestUtil() 
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
> |SELECT a FROM (
> |  SELECT a, f,
> |  ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
> |  FROM  T1, T2
> |  WHERE T1.a = T2.d
> |)
> |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql){code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- Calc(select=[a, c])
> : +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- Calc(select=[d, f])
>   +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) 
> {code}
> Notice that the 'select' of Join operator is [a, c, d, f]. However the actual 
> plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, 
> e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
>  {code}
> the 'select' of Join operator is [a, b, c, d, e, f], which means the 
> projection in the final Calc is not passed through the Rank.
> And I think an easy way to fix this issue is to add 
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule into 
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch

2024-02-28 Thread Feng Jin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821947#comment-17821947
 ] 

Feng Jin commented on FLINK-29114:
--

[~qingyue] Thank you for your reply. Indeed, as you said, this is not an issue 
with unit testing but a problem with the generation of the staging directory.

> TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with 
> result mismatch 
> --
>
> Key: FLINK-29114
> URL: https://issues.apache.org/jira/browse/FLINK-29114
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Affects Versions: 1.15.0, 1.19.0, 1.20.0
>Reporter: Sergey Nuyanzin
>Assignee: Jane Chan
>Priority: Major
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
> Attachments: FLINK-29114.log, image-2024-02-27-15-23-49-494.png, 
> image-2024-02-27-15-26-07-657.png, image-2024-02-27-15-32-48-317.png
>
>
> It could be reproduced locally by repeating tests. Usually about 100 
> iterations are enough to have several failed tests
> {noformat}
> [ERROR] Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 
> 1.664 s <<< FAILURE! - in 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase
> [ERROR] 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse
>   Time elapsed: 0.108 s  <<< FAILURE!
> java.lang.AssertionError: expected: 3,2,Hello world, 3,2,Hello world, 3,2,Hello world)> but was: 2,2,Hello, 2,2,Hello, 3,2,Hello world, 3,2,Hello world)>
>     at org.junit.Assert.fail(Assert.java:89)
>     at org.junit.Assert.failNotEquals(Assert.java:835)
>     at org.junit.Assert.assertEquals(Assert.java:120)
>     at org.junit.Assert.assertEquals(Assert.java:146)
>     at 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse(TableSourceITCase.scala:428)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>     at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
>     at 
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
>     at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
>     at 
> 

[jira] [Commented] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch

2024-02-28 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821941#comment-17821941
 ] 

Jane Chan commented on FLINK-29114:
---

[~hackergin] Different sink paths could avoid the unstable case. However, the 
problem lies in the way the staging dir path is generated: relying solely on a 
timestamp as the path postfix is not reliable.
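
A minimal sketch of a more collision-resistant naming scheme, assuming the staging directory only needs to be unique per write attempt; the helper name and path layout below are illustrative, not the actual Flink fix:

{code}
// Illustrative sketch only: combine the timestamp with a random component so
// two writers starting in the same millisecond cannot pick the same staging path.
import java.util.UUID;

final class StagingDirs {
    static String newStagingDir(String tableLocation) {
        return tableLocation + "/.staging_" + System.currentTimeMillis()
                + "_" + UUID.randomUUID();
    }
}
{code}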

> TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with 
> result mismatch 
> --
>
> Key: FLINK-29114
> URL: https://issues.apache.org/jira/browse/FLINK-29114
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Affects Versions: 1.15.0, 1.19.0, 1.20.0
>Reporter: Sergey Nuyanzin
>Assignee: Jane Chan
>Priority: Major
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
> Attachments: FLINK-29114.log, image-2024-02-27-15-23-49-494.png, 
> image-2024-02-27-15-26-07-657.png, image-2024-02-27-15-32-48-317.png
>
>
> It could be reproduced locally by repeating tests. Usually about 100 
> iterations are enough to have several failed tests
> {noformat}
> [ERROR] Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 
> 1.664 s <<< FAILURE! - in 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase
> [ERROR] 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse
>   Time elapsed: 0.108 s  <<< FAILURE!
> java.lang.AssertionError: expected: 3,2,Hello world, 3,2,Hello world, 3,2,Hello world)> but was: 2,2,Hello, 2,2,Hello, 3,2,Hello world, 3,2,Hello world)>
>     at org.junit.Assert.fail(Assert.java:89)
>     at org.junit.Assert.failNotEquals(Assert.java:835)
>     at org.junit.Assert.assertEquals(Assert.java:120)
>     at org.junit.Assert.assertEquals(Assert.java:146)
>     at 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse(TableSourceITCase.scala:428)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>     at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
>     at 
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
>     at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
>     at 
> 

Re: [PR] [FLINK-34484][state] Split 'state.backend.local-recovery' into two options for checkpointing and recovery [flink]

2024-02-28 Thread via GitHub


Zakelly commented on code in PR #24402:
URL: https://github.com/apache/flink/pull/24402#discussion_r1507058342


##
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##
@@ -169,6 +173,26 @@ public class CheckpointingOptions {
 + "deactivated. Local recovery currently 
only covers keyed state backends "
 + "(including both the 
EmbeddedRocksDBStateBackend and the HashMapStateBackend).");
 
+/**
+ * This option configures local backup for the state backend, which 
indicates whether to make
+ * backup checkpoint on local disk. If not configured, fallback to {@link
+ * StateRecoveryOptions#LOCAL_RECOVERY}. By default, local backup is 
deactivated. Local backup
+ * currently only covers keyed state backends (including both the 
EmbeddedRocksDBStateBackend
+ * and the HashMapStateBackend).
+ */
+public static final ConfigOption LOCAL_BACKUP_ENABLED =
+ConfigOptions.key("execution.checkpointing.local-backup.enabled")
+.booleanType()
+
.defaultValue(StateRecoveryOptions.LOCAL_RECOVERY.defaultValue())
+
.withFallbackKeys(StateRecoveryOptions.LOCAL_RECOVERY.key())

Review Comment:
   I'm wondering whether it makes sense to make `'LOCAL_BACKUP_ENABLED'` a fallback 
key of the new `'LOCAL_RECOVERY'` instead of the reverse order, since whether 
to recover from local state is typically aligned with whether to make a local 
backup.
   And both options should have the same deprecated key 
'state.backend.local-recovery'.
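
A rough sketch of the suggested reversal, assuming the backup option key shown in the PR; the recovery option's key and both defaults here are placeholders, not the actual definitions:

{code}
// Illustrative wiring only: the new recovery option falls back to the backup
// option, and both keep 'state.backend.local-recovery' as a deprecated key.
public static final ConfigOption<Boolean> LOCAL_BACKUP_ENABLED =
        ConfigOptions.key("execution.checkpointing.local-backup.enabled")
                .booleanType()
                .defaultValue(false)
                .withDeprecatedKeys("state.backend.local-recovery");

public static final ConfigOption<Boolean> LOCAL_RECOVERY =
        ConfigOptions.key("execution.state-recovery.from-local") // placeholder key
                .booleanType()
                .defaultValue(false)
                .withFallbackKeys("execution.checkpointing.local-backup.enabled")
                .withDeprecatedKeys("state.backend.local-recovery");
{code}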



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalBackupAndRecoveryDirectoryProvider.java:
##
@@ -48,7 +48,7 @@
  *
  * 
  */
-public interface LocalRecoveryDirectoryProvider extends Serializable {
+public interface LocalBackupAndRecoveryDirectoryProvider extends Serializable {

Review Comment:
   I'd suggest naming it 'LocalBackupDirectoryProvider' or keeping the original 
name. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34483][docs] Improve the documentation of 'state.checkpoints.dir' and 'state.checkpoint-storage' [flink]

2024-02-28 Thread via GitHub


Zakelly commented on code in PR #24401:
URL: https://github.com/apache/flink/pull/24401#discussion_r1507047875


##
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##
@@ -76,6 +76,39 @@ public class CheckpointingOptions {
  * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} 
method is called.
  *
  * Recognized shortcut names are 'jobmanager' and 'filesystem'.
+ *
+ * {@link #CHECKPOINT_STORAGE} and {@link #CHECKPOINTS_DIRECTORY} are 
usually combined to
+ * configure the checkpoint location. The behaviors of different 
combinations are as follows:

Review Comment:
   Instead of combining these two options, I'd suggest describing the behaviors 
of CHECKPOINT_STORAGE='jobmanager' and CHECKPOINT_STORAGE='filesystem' 
separately. WDYT?
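
For illustration, one of the combinations being documented might be set up roughly like this; the keys follow the option names discussed above, and the bucket path is a made-up example:

{code}
// Illustrative combination only: filesystem checkpoint storage plus an explicit
// checkpoint directory.
Configuration conf = new Configuration();
conf.setString("state.checkpoint-storage", "filesystem");
conf.setString("state.checkpoints.dir", "s3://my-bucket/flink-checkpoints");
{code}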



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


LadyForest commented on PR #24390:
URL: https://github.com/apache/flink/pull/24390#issuecomment-1970441742

   Hi @snuyanzin @XComp, thanks for your review comments. I updated the 
test (still not perfect either..) and tried my best to verify the case. Would 
you mind taking another look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33930] Canceled stop job status exception [flink-kubernetes-operator]

2024-02-28 Thread via GitHub


hunter-cloud09 closed pull request #740: [FLINK-33930] Canceled stop job status 
exception
URL: https://github.com/apache/flink-kubernetes-operator/pull/740


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33930] Canceled stop job status exception [flink-kubernetes-operator]

2024-02-28 Thread via GitHub


hunter-cloud09 commented on PR #740:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/740#issuecomment-1970439822

   > Should we close this? @hunter-cloud09
   
   Yep! If you do not want to do this, I will just keep the fix locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34372][table-api] Support DESCRIBE CATALOG [flink]

2024-02-28 Thread via GitHub


hackergin commented on code in PR #24275:
URL: https://github.com/apache/flink/pull/24275#discussion_r1506965964


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java:
##
@@ -47,4 +51,31 @@ public String getName() {
 public String getDefaultDatabase() {
 return defaultDatabase;
 }
+
+@Override
+public String explainCatalog() {
+StringBuilder sb = new StringBuilder();
+Map extraExplainInfo = extraExplainInfo();
+sb.append("default database: ").append(defaultDatabase);
+if (!extraExplainInfo.isEmpty()) {
+sb.append("\n");
+sb.append(
+extraExplainInfo.entrySet().stream()
+.map(entry -> entry.getKey() + ": " + 
entry.getValue())
+.collect(Collectors.joining("\n")));
+}
+
+// put the class name at the end
+sb.append("\n").append(Catalog.super.explainCatalog());
+return sb.toString();
+}
+
+/**
+ * Extra explain information used to print by DESCRIBE CATALOG catalogName 
statement.
+ *
+ * Note: The class name and default database name of this catalog are 
no need to be added.
+ */
+protected Map extraExplainInfo() {

Review Comment:
   Can we directly retrieve parameter information from the catalog store 
instead of introducing a new interface?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32261][table] Add built-in MAP_UNION function. [flink]

2024-02-28 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1506962857


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+private final SpecializedFunction.ExpressionEvaluator keyEqualityEvaluator;
+private transient MethodHandle keyEqualityHandle;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+final DataType keyDataType = outputType.getKeyDataType();
+final DataType valueDataType = outputType.getValueDataType();
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+keyEqualityEvaluator =
+context.createEvaluator(
+$("element1").isEqual($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
keyDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
keyDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+keyEqualityHandle = keyEqualityEvaluator.open(context);
+}
+
+public @Nullable MapData eval(@Nullable MapData... maps) {
+try {
+if (maps == null || maps.length == 0) {
+return null;
+}
+if (maps.length == 1) {
+return maps[0];
+}
+MapData result = maps[0];
+for (int i = 1; i < maps.length; ++i) {
+MapData map = maps[i];
+if (map != null && map.size() > 0) {

Review Comment:
   Yes, if map == null, we should return null.
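
A minimal sketch of the null handling being discussed, assuming the agreed semantics that any null argument makes the whole result null; the `union` helper is a placeholder for the actual merge logic, not the merged code:

{code}
// Illustrative eval skeleton only: null-propagating guards first, merge later.
public @Nullable MapData eval(@Nullable MapData... maps) {
    if (maps == null || maps.length == 0) {
        return null;
    }
    for (MapData map : maps) {
        if (map == null) {
            return null;
        }
    }
    if (maps.length == 1) {
        return maps[0];
    }
    return union(maps); // placeholder for the actual key-deduplicating merge
}
{code}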



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32261][table] Add built-in MAP_UNION function. [flink]

2024-02-28 Thread via GitHub


hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1506962507


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter keyElementGetter;
+private final ArrayData.ElementGetter valueElementGetter;
+
+private final SpecializedFunction.ExpressionEvaluator keyEqualityEvaluator;
+private transient MethodHandle keyEqualityHandle;
+
+public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.MAP_UNION, context);
+KeyValueDataType outputType =
+((KeyValueDataType) 
context.getCallContext().getOutputDataType().get());
+final DataType keyDataType = outputType.getKeyDataType();
+final DataType valueDataType = outputType.getValueDataType();
+keyElementGetter =
+
ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+valueElementGetter =
+
ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+keyEqualityEvaluator =
+context.createEvaluator(
+$("element1").isEqual($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
keyDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
keyDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+keyEqualityHandle = keyEqualityEvaluator.open(context);
+}
+
+public @Nullable MapData eval(@Nullable MapData... maps) {
+try {
+if (maps == null || maps.length == 0) {
+return null;
+}
+if (maps.length == 1) {
+return maps[0];
+}
+MapData result = maps[0];

Review Comment:
   Ok, I will fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34522] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]

2024-02-28 Thread via GitHub


flinkbot commented on PR #24410:
URL: https://github.com/apache/flink/pull/24410#issuecomment-1970317964

   
   ## CI report:
   
   * f377be1a0e1e7006199f0bca31be1dd8cb330acd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-34522] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]

2024-02-28 Thread via GitHub


qinf opened a new pull request, #24410:
URL: https://github.com/apache/flink/pull/24410

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34522) StateTtlConfig#cleanupInRocksdbCompactFilter still uses the deprecated Time class

2024-02-28 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821921#comment-17821921
 ] 

lincoln lee commented on FLINK-34522:
-

[~mapohl] Thanks for resolving it!

Back to this issue, [~dianfu] & [~hxb] have made some attempts, but there are no 
clear leads yet; it looks like it will take some time.

> StateTtlConfig#cleanupInRocksdbCompactFilter still uses the deprecated Time 
> class
> -
>
> Key: FLINK-34522
> URL: https://issues.apache.org/jira/browse/FLINK-34522
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Affects Versions: 1.19.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.19.0, 1.20.0
>
>
> FLINK-32570 deprecated the Time class and refactor all Public or 
> PublicEvolving apis to use the Java's Duration.
> StateTtlConfig.Builder#cleanupInRocksdbCompactFilter is still using the Time 
> class. In general, we expect:
>  * Mark {{cleanupInRocksdbCompactFilter(long, Time)}} as {{@Deprecated}}
>  * Provide a new cleanupInRocksdbCompactFilter(long, Duration)
> Note: This is exactly what FLINK-32570 does, so I guess FLINK-32570 missed 
> cleanupInRocksdbCompactFilter.
> But I found that this method was introduced in 1.19 (FLINK-30854), so a better 
> solution may be: only provide cleanupInRocksdbCompactFilter(long, Duration) 
> and don't use Time.
> The deprecated API should be kept for 2 minor versions. IIUC, we cannot remove 
> Time-related classes in Flink 2.0 if we don't deprecate them in 1.19. If so, I 
> think it's better to merge this JIRA in 1.19.0 as well.
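
A rough sketch of the API shape described in the quoted issue, assuming the existing builder method is kept as a deprecated delegate; the parameter names and method bodies here are illustrative, not the merged signatures:

{code}
// Illustrative builder excerpt only: keep the Time-based overload as a
// deprecated shim and add a Duration-based variant.
/** @deprecated Use {@link #cleanupInRocksdbCompactFilter(long, Duration)} instead. */
@Deprecated
public Builder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicCompactionTime) {
    return cleanupInRocksdbCompactFilter(
            queryTimeAfterNumEntries,
            Duration.ofMillis(periodicCompactionTime.toMilliseconds()));
}

public Builder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime) {
    // ... configure the RocksDB compaction-filter cleanup strategy ...
    return this;
}
{code}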



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34184) Update copyright and license file

2024-02-28 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren reassigned FLINK-34184:
-

Assignee: Hang Ruan

> Update copyright and license file
> -
>
> Key: FLINK-34184
> URL: https://issues.apache.org/jira/browse/FLINK-34184
> Project: Flink
>  Issue Type: Sub-task
>  Components: Flink CDC
>Reporter: Leonard Xu
>Assignee: Hang Ruan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34183) Add NOTICE files for Flink CDC project

2024-02-28 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren reassigned FLINK-34183:
-

Assignee: Hang Ruan

> Add NOTICE files for Flink CDC project
> --
>
> Key: FLINK-34183
> URL: https://issues.apache.org/jira/browse/FLINK-34183
> Project: Flink
>  Issue Type: Sub-task
>  Components: Flink CDC
>Reporter: Leonard Xu
>Assignee: Hang Ruan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34188) Setup release infrastructure for Flink CDC project

2024-02-28 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren reassigned FLINK-34188:
-

Assignee: LvYanquan

> Setup release infrastructure for Flink CDC project
> --
>
> Key: FLINK-34188
> URL: https://issues.apache.org/jira/browse/FLINK-34188
> Project: Flink
>  Issue Type: Sub-task
>  Components: Flink CDC
>Reporter: Leonard Xu
>Assignee: LvYanquan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-02-28 Thread via GitHub


JustinLeesin commented on code in PR #24397:
URL: https://github.com/apache/flink/pull/24397#discussion_r1506925414


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java:
##
@@ -138,6 +137,14 @@ private Object[] getConvertedArgumentValues(
 return argumentVal;
 }
 
+private ProcedureContext getProcedureContext(TableConfig tableConfig) {
+Configuration configuration = (Configuration) 
tableConfig.getRootConfiguration();

Review Comment:
   Yeah, it's better, but the Configuration class doesn't provide a 
Configuration(ReadableConfig config) constructor, so it has to be done with a 
cast to Configuration, as shown below.
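
A cleaned-up form of that line (illustrative; the cast is needed because getRootConfiguration() returns a ReadableConfig):

{code}
// Copy the root configuration; the runtime object is a Configuration, so the
// cast makes the copy constructor applicable.
Configuration configuration =
        new Configuration((Configuration) tableConfig.getRootConfiguration());
{code}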
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-02-28 Thread via GitHub


JustinLeesin commented on code in PR #24397:
URL: https://github.com/apache/flink/pull/24397#discussion_r1506923378


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java:
##
@@ -210,6 +214,31 @@ void testNamedArgumentsWithOptionalArguments() {
 ResolvedSchema.of(Column.physical("result", 
DataTypes.STRING(;
 }
 
+@Test
+void testEnvironmentConf() throws DatabaseAlreadyExistException {
+// root conf should work
+Configuration configuration = new Configuration();
+configuration.setString("key1", "value1");
+StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+TestProcedureCatalogFactory.CatalogWithBuiltInProcedure 
procedureCatalog =
+new 
TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog");
+procedureCatalog.createDatabase(
+"system", new CatalogDatabaseImpl(Collections.emptyMap(), 
null), true);
+tableEnv.registerCatalog("test_p", procedureCatalog);
+tableEnv.useCatalog("test_p");

Review Comment:
   If we also add a property with key1, it will just overwrite the same key again. 
Do you mean to add a property ("key2", "value22") to the table config and then 
check? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch

2024-02-28 Thread Feng Jin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821920#comment-17821920
 ] 

Feng Jin commented on FLINK-29114:
--

[~qingyue] Can we fix this issue by creating different Sink Tables? If so, I am 
willing to help fix it.

> TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with 
> result mismatch 
> --
>
> Key: FLINK-29114
> URL: https://issues.apache.org/jira/browse/FLINK-29114
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Affects Versions: 1.15.0, 1.19.0, 1.20.0
>Reporter: Sergey Nuyanzin
>Assignee: Jane Chan
>Priority: Major
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
> Attachments: FLINK-29114.log, image-2024-02-27-15-23-49-494.png, 
> image-2024-02-27-15-26-07-657.png, image-2024-02-27-15-32-48-317.png
>
>
> It could be reproduced locally by repeating tests. Usually about 100 
> iterations are enough to have several failed tests
> {noformat}
> [ERROR] Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 
> 1.664 s <<< FAILURE! - in 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase
> [ERROR] 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse
>   Time elapsed: 0.108 s  <<< FAILURE!
> java.lang.AssertionError: expected: 3,2,Hello world, 3,2,Hello world, 3,2,Hello world)> but was: 2,2,Hello, 2,2,Hello, 3,2,Hello world, 3,2,Hello world)>
>     at org.junit.Assert.fail(Assert.java:89)
>     at org.junit.Assert.failNotEquals(Assert.java:835)
>     at org.junit.Assert.assertEquals(Assert.java:120)
>     at org.junit.Assert.assertEquals(Assert.java:146)
>     at 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse(TableSourceITCase.scala:428)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>     at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
>     at 
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
>     at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
>     at 
> 

Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]

2024-02-28 Thread via GitHub


hanyuzheng7 commented on code in PR #23173:
URL: https://github.com/apache/flink/pull/23173#discussion_r1506918553


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. */
+@Internal
+public class ArrayExceptFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator hashcodeEvaluator;
+private final SpecializedFunction.ExpressionEvaluator equalityEvaluator;
+private transient MethodHandle hashcodeHandle;
+
+private transient MethodHandle equalityHandle;
+
+public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) 
{
+super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context);
+final DataType dataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.toInternal();
+elementGetter = 
ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+hashcodeEvaluator =
+context.createEvaluator(
+Expressions.call("$HASHCODE$1", $("element1")),
+DataTypes.INT(),
+DataTypes.FIELD("element1", 
dataType.notNull().toInternal()));
+equalityEvaluator =
+context.createEvaluator(
+$("element1").isEqual($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
dataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
dataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+hashcodeHandle = hashcodeEvaluator.open(context);
+equalityHandle = equalityEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+try {
+if (arrayOne == null) {
+return null;
+}
+
+List list = new ArrayList<>();
+Set seen = new HashSet<>();
+
+boolean isNullPresentInArrayTwo = false;
+if (arrayTwo != null) {
+for (int pos = 0; pos < arrayTwo.size(); pos++) {
+final Object element = 
elementGetter.getElementOrNull(arrayTwo, pos);
+if (element == null) {
+isNullPresentInArrayTwo = true;
+} else {
+ObjectContainer objectContainer = new 
ObjectContainer(element);
+seen.add(objectContainer);
+}
+}
+}
+boolean isNullPresentInArrayOne = false;
+for (int pos = 0; pos < arrayOne.size(); pos++) {
+final Object element = 
elementGetter.getElementOrNull(arrayOne, pos);
+if (element == null) {
+isNullPresentInArrayOne = true;
+} else {
+ObjectContainer 

[jira] [Created] (FLINK-34543) Support Full Partition Processing On Non-keyed DataStream

2024-02-28 Thread Wencong Liu (Jira)
Wencong Liu created FLINK-34543:
---

 Summary: Support Full Partition Processing On Non-keyed DataStream
 Key: FLINK-34543
 URL: https://issues.apache.org/jira/browse/FLINK-34543
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.20.0
Reporter: Wencong Liu
 Fix For: 1.20.0


1. Introduce MapPartition, SortPartition, Aggregate, Reduce API in DataStream.
2. Introduce SortPartition API in KeyedStream.

The related FLIP can be found in 
[FLIP-380|https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34542) Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives

2024-02-28 Thread Lennon Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lennon Yu updated FLINK-34542:
--
Description: 
This is a ticket of misc. improvements on the build.gradle script provided at 
{{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> 
{{Getting Started:}}
 * Add {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block
 ** Absence of this will cause class-not-found errors in SPI related class 
loading if the user has multiple connectors/formats in their implementation.
 * Move the top level {{mainClassName}} project property setting into 
application \{ mainClass = 'foo.Bar' }
 ** This is because the top-level mainClassName property will be deprecated in 
Gradle 9.0+
 * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} 
properties with java \{ toolChain \{ languageVersion = 
JavaLanguageVersion.of(17) } }
 ** This is the recommended way by Gradle to streamline language version 
configuration.

  was:
This is a ticket of misc. improvements on the build.gradle script provided at 
{{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> 
{{Getting Started:}}
 * {{{}Add {{mergeServiceFiles(){} call to the {{shadowJar}} configuration 
block
 ** Absence of this will cause class-not-found errors in SPI related class 
loading if the user has multiple connectors/formats in their implementation.
 * Move the top level {{mainClassName}} project property setting into 
application \{ mainClass = 'foo.Bar' }
 ** This is because the top-level mainClassName property will be deprecated in 
Gradle 9.0+
 * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} 
properties with java \{ toolChain { languageVersion = 
JavaLanguageVersion.of(17) } }
 ** This is the recommended way by Gradle to streamline langauge version 
configuration.


> Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives
> ---
>
> Key: FLINK-34542
> URL: https://issues.apache.org/jira/browse/FLINK-34542
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Lennon Yu
>Priority: Minor
>
> This is a ticket of misc. improvements on the build.gradle script provided at 
> {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> 
> {{Getting Started:}}
>  * Add {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block
>  ** Absence of this will cause class-not-found errors in SPI related class 
> loading if the user has multiple connectors/formats in their implementation.
>  * Move the top level {{mainClassName}} project property setting into 
> application \{ mainClass = 'foo.Bar' }
>  ** This is because the top-level mainClassName property will be deprecated 
> in Gradle 9.0+
>  * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} 
> properties with java \{ toolChain \{ languageVersion = 
> JavaLanguageVersion.of(17) } }
>  ** This is the recommended way by Gradle to streamline language version 
> configuration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34542) Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives

2024-02-28 Thread Lennon Yu (Jira)
Lennon Yu created FLINK-34542:
-

 Summary: Improve Gradle Quick Start build.gradle with Better 
Gradle API Alternatives
 Key: FLINK-34542
 URL: https://issues.apache.org/jira/browse/FLINK-34542
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Lennon Yu


This is a ticket of misc. improvements on the build.gradle script provided at 
{{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> 
{{Getting Started:}}
 * {{{}Add {{mergeServiceFiles(){} call to the {{shadowJar}} configuration 
block
 ** Absence of this will cause class-not-found errors in SPI related class 
loading if the user has multiple connectors/formats in their implementation.
 * Move the top level {{mainClassName}} project property setting into 
application \{ mainClass = 'foo.Bar' }
 ** This is because the top-level mainClassName property will be deprecated in 
Gradle 9.0+
 * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} 
properties with java \{ toolchain \{ languageVersion = 
JavaLanguageVersion.of(17) } }
 ** This is the Gradle-recommended way to streamline language version 
configuration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][doc] Update the Kafka DDL example in the hive catalog page to the new parameter style [flink]

2024-02-28 Thread via GitHub


hackergin commented on PR #24409:
URL: https://github.com/apache/flink/pull/24409#issuecomment-1970258704

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][doc] Update the Kafka DDL example in the hive catalog page to the new parameter style [flink]

2024-02-28 Thread via GitHub


hackergin commented on PR #24409:
URL: https://github.com/apache/flink/pull/24409#issuecomment-1970258454

   CI failed because of https://issues.apache.org/jira/browse/FLINK-29114 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch

2024-02-28 Thread Feng Jin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821914#comment-17821914
 ] 

Feng Jin commented on FLINK-29114:
--

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57954=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4

> TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with 
> result mismatch 
> --
>
> Key: FLINK-29114
> URL: https://issues.apache.org/jira/browse/FLINK-29114
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Affects Versions: 1.15.0, 1.19.0, 1.20.0
>Reporter: Sergey Nuyanzin
>Assignee: Jane Chan
>Priority: Major
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
> Attachments: FLINK-29114.log, image-2024-02-27-15-23-49-494.png, 
> image-2024-02-27-15-26-07-657.png, image-2024-02-27-15-32-48-317.png
>
>
> It could be reproduced locally by repeating tests. Usually about 100 
> iterations are enough to have several failed tests
> {noformat}
> [ERROR] Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 
> 1.664 s <<< FAILURE! - in 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase
> [ERROR] 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse
>   Time elapsed: 0.108 s  <<< FAILURE!
> java.lang.AssertionError: expected: 3,2,Hello world, 3,2,Hello world, 3,2,Hello world)> but was: 2,2,Hello, 2,2,Hello, 3,2,Hello world, 3,2,Hello world)>
>     at org.junit.Assert.fail(Assert.java:89)
>     at org.junit.Assert.failNotEquals(Assert.java:835)
>     at org.junit.Assert.assertEquals(Assert.java:120)
>     at org.junit.Assert.assertEquals(Assert.java:146)
>     at 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse(TableSourceITCase.scala:428)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>     at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
>     at 
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
>     at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
>     at 
> 

Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-02-28 Thread via GitHub


luoyuxia commented on code in PR #24397:
URL: https://github.com/apache/flink/pull/24397#discussion_r1506889418


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java:
##
@@ -210,6 +214,31 @@ void testNamedArgumentsWithOptionalArguments() {
 ResolvedSchema.of(Column.physical("result", 
DataTypes.STRING(;
 }
 
+@Test
+void testEnvironmentConf() throws DatabaseAlreadyExistException {
+// root conf should work
+Configuration configuration = new Configuration();
+configuration.setString("key1", "value1");
+StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+TestProcedureCatalogFactory.CatalogWithBuiltInProcedure 
procedureCatalog =
+new 
TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog");
+procedureCatalog.createDatabase(
+"system", new CatalogDatabaseImpl(Collections.emptyMap(), 
null), true);
+tableEnv.registerCatalog("test_p", procedureCatalog);
+tableEnv.useCatalog("test_p");

Review Comment:
   Can you please also add a property ("key1", "value22") to the table config, and 
then call get_env_conf to make sure we can get both the table configuration and 
the root configuration?



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java:
##
@@ -138,6 +137,14 @@ private Object[] getConvertedArgumentValues(
 return argumentVal;
 }
 
+private ProcedureContext getProcedureContext(TableConfig tableConfig) {
+Configuration configuration = (Configuration) 
tableConfig.getRootConfiguration();

Review Comment:
   nit:
   Configuration configuration = new 
Configuration(tableConfig.getRootConfiguration());



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34491) Move from experimental support to production support for Java 17

2024-02-28 Thread Dhruv Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruv Patel updated FLINK-34491:

Description: 
This task is to move away from experimental support for Java 17 to production 
support so that teams running Flink in production can migrate to Java 17 
successfully

*Background:*

Flink supports protobuf dataformat to exchange messages between different 
operators and the serialization and deserialization of those protobufs are 
performed by library called "Kryo". In order to move away from experimental 
support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 
1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not 
support Java 17. This improvement plan is tracked as part of this ticket 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0]

All Flink applications using protobuf currently generate state with Kryo v2. 
Once the above improvement plan is complete all Flink applications will fully 
support reading that state and write newer state with Kryo v5. However, latest 
Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent 
applications which are using snapshot mechanism to deploy their jobs to latest 
Flink version with Kryo v5 support without a bridge version running on Java 11. 
Applications will have to run on a bridge release version that will read their 
state with Kryo v2 data and write it with Kryo v5 data before upgrading to a 
future version of Flink that completely drops support for Kryo v2.

Basically, Flink applications using protobuf dataformat cannot move directly 
from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. 
Applications will need to first move to Java 11 (bridging version) and then 
move to Java 17 to have a safe deployment.

*Blocker for this task:*

Upgrade to Kryo 5.5.0 which supports Java 17 and a path for snapshot migration
https://issues.apache.org/jira/browse/FLINK-3154.

  was:
This task is to move away from experimental support for Java 17 to production 
support so that teams running Flink in production can migrate to Java 17 
successfully

*Background:*

Flink supports protobuf dataformat to exchange messages between different 
operators and the serialization and deserialization of those protobufs are 
performed by library called "Kryo". In order to move away from experimental 
support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 
1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not 
support Java 17. This improvement plan is tracked as part of this ticket 
https://issues.apache.org/jira/browse/FLINK-3154.

All Flink applications using protobuf currently generate state with Kryo v2. 
Once the above improvement plan is complete all Flink applications will fully 
support reading that state and write newer state with Kryo v5. However, latest 
Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent 
applications which are using snapshot mechanism to deploy their jobs to latest 
Flink version with Kryo v5 support without a bridge version running on Java 11. 
Applications will have to run on a bridge release version that will read their 
state with Kryo v2 data and write it with Kryo v5 data before upgrading to a 
future version of Flink that completely drops support for Kryo v2.

Basically, Flink applications using protobuf dataformat cannot move directly 
from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. 
Applications will need to first move to Java 11 (bridging version) and then 
move to Java 17 to have a safe deployment.

Migration Plan is documented here: 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0]

*Blocker for this task:*

Upgrade to Kryo 5.5.0 which supports Java 17 and a path for snapshot migration
https://issues.apache.org/jira/browse/FLINK-3154.


> Move from experimental support to production support for Java 17
> 
>
> Key: FLINK-34491
> URL: https://issues.apache.org/jira/browse/FLINK-34491
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.18.1
>Reporter: Dhruv Patel
>Priority: Major
>
> This task is to move away from experimental support for Java 17 to production 
> support so that teams running Flink in production can migrate to Java 17 
> successfully
> *Background:*
> Flink supports protobuf dataformat to exchange messages between different 
> operators and the serialization and deserialization of those protobufs are 
> performed by library called "Kryo". In order to move away from experimental 
> support of Java 17 released as part of Flink 1.18.1, the Kryo library in 
> Flink 1.18.1 needs to be 

[jira] [Updated] (FLINK-34491) Move from experimental support to production support for Java 17

2024-02-28 Thread Dhruv Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruv Patel updated FLINK-34491:

Description: 
This task is to move away from experimental support for Java 17 to production 
support so that teams running Flink in production can migrate to Java 17 
successfully

*Background:*

Flink supports protobuf dataformat to exchange messages between different 
operators and the serialization and deserialization of those protobufs are 
performed by library called "Kryo". In order to move away from experimental 
support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 
1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not 
support Java 17. This improvement plan is tracked as part of this ticket 
https://issues.apache.org/jira/browse/FLINK-3154.

All Flink applications using protobuf currently generate state with Kryo v2. 
Once the above improvement plan is complete all Flink applications will fully 
support reading that state and write newer state with Kryo v5. However, latest 
Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent 
applications which are using snapshot mechanism to deploy their jobs to latest 
Flink version with Kryo v5 support without a bridge version running on Java 11. 
Applications will have to run on a bridge release version that will read their 
state with Kryo v2 data and write it with Kryo v5 data before upgrading to a 
future version of Flink that completely drops support for Kryo v2.

Basically, Flink applications using protobuf dataformat cannot move directly 
from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. 
Applications will need to first move to Java 11 (bridging version) and then 
move to Java 17 to have a safe deployment.

Migration Plan is documented here: 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0]

*Blocker for this task:*

Upgrade to Kryo 5.5.0 which supports Java 17 and a path for snapshot migration
https://issues.apache.org/jira/browse/FLINK-3154.

  was:
This task is to move away from experimental support for Java 17 to production 
support so that teams running Flink in production can migrate to Java 17 
successfully

*Background:*

Flink supports protobuf dataformat to exchange messages between different 
operators and the serialization and deserialization of those protobufs are 
performed by library called "Kryo". In order to move away from experimental 
support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 
1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not 
support Java 17. This improvement plan is tracked as part of this ticket 
https://issues.apache.org/jira/browse/FLINK-3154.

All Flink applications using protobuf currently generate state with Kryo v2. 
Once the above improvement plan is complete all Flink applications will fully 
support reading that state and write newer state with Kryo v5. However, latest 
Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent 
applications which are using snapshot mechanism to deploy their jobs to latest 
Flink version with Kryo v5 support without a bridge version running on Java 11. 
Applications will have to run on a bridge release version that will read their 
state with Kryo v2 data and write it with Kryo v5 data before upgrading to a 
future version of Flink that completely drops support for Kryo v2.

Basically, Flink applications using protobuf dataformat cannot move directly 
from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. 
Applications will need to first move to Java 11 (bridging version) and then 
move to Java 17 to have a safe deployment.

Migration Plan is documented here: 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0]

*Blocker for this task:*

Upgrade to Kryo 5.5.0 which supports Java 17 and snapshot migration
https://issues.apache.org/jira/browse/FLINK-3154.


> Move from experimental support to production support for Java 17
> 
>
> Key: FLINK-34491
> URL: https://issues.apache.org/jira/browse/FLINK-34491
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.18.1
>Reporter: Dhruv Patel
>Priority: Major
>
> This task is to move away from experimental support for Java 17 to production 
> support so that teams running Flink in production can migrate to Java 17 
> successfully
> *Background:*
> Flink supports protobuf dataformat to exchange messages between different 
> operators and the serialization and deserialization of those protobufs are 
> performed by library called "Kryo". In order to move away from experimental 
> support of Java 17 released 

[jira] [Updated] (FLINK-34491) Move from experimental support to production support for Java 17

2024-02-28 Thread Dhruv Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruv Patel updated FLINK-34491:

Description: 
This task is to move away from experimental support for Java 17 to production 
support so that teams running Flink in production can migrate to Java 17 
successfully

*Background:*

Flink supports protobuf dataformat to exchange messages between different 
operators and the serialization and deserialization of those protobufs are 
performed by library called "Kryo". In order to move away from experimental 
support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 
1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not 
support Java 17. This improvement plan is tracked as part of this ticket 
https://issues.apache.org/jira/browse/FLINK-3154.

All Flink applications using protobuf currently generate state with Kryo v2. 
Once the above improvement plan is complete all Flink applications will fully 
support reading that state and write newer state with Kryo v5. However, latest 
Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent 
applications which are using snapshot mechanism to deploy their jobs to latest 
Flink version with Kryo v5 support without a bridge version running on Java 11. 
Applications will have to run on a bridge release version that will read their 
state with Kryo v2 data and write it with Kryo v5 data before upgrading to a 
future version of Flink that completely drops support for Kryo v2.

Basically, Flink applications using protobuf dataformat cannot move directly 
from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. 
Applications will need to first move to Java 11 (bridging version) and then 
move to Java 17 to have a safe deployment.

Migration Plan is documented here: 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0]

*Blocker for this task:*

Upgrade to Kryo 5.5.0 which supports Java 17 and snapshot migration
https://issues.apache.org/jira/browse/FLINK-3154.

  was:
This task is to move away from experimental support for Java 17 to production 
support so that teams running Flink in production can migrate to Java 17 
successfully

*Background:*

Flink supports protobuf dataformat to exchange messages between different 
operators and the serialization and deserialization of those protobufs are 
performed by library called "Kryo". In order to move away from experimental 
support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 
1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not 
support Java 17. This improvement plan is tracked as part of this ticket 
https://issues.apache.org/jira/browse/FLINK-3154.

All Flink applications using protobuf currently generate state with Kryo v2. 
Once the above improvement plan is complete all Flink applications will fully 
support reading that state and write newer state with Kryo v5. However, latest 
Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent 
applications which are using snapshot mechanism to deploy their jobs to latest 
Flink version with Kryo v5 support without a bridge version running on Java 11. 
Applications will have to run on a bridge release version that will read their 
state with Kryo v2 data and write it with Kryo v5 data before upgrading to a 
future version of Flink that completely drops support for Kryo v2.

Basically, Flink applications using protobuf dataformat cannot move directly 
from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. 
Applications will need to first move to Java 11 (bridging version) and then 
move to Java 17 to have a safe deployment.

Migration Plan is documented here: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0

*Blocker for this task:*

Upgrade to Kryo 5.5.0 which supports Java 17 
https://issues.apache.org/jira/browse/FLINK-3154.


> Move from experimental support to production support for Java 17
> 
>
> Key: FLINK-34491
> URL: https://issues.apache.org/jira/browse/FLINK-34491
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.18.1
>Reporter: Dhruv Patel
>Priority: Major
>
> This task is to move away from experimental support for Java 17 to production 
> support so that teams running Flink in production can migrate to Java 17 
> successfully
> *Background:*
> Flink supports protobuf dataformat to exchange messages between different 
> operators and the serialization and deserialization of those protobufs are 
> performed by library called "Kryo". In order to move away from experimental 
> support of Java 17 released as part of Flink 1.18.1, the Kryo 

[jira] (FLINK-34491) Move from experimental support to production support for Java 17

2024-02-28 Thread Dhruv Patel (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34491 ]


Dhruv Patel deleted comment on FLINK-34491:
-

was (Author: JIRAUSER289387):
The following issue has been observed after enabling SSL in Flink, since after 
the migration Flink uses TLS 1.3 by default:
|Change|Description|
|SSL / TLS v1.3|the handshake between the Flink components now uses TLS v1.3 
with cipher TLS_AES_256_GCM_SHA384, which is causing SSL handshake failures.|
{code:java}
SSL3 alert read:fatal:handshake failure
SSL_connect:error in error
409B7454F87F:error:0A000410:SSL routines:ssl3_read_bytes:sslv3 alert 
handshake failure:ssl/record/rec_layer_s3.c:1586:SSL alert number 40
---
Server Temp Key: ECDH, prime256v1, 256 bits
---
SSL handshake has read 470 bytes and written 730 bytes
Verification: OK
---
New, TLSv1.3, Cipher is TLS_AES_256_GCM_SHA384
This TLS version forbids renegotiation.
{code}

> Move from experimental support to production support for Java 17
> 
>
> Key: FLINK-34491
> URL: https://issues.apache.org/jira/browse/FLINK-34491
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.18.1
>Reporter: Dhruv Patel
>Priority: Major
>
> This task is to move away from experimental support for Java 17 to production 
> support so that teams running Flink in production can migrate to Java 17 
> successfully
> *Background:*
> Flink supports protobuf dataformat to exchange messages between different 
> operators and the serialization and deserialization of those protobufs are 
> performed by library called "Kryo". In order to move away from experimental 
> support of Java 17 released as part of Flink 1.18.1, the Kryo library in 
> Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 
> does not support Java 17. This improvement plan is tracked as part of this 
> ticket https://issues.apache.org/jira/browse/FLINK-3154.
> All Flink applications using protobuf currently generate state with Kryo v2. 
> Once the above improvement plan is complete all Flink applications will fully 
> support reading that state and write newer state with Kryo v5. However, 
> latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This 
> will prevent applications which are using snapshot mechanism to deploy their 
> jobs to latest Flink version with Kryo v5 support without a bridge version 
> running on Java 11. Applications will have to run on a bridge release version 
> that will read their state with Kryo v2 data and write it with Kryo v5 data 
> before upgrading to a future version of Flink that completely drops support 
> for Kryo v2.
> Basically, Flink applications using protobuf dataformat cannot move directly 
> from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. 
> Applications will need to first move to Java 11 (bridging version) and then 
> move to Java 17 to have a safe deployment.
> Migration Plan is documented here: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0
> *Blocker for this task:*
> Upgrade to Kryo 5.5.0 which supports Java 17 
> https://issues.apache.org/jira/browse/FLINK-3154.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34491) Move from experimental support to production support for Java 17

2024-02-28 Thread Dhruv Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruv Patel updated FLINK-34491:

Description: 
This task is to move away from experimental support for Java 17 to production 
support so that teams running Flink in production can migrate to Java 17 
successfully

*Background:*

Flink supports protobuf dataformat to exchange messages between different 
operators and the serialization and deserialization of those protobufs are 
performed by library called "Kryo". In order to move away from experimental 
support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 
1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not 
support Java 17. This improvement plan is tracked as part of this ticket 
https://issues.apache.org/jira/browse/FLINK-3154.

All Flink applications using protobuf currently generate state with Kryo v2. 
Once the above improvement plan is complete all Flink applications will fully 
support reading that state and write newer state with Kryo v5. However, latest 
Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent 
applications which are using snapshot mechanism to deploy their jobs to latest 
Flink version with Kryo v5 support without a bridge version running on Java 11. 
Applications will have to run on a bridge release version that will read their 
state with Kryo v2 data and write it with Kryo v5 data before upgrading to a 
future version of Flink that completely drops support for Kryo v2.

Basically, Flink applications using protobuf dataformat cannot move directly 
from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. 
Applications will need to first move to Java 11 (bridging version) and then 
move to Java 17 to have a safe deployment.

Migration Plan is documented here: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0

*Blocker for this task:*

Upgrade to Kryo 5.5.0 which supports Java 17 
https://issues.apache.org/jira/browse/FLINK-3154.

  was:
This task is to move away from experimental support for Java 17 to production 
support so that teams running Flink in production can migrate to Java 17 
successfully

*Background:*

Flink supports protobuf dataformat to exchange messages between different 
operators and the serialization and deserialization of those protobufs are 
performed by library called "Kryo". In order to move away from experimental 
support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 
1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not 
support Java 17. This improvement plan is tracked as part of this ticket 
https://issues.apache.org/jira/browse/FLINK-3154.

All Flink applications using protobuf currently generate state with Kryo v2. 
Once the above improvement plan is complete all Flink applications will fully 
support reading that state and write newer state with Kryo v5. However, latest 
Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent 
applications which are using snapshot mechanism to deploy their jobs to latest 
Flink version with Kryo v5 support without a bridge version running on Java 11. 
Applications will have to run on a bridge release version that will read their 
state with Kryo v2 data and write it with Kryo v5 data before upgrading to a 
future version of Flink that completely drops support for Kryo v2.

Basically, Flink applications using protobuf dataformat cannot move directly 
from Java 8 to Java 17 without downtime. Applications will need to first move 
to Java 11 (bridging version) and then move to Java 17 to have a safe 
deployment.

*Blocker for this task:*

Upgrade to Kryo 5.5.0 which supports Java 17 
https://issues.apache.org/jira/browse/FLINK-3154.


> Move from experimental support to production support for Java 17
> 
>
> Key: FLINK-34491
> URL: https://issues.apache.org/jira/browse/FLINK-34491
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.18.1
>Reporter: Dhruv Patel
>Priority: Major
>
> This task is to move away from experimental support for Java 17 to production 
> support so that teams running Flink in production can migrate to Java 17 
> successfully
> *Background:*
> Flink supports protobuf dataformat to exchange messages between different 
> operators and the serialization and deserialization of those protobufs are 
> performed by library called "Kryo". In order to move away from experimental 
> support of Java 17 released as part of Flink 1.18.1, the Kryo library in 
> Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 
> does not support Java 17. This improvement plan is tracked as part of this 
> ticket 

[jira] [Updated] (FLINK-34491) Move from experimental support to production support for Java 17

2024-02-28 Thread Dhruv Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruv Patel updated FLINK-34491:

Description: 
This task is to move away from experimental support for Java 17 to production 
support so that teams running Flink in production can migrate to Java 17 
successfully

*Background:*

Flink supports protobuf dataformat to exchange messages between different 
operators and the serialization and deserialization of those protobufs are 
performed by library called "Kryo". In order to move away from experimental 
support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 
1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not 
support Java 17. This improvement plan is tracked as part of this ticket 
https://issues.apache.org/jira/browse/FLINK-3154.

All Flink applications using protobuf currently generate state with Kryo v2. 
Once the above improvement plan is complete all Flink applications will fully 
support reading that state and write newer state with Kryo v5. However, latest 
Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent 
applications which are using snapshot mechanism to deploy their jobs to latest 
Flink version with Kryo v5 support without a bridge version running on Java 11. 
Applications will have to run on a bridge release version that will read their 
state with Kryo v2 data and write it with Kryo v5 data before upgrading to a 
future version of Flink that completely drops support for Kryo v2.

Basically, Flink applications using protobuf dataformat cannot move directly 
from Java 8 to Java 17 without downtime. Applications will need to first move 
to Java 11 (bridging version) and then move to Java 17 to have a safe 
deployment.

*Blocker for this task:*

Upgrade to Kryo 5.5.0 which supports Java 17 
https://issues.apache.org/jira/browse/FLINK-3154.

  was:
This task is to move away from experimental support for Java 17 to production 
support so that teams running Flink in production can migrate to Java 17 
successfully

*Background:*

Flink supports protobuf dataformat to exchange messages between different 
operators and the serialization and deserialization of those protobufs are 
performed by library called "Kryo". In order to move away from experimental 
support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 
1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not 
support Java 17. This improvement plan is tracked as part of this ticket 
https://issues.apache.org/jira/browse/FLINK-3154.

All Flink applications using protobuf currently generate state with Kryo v2. 
Once the above improvement plan is complete all Flink applications will fully 
support reading that state and write newer state with Kryo v5. However, latest 
Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent 
applications which are using snapshot mechanism to deploy their jobs to latest 
Flink version with Kryo v5 support without a bridge version running on Java 11. 
Applications will have to run on a bridge release version that will read their 
state with Kryo v2 data and write it with Kryo v5 data before upgrading to a 
future version of Flink that completely drops support for Kryo v2.

*Blocker for this task:*

Below are some of the blocker task which prevent certain Flink applications 
which use protobuf from upgrading to Java 17
*Blocker 1:* Upgrade to Kryo 5.5.0 which supports Java 17 
https://issues.apache.org/jira/browse/FLINK-3154.

*Blocker 2:* Validate whether a snapshot with Java 8 is compatible with Java 17 
without using a bridge version running on Java 11.
 https://issues.apache.org/jira/browse/FLINK-33707


> Move from experimental support to production support for Java 17
> 
>
> Key: FLINK-34491
> URL: https://issues.apache.org/jira/browse/FLINK-34491
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.18.1
>Reporter: Dhruv Patel
>Priority: Major
>
> This task is to move away from experimental support for Java 17 to production 
> support so that teams running Flink in production can migrate to Java 17 
> successfully
> *Background:*
> Flink supports protobuf dataformat to exchange messages between different 
> operators and the serialization and deserialization of those protobufs are 
> performed by library called "Kryo". In order to move away from experimental 
> support of Java 17 released as part of Flink 1.18.1, the Kryo library in 
> Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 
> does not support Java 17. This improvement plan is tracked as part of this 
> ticket https://issues.apache.org/jira/browse/FLINK-3154.
> All Flink applications using protobuf currently generate 

Re: [PR] [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]

2024-02-28 Thread via GitHub


RocMarshal commented on PR #24407:
URL: https://github.com/apache/flink/pull/24407#issuecomment-1970101645

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]

2024-02-28 Thread via GitHub


RocMarshal commented on PR #24407:
URL: https://github.com/apache/flink/pull/24407#issuecomment-1970101117

   There's an unstable test case, which has nothing to do with the current PR.
   So I'll trigger the next CI run.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34491) Move from experimental support to production support for Java 17

2024-02-28 Thread Dhruv Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruv Patel updated FLINK-34491:

Description: 
This task is to move away from experimental support for Java 17 to production 
support so that teams running Flink in production can migrate to Java 17 
successfully

*Background:*

Flink supports protobuf dataformat to exchange messages between different 
operators and the serialization and deserialization of those protobufs are 
performed by library called "Kryo". In order to move away from experimental 
support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 
1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not 
support Java 17. This improvement plan is tracked as part of this ticket 
https://issues.apache.org/jira/browse/FLINK-3154.

All Flink applications using protobuf currently generate state with Kryo v2. 
Once the above improvement plan is complete all Flink applications will fully 
support reading that state and write newer state with Kryo v5. However, latest 
Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent 
applications which are using snapshot mechanism to deploy their jobs to latest 
Flink version with Kryo v5 support without a bridge version running on Java 11. 
Applications will have to run on a bridge release version that will read their 
state with Kryo v2 data and write it with Kryo v5 data before upgrading to a 
future version of Flink that completely drops support for Kryo v2.

*Blocker for this task:*

Below are some of the blocker task which prevent certain Flink applications 
which use protobuf from upgrading to Java 17
*Blocker 1:* Upgrade to Kryo 5.5.0 which supports Java 17 
https://issues.apache.org/jira/browse/FLINK-3154.

*Blocker 2:* Validate whether a snapshot with Java 8 is compatible with Java 17 
without using a bridge version running on Java 11.
 https://issues.apache.org/jira/browse/FLINK-33707

  was:
This task is to move away from experimental support for Java 17 to production 
support so that teams running Flink in production can migrate to Java 17 
successfully

 

*Blocker for this task:*

Savepoint migration is not supported with Java 17 and Flink 1.18.1 as mentioned 
in this ticket https://issues.apache.org/jira/browse/FLINK-33707


> Move from experimental support to production support for Java 17
> 
>
> Key: FLINK-34491
> URL: https://issues.apache.org/jira/browse/FLINK-34491
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.18.1
>Reporter: Dhruv Patel
>Priority: Major
>
> This task is to move away from experimental support for Java 17 to production 
> support so that teams running Flink in production can migrate to Java 17 
> successfully
> *Background:*
> Flink supports protobuf dataformat to exchange messages between different 
> operators and the serialization and deserialization of those protobufs are 
> performed by library called "Kryo". In order to move away from experimental 
> support of Java 17 released as part of Flink 1.18.1, the Kryo library in 
> Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 
> does not support Java 17. This improvement plan is tracked as part of this 
> ticket https://issues.apache.org/jira/browse/FLINK-3154.
> All Flink applications using protobuf currently generate state with Kryo v2. 
> Once the above improvement plan is complete all Flink applications will fully 
> support reading that state and write newer state with Kryo v5. However, 
> latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This 
> will prevent applications which are using snapshot mechanism to deploy their 
> jobs to latest Flink version with Kryo v5 support without a bridge version 
> running on Java 11. Applications will have to run on a bridge release version 
> that will read their state with Kryo v2 data and write it with Kryo v5 data 
> before upgrading to a future version of Flink that completely drops support 
> for Kryo v2.
> *Blocker for this task:*
> Below are some of the blocker task which prevent certain Flink applications 
> which use protobuf from upgrading to Java 17
> *Blocker 1:* Upgrade to Kryo 5.5.0 which supports Java 17 
> https://issues.apache.org/jira/browse/FLINK-3154.
> *Blocker 2:* Validate whether a snapshot with Java 8 is compatible with Java 
> 17 without using a bridge version running on Java 11.
>  https://issues.apache.org/jira/browse/FLINK-33707



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34541) Flink uses insecure http confluent endpoint in its build

2024-02-28 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-34541:
---
Priority: Minor  (was: Major)

> Flink uses insecure http confluent endpoint in its build
> 
>
> Key: FLINK-34541
> URL: https://issues.apache.org/jira/browse/FLINK-34541
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System
>Reporter: PJ Fanning
>Priority: Minor
>
> See 
> https://github.com/apache/flink/blob/641f4f4d0d0156b84bdb9ba528b1dd96f7ae9d9c/flink-end-to-end-tests/test-scripts/kafka-common.sh#L55
> Please use https instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34541) Flink uses insecure http confluent endpoint in its build

2024-02-28 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-34541:
---
Issue Type: Technical Debt  (was: Bug)

> Flink uses insecure http confluent endpoint in its build
> 
>
> Key: FLINK-34541
> URL: https://issues.apache.org/jira/browse/FLINK-34541
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System
>Reporter: PJ Fanning
>Priority: Major
>
> See 
> https://github.com/apache/flink/blob/641f4f4d0d0156b84bdb9ba528b1dd96f7ae9d9c/flink-end-to-end-tests/test-scripts/kafka-common.sh#L55
> Please use https instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34541) Flink uses insecure http confluent endpoint in its build

2024-02-28 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-34541:
---
Component/s: Tests
 (was: Build System)

> Flink uses insecure http confluent endpoint in its build
> 
>
> Key: FLINK-34541
> URL: https://issues.apache.org/jira/browse/FLINK-34541
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Tests
>Reporter: PJ Fanning
>Priority: Minor
>
> See 
> https://github.com/apache/flink/blob/641f4f4d0d0156b84bdb9ba528b1dd96f7ae9d9c/flink-end-to-end-tests/test-scripts/kafka-common.sh#L55
> Please use https instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34541) Flink uses insecure http confluent endpoint in its build

2024-02-28 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821886#comment-17821886
 ] 

Martijn Visser commented on FLINK-34541:


There's only one test that still uses this; I don't think it's worthy of a 
major priority. If anything, it's tech debt. Updating the ticket.

> Flink uses insecure http confluent endpoint in its build
> 
>
> Key: FLINK-34541
> URL: https://issues.apache.org/jira/browse/FLINK-34541
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: PJ Fanning
>Priority: Major
>
> See 
> https://github.com/apache/flink/blob/641f4f4d0d0156b84bdb9ba528b1dd96f7ae9d9c/flink-end-to-end-tests/test-scripts/kafka-common.sh#L55
> Please use https instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34541) Flink uses insecure http confluent endpoint in its build

2024-02-28 Thread PJ Fanning (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PJ Fanning updated FLINK-34541:
---
Issue Type: Bug  (was: Improvement)

> Flink uses insecure http confluent endpoint in its build
> 
>
> Key: FLINK-34541
> URL: https://issues.apache.org/jira/browse/FLINK-34541
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: PJ Fanning
>Priority: Major
>
> See 
> https://github.com/apache/flink/blob/641f4f4d0d0156b84bdb9ba528b1dd96f7ae9d9c/flink-end-to-end-tests/test-scripts/kafka-common.sh#L55
> Please use https instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34469][table] Implement TableDistribution toString [flink]

2024-02-28 Thread via GitHub


JingGe commented on code in PR #24338:
URL: https://github.com/apache/flink/pull/24338#discussion_r1498379848


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableDistribution.java:
##
@@ -142,4 +142,9 @@ && getBucketCount().get() != 0) {
 sb.append("\n");
 return sb.toString();
 }
+
+@Override
+public String toString() {
+return asSerializableString();

Review Comment:
   1. If this is the requirement for the `toString()` method, we should consider 
marking `asSerializableString()` as private and replacing any external call 
of `asSerializableString()` with `toString()`.
   2. Could you also add a test for it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34541) Flink uses insecure http confluent endpoint in its build

2024-02-28 Thread PJ Fanning (Jira)
PJ Fanning created FLINK-34541:
--

 Summary: Flink uses insecure http confluent endpoint in its build
 Key: FLINK-34541
 URL: https://issues.apache.org/jira/browse/FLINK-34541
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: PJ Fanning


See 
https://github.com/apache/flink/blob/641f4f4d0d0156b84bdb9ba528b1dd96f7ae9d9c/flink-end-to-end-tests/test-scripts/kafka-common.sh#L55

Please use https instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]

2024-02-28 Thread via GitHub


morazow commented on code in PR #182:
URL: https://github.com/apache/flink-docker/pull/182#discussion_r1506657180


##
.github/workflows/ci.yml:
##
@@ -17,18 +17,19 @@ name: "CI"
 
 on: [push, pull_request]
 
+env:
+  TAR_URL: 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz;
+
 jobs:
   ci:
-name: CI using JDK ${{ matrix.java_version }}
 runs-on: ubuntu-latest
-strategy:
-  fail-fast: false
-  matrix:

Review Comment:
   Yes agree, it also isolates the tests to their own environment which is good.
   
   There was some docker issue with matrix setup, I will update it again and 
check that tests run successfully. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]

2024-02-28 Thread via GitHub


morazow commented on PR #182:
URL: https://github.com/apache/flink-docker/pull/182#issuecomment-1969906057

   Thanks @XComp, I'll ping you again once I update it


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]

2024-02-28 Thread via GitHub


snuyanzin commented on code in PR #23173:
URL: https://github.com/apache/flink/pull/23173#discussion_r1506638912


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. */
+@Internal
+public class ArrayExceptFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator hashcodeEvaluator;
+private final SpecializedFunction.ExpressionEvaluator equalityEvaluator;
+private transient MethodHandle hashcodeHandle;
+
+private transient MethodHandle equalityHandle;
+
+public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) 
{
+super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context);
+final DataType dataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.toInternal();
+elementGetter = 
ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+hashcodeEvaluator =
+context.createEvaluator(
+Expressions.call("$HASHCODE$1", $("element1")),
+DataTypes.INT(),
+DataTypes.FIELD("element1", 
dataType.notNull().toInternal()));
+equalityEvaluator =
+context.createEvaluator(
+$("element1").isEqual($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
dataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
dataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+hashcodeHandle = hashcodeEvaluator.open(context);
+equalityHandle = equalityEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+try {
+if (arrayOne == null) {
+return null;
+}
+
+List<Object> list = new ArrayList<>();
+Set<ObjectContainer> seen = new HashSet<>();
+
+boolean isNullPresentInArrayTwo = false;
+if (arrayTwo != null) {
+for (int pos = 0; pos < arrayTwo.size(); pos++) {
+final Object element = 
elementGetter.getElementOrNull(arrayTwo, pos);
+if (element == null) {
+isNullPresentInArrayTwo = true;
+} else {
+ObjectContainer objectContainer = new 
ObjectContainer(element);
+seen.add(objectContainer);
+}
+}
+}
+boolean isNullPresentInArrayOne = false;
+for (int pos = 0; pos < arrayOne.size(); pos++) {
+final Object element = 
elementGetter.getElementOrNull(arrayOne, pos);
+if (element == null) {
+isNullPresentInArrayOne = true;
+} else {
+ObjectContainer 

Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]

2024-02-28 Thread via GitHub


MartijnVisser commented on code in PR #23173:
URL: https://github.com/apache/flink/pull/23173#discussion_r1506585367


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. */
+@Internal
+public class ArrayExceptFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+    private final SpecializedFunction.ExpressionEvaluator hashcodeEvaluator;
+    private final SpecializedFunction.ExpressionEvaluator equalityEvaluator;
+    private transient MethodHandle hashcodeHandle;
+
+    private transient MethodHandle equalityHandle;
+
+    public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context);
+        final DataType dataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType()
+                        .toInternal();
+        elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+        hashcodeEvaluator =
+                context.createEvaluator(
+                        Expressions.call("$HASHCODE$1", $("element1")),
+                        DataTypes.INT(),
+                        DataTypes.FIELD("element1", dataType.notNull().toInternal()));
+        equalityEvaluator =
+                context.createEvaluator(
+                        $("element1").isEqual($("element2")),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.FIELD("element1", dataType.notNull().toInternal()),
+                        DataTypes.FIELD("element2", dataType.notNull().toInternal()));
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        hashcodeHandle = hashcodeEvaluator.open(context);
+        equalityHandle = equalityEvaluator.open(context);
+    }
+
+    public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+        try {
+            if (arrayOne == null) {
+                return null;
+            }
+
+            List list = new ArrayList<>();
+            Set seen = new HashSet<>();
+
+            boolean isNullPresentInArrayTwo = false;
+            if (arrayTwo != null) {
+                for (int pos = 0; pos < arrayTwo.size(); pos++) {
+                    final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
+                    if (element == null) {
+                        isNullPresentInArrayTwo = true;
+                    } else {
+                        ObjectContainer objectContainer = new ObjectContainer(element);
+                        seen.add(objectContainer);
+                    }
+                }
+            }
+            boolean isNullPresentInArrayOne = false;
+            for (int pos = 0; pos < arrayOne.size(); pos++) {
+                final Object element = elementGetter.getElementOrNull(arrayOne, pos);
+                if (element == null) {
+                    isNullPresentInArrayOne = true;
+                } else {
+                    ObjectContainer 
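
The diff excerpt above is cut off right where eval() wraps each element of the second array in an ObjectContainer. That helper class is not visible in this excerpt; the sketch below is an assumption based on the hashcodeHandle/equalityHandle fields shown above and on the analogous wrappers used by other Flink array functions, not necessarily the exact code in this PR.

```java
import org.apache.flink.util.FlinkRuntimeException;

import java.lang.invoke.MethodHandle;

/** Hypothetical sketch of the element wrapper; the PR's real ObjectContainer is not shown here. */
final class ObjectContainerSketch {

    private final MethodHandle equalityHandle;
    private final MethodHandle hashcodeHandle;
    private final Object element;

    ObjectContainerSketch(MethodHandle equalityHandle, MethodHandle hashcodeHandle, Object element) {
        this.equalityHandle = equalityHandle;
        this.hashcodeHandle = hashcodeHandle;
        this.element = element;
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof ObjectContainerSketch)) {
            return false;
        }
        try {
            // Delegate to the code-generated equality of the array's element type,
            // so internal data formats (e.g. BinaryStringData) compare by value.
            return (boolean) equalityHandle.invoke(this.element, ((ObjectContainerSketch) other).element);
        } catch (Throwable t) {
            throw new FlinkRuntimeException(t);
        }
    }

    @Override
    public int hashCode() {
        try {
            // Delegate to the code-generated hash of the array's element type.
            return (int) hashcodeHandle.invoke(this.element);
        } catch (Throwable t) {
            throw new FlinkRuntimeException(t);
        }
    }
}
```

A wrapper along these lines is what makes the HashSet-based lookup in eval() semantically correct for non-primitive element types.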

Re: [PR] [FLINK-34509] [docs] add missing "url" option for Debezium Avro [flink]

2024-02-28 Thread via GitHub


JingGe merged PR #24395:
URL: https://github.com/apache/flink/pull/24395


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34152) Tune TaskManager memory

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory  (was: Tune TaskManager memory of 
autoscaled jobs)

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  
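
As a rough illustration of the proportional tuning described above: a minimal heuristic sketch, assuming averaged heap-usage metrics are already collected. The class and method names are illustrative only and are not part of the autoscaler's actual API.

{code:java}
import org.apache.flink.configuration.MemorySize;

// Minimal sketch of a proportional heap-tuning heuristic. All names are
// illustrative; this is not the autoscaler's real implementation.
public final class HeapTuningSketch {

    /**
     * Suggests a new heap size so that the observed usage would sit at the target
     * utilization, never going below a configured minimum.
     */
    static MemorySize suggestHeapSize(
            MemorySize currentHeap,
            double observedUsageRatio,
            double targetUsageRatio,
            MemorySize minHeap) {
        double scale = observedUsageRatio / targetUsageRatio;
        long suggestedBytes = (long) (currentHeap.getBytes() * scale);
        return new MemorySize(Math.max(suggestedBytes, minHeap.getBytes()));
    }

    public static void main(String[] args) {
        // 4g heap used at 30% with a 70% utilization target -> roughly 1.7g suggested.
        MemorySize suggested =
                suggestHeapSize(MemorySize.parse("4g"), 0.30, 0.70, MemorySize.parse("1g"));
        System.out.println("Suggested heap: " + suggested.toHumanReadableString());
    }
}
{code}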



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune TaskManager memory of austoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory of austoscaled jobs  (was: Tune 
TaskManager memory)

> Tune TaskManager memory of austoscaled jobs
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34540) Tune number of task slots

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-34540:
--

Assignee: (was: Maximilian Michels)

> Tune number of task slots
> -
>
> Key: FLINK-34540
> URL: https://issues.apache.org/jira/browse/FLINK-34540
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune TaskManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory of autoscaled jobs  (was: Tune TaskManager 
memory of austoscaled jobs)

> Tune TaskManager memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34540) Tune number of task slots

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34540:
---
Description: (was: Adjustments similar to FLINK-34152, but simpler 
because we only need to adjust heap memory and metaspace for the JobManager.)

> Tune number of task slots
> -
>
> Key: FLINK-34540
> URL: https://issues.apache.org/jira/browse/FLINK-34540
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34539) Tune JobManager memory

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34539:
---
Summary: Tune JobManager memory  (was: Tune JobManager memory of autoscaled 
jobs)

> Tune JobManager memory
> --
>
> Key: FLINK-34539
> URL: https://issues.apache.org/jira/browse/FLINK-34539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>
> Adjustments similar to FLINK-34152, but simpler because we only need to 
> adjust heap memory and metaspace for the JobManager.
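
As a rough illustration only: once suitable values are derived, they can be applied as plain configuration overrides. The option keys below are Flink's standard JobManager memory options; the concrete sizes are placeholders, not recommendations.

{code:java}
import org.apache.flink.configuration.Configuration;

// Sketch of applying tuned JobManager memory settings as config overrides.
// The sizes are placeholders; only heap and metaspace are touched, as noted above.
public final class JobManagerMemoryOverrides {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("jobmanager.memory.heap.size", "1024m");
        conf.setString("jobmanager.memory.jvm-metaspace.size", "256m");
        System.out.println(conf);
    }
}
{code}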



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune TaskManager memory

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory  (was: Tune TaskManager memory of 
autoscaled jobs)

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34540) Tune number of task slots

2024-02-28 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-34540:
--

 Summary: Tune number of task slots
 Key: FLINK-34540
 URL: https://issues.apache.org/jira/browse/FLINK-34540
 Project: Flink
  Issue Type: Sub-task
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


Adjustments similar to FLINK-34152, but simpler because we only need to adjust 
heap memory and metaspace for the JobManager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Description: Umbrella issue to tackle tuning the Flink configuration as 
part of Flink Autoscaling.  (was: Umbrella issue to tackle)

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>
> Umbrella issue to tackle tuning the Flink configuration as part of Flink 
> Autoscaling.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Labels:   (was: pull-request-available)

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>
> Umbrella issue to tackle



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Description: Umbrella issue to tackle  (was: The current autoscaling 
algorithm adjusts the parallelism of the job task vertices according to the 
processing needs. By adjusting the parallelism, we systematically scale the 
amount of CPU for a task. At the same time, we also indirectly change the 
amount of memory tasks have at their disposal. However, there are some problems 
with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

[https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit])

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Umbrella issue to tackle



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Fix Version/s: (was: kubernetes-operator-1.8.0)

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
>
> Umbrella issue to tackle



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Summary: Tune Flink config of autoscaled jobs  (was: Tune memory of 
autoscaled jobs)

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> [https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34016) Janino compile failed when watermark with column by udf

2024-02-28 Thread Sebastien Pereira (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821751#comment-17821751
 ] 

Sebastien Pereira commented on FLINK-34016:
---

Hi [~xuyangzhong] I have tested changes from your 
[PR|https://github.com/apache/flink/pull/24280] with a patch on 1.18.0 and it 
fixes the issue (I previously reported the issue on 1.18.0 in  
[FLINK-28693|https://issues.apache.org/jira/browse/FLINK-28693?focusedCommentId=17815957=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17815957]
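
For readers of this thread, a hypothetical reproduction of the class of query named in the issue title (a watermark computed by a UDF). The reporter's actual SQL is not part of this excerpt, so the table, column, and function names below are made up.

{code:java}
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import java.time.LocalDateTime;

public class WatermarkUdfRepro {

    /** Simple UDF used inside the WATERMARK clause. */
    public static class ShiftTimestamp extends ScalarFunction {
        public @DataTypeHint("TIMESTAMP(3)") LocalDateTime eval(LocalDateTime ts) {
            return ts == null ? null : ts.minusSeconds(5);
        }
    }

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.createTemporarySystemFunction("SHIFT_TS", ShiftTimestamp.class);
        tEnv.executeSql(
                "CREATE TABLE events ("
                        + "  id STRING,"
                        + "  ts TIMESTAMP(3),"
                        + "  WATERMARK FOR ts AS SHIFT_TS(ts)"
                        + ") WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
        // On affected versions, reading from the table fails with the
        // "Could not instantiate generated class 'WatermarkGenerator$0'" error quoted below.
        tEnv.executeSql("SELECT * FROM events").print();
    }
}
{code}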







 

 

> Janino compile failed when watermark with column by udf
> ---
>
> Key: FLINK-34016
> URL: https://issues.apache.org/jira/browse/FLINK-34016
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.15.0, 1.18.0
>Reporter: ude
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-01-25-11-53-06-158.png, 
> image-2024-01-25-11-54-54-381.png, image-2024-01-25-12-57-21-318.png, 
> image-2024-01-25-12-57-34-632.png
>
>
> After submitting the following Flink SQL via sql-client.sh, the exception below is thrown:
> {code:java}
> Caused by: java.lang.RuntimeException: Could not instantiate generated class 
> 'WatermarkGenerator$0'
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:69)
>     at 
> org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:109)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:462)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:438)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:414)
>     at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>     ... 16 more
> Caused by: 
> org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
>     ... 18 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868)
>     at 
> 

[jira] [Comment Edited] (FLINK-34016) Janino compile failed when watermark with column by udf

2024-02-28 Thread Sebastien Pereira (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821751#comment-17821751
 ] 

Sebastien Pereira edited comment on FLINK-34016 at 2/28/24 4:28 PM:


Hi [~xuyangzhong] I have tested changes from your 
[PR|https://github.com/apache/flink/pull/24280] with a patch on 1.18.0 and it 
fixes the issue (I previously reported the issue on 1.18.0 in  
[FLINK-28693|https://issues.apache.org/jira/browse/FLINK-28693?focusedCommentId=17815957=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17815957])







 

 


was (Author: JIRAUSER302567):
Hi [~xuyangzhong] I have tested changes from your 
[PR|https://github.com/apache/flink/pull/24280] with a patch on 1.18.0 and it 
fixes the issue (I previously reported the issue on 1.18.0 in  
[FLINK-28693|https://issues.apache.org/jira/browse/FLINK-28693?focusedCommentId=17815957=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17815957]







 

 

> Janino compile failed when watermark with column by udf
> ---
>
> Key: FLINK-34016
> URL: https://issues.apache.org/jira/browse/FLINK-34016
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.15.0, 1.18.0
>Reporter: ude
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-01-25-11-53-06-158.png, 
> image-2024-01-25-11-54-54-381.png, image-2024-01-25-12-57-21-318.png, 
> image-2024-01-25-12-57-34-632.png
>
>
> After submitting the following Flink SQL via sql-client.sh, the exception below is thrown:
> {code:java}
> Caused by: java.lang.RuntimeException: Could not instantiate generated class 
> 'WatermarkGenerator$0'
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:69)
>     at 
> org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:109)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:462)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:438)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:414)
>     at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>     ... 16 more
> Caused by: 
> org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
>     ... 18 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be 

[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34539:
---
Labels:   (was: pull-request-available)

> Tune JobManager memory of autoscaled jobs
> -
>
> Key: FLINK-34539
> URL: https://issues.apache.org/jira/browse/FLINK-34539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>
> Adjustments similar to FLINK-34152, but simpler because we only need to 
> adjust heap memory and metaspace for the JobManager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34539:
---
Description: Adjustments similar to FLINK-34152, but simpler because we 
only need to adjust heap memory and metaspace for the JobManager.  (was: 
Similarly to)

> Tune JobManager memory of autoscaled jobs
> -
>
> Key: FLINK-34539
> URL: https://issues.apache.org/jira/browse/FLINK-34539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Adjustments similar to FLINK-34152, but simpler because we only need to 
> adjust heap memory and metaspace for the JobManager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34538) Tune memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Description: 
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their disposal. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

[https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit]

> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> [https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]

2024-02-28 Thread via GitHub


hanyuzheng7 commented on code in PR #23173:
URL: https://github.com/apache/flink/pull/23173#discussion_r1506235081


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. */
+@Internal
+public class ArrayExceptFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+    private final SpecializedFunction.ExpressionEvaluator hashcodeEvaluator;
+    private final SpecializedFunction.ExpressionEvaluator equalityEvaluator;
+    private transient MethodHandle hashcodeHandle;
+
+    private transient MethodHandle equalityHandle;
+
+    public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context);
+        final DataType dataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType()
+                        .toInternal();
+        elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+        hashcodeEvaluator =
+                context.createEvaluator(
+                        Expressions.call("$HASHCODE$1", $("element1")),
+                        DataTypes.INT(),
+                        DataTypes.FIELD("element1", dataType.notNull().toInternal()));
+        equalityEvaluator =
+                context.createEvaluator(
+                        $("element1").isEqual($("element2")),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.FIELD("element1", dataType.notNull().toInternal()),
+                        DataTypes.FIELD("element2", dataType.notNull().toInternal()));
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        hashcodeHandle = hashcodeEvaluator.open(context);
+        equalityHandle = equalityEvaluator.open(context);
+    }
+
+    public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+        try {
+            if (arrayOne == null) {
+                return null;
+            }
+
+            List list = new ArrayList<>();
+            Set seen = new HashSet<>();
+
+            boolean isNullPresentInArrayTwo = false;
+            if (arrayTwo != null) {
+                for (int pos = 0; pos < arrayTwo.size(); pos++) {
+                    final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
+                    if (element == null) {
+                        isNullPresentInArrayTwo = true;
+                    } else {
+                        ObjectContainer objectContainer = new ObjectContainer(element);
+                        seen.add(objectContainer);
+                    }
+                }
+            }
+            boolean isNullPresentInArrayOne = false;
+            for (int pos = 0; pos < arrayOne.size(); pos++) {
+                final Object element = elementGetter.getElementOrNull(arrayOne, pos);
+                if (element == null) {
+                    isNullPresentInArrayOne = true;
+                } else {
+                    ObjectContainer 
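
For reference, a minimal usage sketch exercising the new function from SQL through the Table API. The exact null- and duplicate-handling semantics were still being discussed in this review, so the expectation in the comment is illustrative only.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ArrayExceptUsage {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Elements of the first array that are absent from the second one,
        // e.g. something like [1, 2] for the call below.
        tEnv.executeSql("SELECT ARRAY_EXCEPT(ARRAY[1, 2, 3], ARRAY[3, 4])").print();
    }
}
```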

[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34539:
---
Description: Similarly to  (was: The current autoscaling algorithm adjusts 
the parallelism of the job task vertices according to the processing needs. By 
adjusting the parallelism, we systematically scale the amount of CPU for a 
task. At the same time, we also indirectly change the amount of memory tasks 
have at their disposal. However, there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit

 )

> Tune JobManager memory of autoscaled jobs
> -
>
> Key: FLINK-34539
> URL: https://issues.apache.org/jira/browse/FLINK-34539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Similarly to



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

