[jira] [Updated] (FLINK-34544) Tiered result partition should be released with lock
[ https://issues.apache.org/jira/browse/FLINK-34544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34544: --- Labels: pull-request-available (was: ) > Tiered result partition should be released with lock > > > Key: FLINK-34544 > URL: https://issues.apache.org/jira/browse/FLINK-34544 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.20.0 >Reporter: Yuxin Tan >Assignee: Yuxin Tan >Priority: Major > Labels: pull-request-available > > When releasing `TieredResultPartition`, it is not protected by a lock, then > it may throw illegal state exception occasionally. We should fix it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34544][network] Add lock to release tiered result partition [flink]
TanYuxin-tyx opened a new pull request, #24412: URL: https://github.com/apache/flink/pull/24412 ## What is the purpose of the change *Add lock to release tiered result partition* ## Brief change log *(for example:)* - *Add lock to release tiered result partition* ## Verifying this change This change is already covered by existing tests, such as *TieredResultPartitionTest*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34544) Tiered result partition should be released with lock
[ https://issues.apache.org/jira/browse/FLINK-34544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuxin Tan updated FLINK-34544: -- Description: When releasing `TieredResultPartition`, it is not protected by a lock, then it may throw illegal state exception occasionally. We should fix it. (was: When releasing `TieredResultPartition`, it is not protected by a lock, then it may thrown illegal state exception occasionally. We should fix it.) > Tiered result partition should be released with lock > > > Key: FLINK-34544 > URL: https://issues.apache.org/jira/browse/FLINK-34544 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.20.0 >Reporter: Yuxin Tan >Assignee: Yuxin Tan >Priority: Major > > When releasing `TieredResultPartition`, it is not protected by a lock, then > it may throw illegal state exception occasionally. We should fix it.
[jira] [Updated] (FLINK-34544) Tiered result partition should be released with lock
[ https://issues.apache.org/jira/browse/FLINK-34544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuxin Tan updated FLINK-34544: -- Description: When releasing `TieredResultPartition`, it is not protected by a lock, then it may thrown illegal state exception occasionally. We should fix it. (was: The `TieredStorageMemoryManagerImpl` has a `numRequestedBuffers` check when releasing resources. However, this check is performed in the task thread, while the buffer recycle may occur in the Netty thread. As a result, it may incorrectly throw an exception when the release is too quick for the vertex, which has almost no data. We should fix it.) > Tiered result partition should be released with lock > > > Key: FLINK-34544 > URL: https://issues.apache.org/jira/browse/FLINK-34544 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.20.0 >Reporter: Yuxin Tan >Assignee: Yuxin Tan >Priority: Major > > When releasing `TieredResultPartition`, it is not protected by a lock, then > it may thrown illegal state exception occasionally. We should fix it.
[jira] [Updated] (FLINK-34544) Tiered result partition should be released with lock
[ https://issues.apache.org/jira/browse/FLINK-34544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuxin Tan updated FLINK-34544: -- Summary: Tiered result partition should be released with lock (was: The release check bug in tiered memory manager of hybrid shuffle) > Tiered result partition should be released with lock > > > Key: FLINK-34544 > URL: https://issues.apache.org/jira/browse/FLINK-34544 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.20.0 >Reporter: Yuxin Tan >Assignee: Yuxin Tan >Priority: Major > > The `TieredStorageMemoryManagerImpl` has a `numRequestedBuffers` check when > releasing resources. However, this check is performed in the task thread, > while the buffer recycle may occur in the Netty thread. As a result, it may > incorrectly throw an exception when the release is too quick for the vertex, > which has almost no data. > We should fix it.
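The race described above (a release-time check on a buffer counter in one thread while another thread recycles buffers) can be illustrated with a minimal sketch. This is not the code from the Flink pull request: the class `LockedReleaseSketch` and all of its members are hypothetical names, and the sketch only shows the general idea of putting the counter updates and the release check under one lock so the check never sees a half-finished update. It does not decide what should happen to buffers that are still in flight.

```java
/** Hypothetical sketch: release, request and recycle share one lock. */
public class LockedReleaseSketch {
    private final Object lock = new Object();
    private int numRequestedBuffers; // guarded by lock
    private boolean released;        // guarded by lock

    /** Called by the producing (task) thread in this sketch. */
    public void requestBuffer() {
        synchronized (lock) {
            if (released) {
                throw new IllegalStateException("Already released.");
            }
            numRequestedBuffers++;
        }
    }

    /** May be called by a different thread, e.g. a network thread. */
    public void recycleBuffer() {
        synchronized (lock) {
            if (numRequestedBuffers == 0) {
                throw new IllegalStateException("No buffer to recycle.");
            }
            numRequestedBuffers--;
        }
    }

    /** Idempotent release; the counter check runs under the same lock. */
    public void release() {
        synchronized (lock) {
            if (released) {
                return;
            }
            if (numRequestedBuffers != 0) {
                throw new IllegalStateException(
                        "Buffers still in use: " + numRequestedBuffers);
            }
            released = true;
        }
    }

    public boolean isReleased() {
        synchronized (lock) {
            return released;
        }
    }

    public static void main(String[] args) {
        LockedReleaseSketch partition = new LockedReleaseSketch();
        partition.requestBuffer();
        partition.recycleBuffer();
        partition.release();
        System.out.println("released = " + partition.isReleased());
    }
}
```

Because every method takes the same monitor, a recycle that is in progress finishes before `release()` reads the counter, and a second `release()` call is a no-op.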
Re: [PR] [FLINK-32261][table] Add built-in MAP_UNION function. [flink]
hanyuzheng7 commented on code in PR #22842: URL: https://github.com/apache/flink/pull/22842#discussion_r1507136138 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java: ## @@ -368,6 +369,14 @@ public static InputTypeStrategy commonMultipleArrayType(int minCount) { /** @see ItemAtIndexArgumentTypeStrategy */ public static final ArgumentTypeStrategy ITEM_AT_INDEX = new ItemAtIndexArgumentTypeStrategy(); +/** + * An {@link InputTypeStrategy} that expects {@code count} arguments that have a common map + * type. + */ +public static InputTypeStrategy commonMapType(int count) { Review Comment: change it to minCount.
Re: [PR] [FLINK-34469][table] Implement TableDistribution toString [flink]
jeyhunkarimov commented on code in PR #24338: URL: https://github.com/apache/flink/pull/24338#discussion_r1507135704 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableDistribution.java: ## @@ -142,4 +142,9 @@ && getBucketCount().get() != 0) { sb.append("\n"); return sb.toString(); } + +@Override +public String toString() { +return asSerializableString(); Review Comment: Done. But IMO `asSerializableString()` should be public as it is everywhere in the codebase.
Re: [PR] [FLINK-32261][table] Add built-in MAP_UNION function. [flink]
hanyuzheng7 commented on code in PR #22842: URL: https://github.com/apache/flink/pull/22842#discussion_r1507132219 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.KeyValueDataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.lang.invoke.MethodHandle; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.table.api.Expressions.$; + +/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. 
*/ +@Internal +public class MapUnionFunction extends BuiltInScalarFunction { +private final ArrayData.ElementGetter keyElementGetter; +private final ArrayData.ElementGetter valueElementGetter; + +private final SpecializedFunction.ExpressionEvaluator keyEqualityEvaluator; +private transient MethodHandle keyEqualityHandle; + +public MapUnionFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.MAP_UNION, context); +KeyValueDataType outputType = +((KeyValueDataType) context.getCallContext().getOutputDataType().get()); +final DataType keyDataType = outputType.getKeyDataType(); +final DataType valueDataType = outputType.getValueDataType(); +keyElementGetter = + ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType()); +valueElementGetter = + ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType()); +keyEqualityEvaluator = +context.createEvaluator( +$("element1").isEqual($("element2")), +DataTypes.BOOLEAN(), +DataTypes.FIELD("element1", keyDataType.notNull().toInternal()), +DataTypes.FIELD("element2", keyDataType.notNull().toInternal())); +} + +@Override +public void open(FunctionContext context) throws Exception { +keyEqualityHandle = keyEqualityEvaluator.open(context); +} + +public @Nullable MapData eval(@Nullable MapData... 
maps) { +try { +if (maps == null || maps.length == 0) { +return null; +} +if (maps.length == 1) { +return maps[0]; +} +MapData result = maps[0]; +for (int i = 1; i < maps.length; ++i) { +MapData map = maps[i]; +if (map != null && map.size() > 0) { +result = new MapDataForMapUnion(result, map); +} +} +return result; +} catch (Throwable t) { +throw new FlinkRuntimeException(t); +} +} + +private class MapDataForMapUnion implements MapData { +private final GenericArrayData keysArray; +private final GenericArrayData valuesArray; + +public MapDataForMapUnion(MapData map1, MapData map2) throws Throwable { +List keysList = new ArrayList<>(); +List valuesList = new ArrayList<>(); +boolean isKeyNullExist = false; +for (int i = 0; i < map2.size(); i++) { +Object key = keyElementGetter.getElementOrNull(map2.keyArray(), i); +if (key == null) { +isKeyNullExist = true; +} +keysList.add(key); + valuesList.add(valueElementGetter.getElementOrNull(map2.valueArray(), i)); +} + +
Re: [PR] [FLINK-32261][table] Add built-in MAP_UNION function. [flink]
hanyuzheng7 commented on code in PR #22842: URL: https://github.com/apache/flink/pull/22842#discussion_r1507128713 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.KeyValueDataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.lang.invoke.MethodHandle; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.table.api.Expressions.$; + +/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. 
*/ +@Internal +public class MapUnionFunction extends BuiltInScalarFunction { +private final ArrayData.ElementGetter keyElementGetter; +private final ArrayData.ElementGetter valueElementGetter; + +private final SpecializedFunction.ExpressionEvaluator keyEqualityEvaluator; +private transient MethodHandle keyEqualityHandle; + +public MapUnionFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.MAP_UNION, context); +KeyValueDataType outputType = +((KeyValueDataType) context.getCallContext().getOutputDataType().get()); +final DataType keyDataType = outputType.getKeyDataType(); +final DataType valueDataType = outputType.getValueDataType(); +keyElementGetter = + ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType()); +valueElementGetter = + ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType()); +keyEqualityEvaluator = +context.createEvaluator( +$("element1").isEqual($("element2")), +DataTypes.BOOLEAN(), +DataTypes.FIELD("element1", keyDataType.notNull().toInternal()), +DataTypes.FIELD("element2", keyDataType.notNull().toInternal())); +} + +@Override +public void open(FunctionContext context) throws Exception { +keyEqualityHandle = keyEqualityEvaluator.open(context); +} + +public @Nullable MapData eval(@Nullable MapData... 
 maps) { +try { +if (maps == null || maps.length == 0) { +return null; +} +if (maps.length == 1) { +return maps[0]; +} +MapData result = maps[0]; +for (int i = 1; i < maps.length; ++i) { +MapData map = maps[i]; +if (map != null && map.size() > 0) { +result = new MapDataForMapUnion(result, map); +} +} +return result; +} catch (Throwable t) { +throw new FlinkRuntimeException(t); +} +} + +private class MapDataForMapUnion implements MapData { +private final GenericArrayData keysArray; +private final GenericArrayData valuesArray; + +public MapDataForMapUnion(MapData map1, MapData map2) throws Throwable { +List keysList = new ArrayList<>(); +List valuesList = new ArrayList<>(); +boolean isKeyNullExist = false; +for (int i = 0; i < map2.size(); i++) { +Object key = keyElementGetter.getElementOrNull(map2.keyArray(), i); Review Comment: OK, I will extract it. ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software
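For readers who want the merge behaviour of the `eval` method quoted above without the `MapData` plumbing, here is a minimal sketch on plain `java.util.Map`. It is an illustration only, not the PR's implementation: `MapUnionSketch` and `mapUnion` are hypothetical names, and the sketch assumes that a key present in a later map overwrites the value from an earlier one and that null maps are skipped, as the quoted loop suggests.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a left-to-right map union on plain Java maps. */
public class MapUnionSketch {

    /** Returns null when no maps are given, otherwise the merged map. */
    @SafeVarargs
    public static <K, V> Map<K, V> mapUnion(Map<K, V>... maps) {
        if (maps == null || maps.length == 0) {
            return null;
        }
        Map<K, V> result = new LinkedHashMap<>();
        for (Map<K, V> map : maps) {
            if (map != null) {
                // putAll replaces the value of a key that is already present
                result.putAll(map);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> first = new LinkedHashMap<>();
        first.put("a", 1);
        first.put("b", 2);
        Map<String, Integer> second = new LinkedHashMap<>();
        second.put("b", 3);
        second.put("c", 4);
        System.out.println(mapUnion(first, second));
    }
}
```

The quoted implementation compares keys through a generated equality evaluator because internal key data is not guaranteed to have value-based `equals`; the sketch sidesteps that by relying on `equals` and `hashCode` of ordinary Java keys.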
[jira] [Updated] (FLINK-34544) The release check bug in tiered memory manager of hybrid shuffle
[ https://issues.apache.org/jira/browse/FLINK-34544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuxin Tan updated FLINK-34544: -- Affects Version/s: 1.20.0 (was: 1.19.0) > The release check bug in tiered memory manager of hybrid shuffle > > > Key: FLINK-34544 > URL: https://issues.apache.org/jira/browse/FLINK-34544 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.20.0 >Reporter: Yuxin Tan >Assignee: Yuxin Tan >Priority: Major > > The `TieredStorageMemoryManagerImpl` has a `numRequestedBuffers` check when > releasing resources. However, this check is performed in the task thread, > while the buffer recycle may occur in the Netty thread. As a result, it may > incorrectly throw an exception when the release is too quick for the vertex, > which has almost no data. > We should fix it.
Re: [PR] [FLINK-34535] Support JobPlanInfo for the explain result [flink]
flinkbot commented on PR #24411: URL: https://github.com/apache/flink/pull/24411#issuecomment-1970559712 ## CI report: * 267b77b225bf82d8aa69a93a99627e337638ea66 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-34535) Support JobPlanInfo for the explain result
[ https://issues.apache.org/jira/browse/FLINK-34535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34535: --- Labels: pull-request-available (was: ) > Support JobPlanInfo for the explain result > -- > > Key: FLINK-34535 > URL: https://issues.apache.org/jira/browse/FLINK-34535 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: yuanfenghu >Priority: Major > Labels: pull-request-available > > In the Flink Sql Explain syntax, we can set ExplainDetails to plan > JSON_EXECUTION_PLAN, but we cannot plan JobPlanInfo. If we can explain this > part of the information, referring to JobPlanInfo, I can combine it with the > parameter `pipeline.jobvertex-parallelism-overrides` to set up my task > parallelism
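To make the use case above concrete: once the job vertex IDs are known from the job plan, they can be turned into a value for `pipeline.jobvertex-parallelism-overrides`. The sketch below assumes the usual Flink string form for map-typed options, `key1:value1,key2:value2`. The class name, the helper `toConfigValue`, and the two vertex IDs are made up for illustration and do not come from the issue.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper: formats vertex-id -> parallelism pairs as one option value. */
public class ParallelismOverridesSketch {

    static String toConfigValue(Map<String, Integer> overrides) {
        return overrides.entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Made-up vertex IDs; real ones would be read from the job plan JSON.
        Map<String, Integer> overrides = new LinkedHashMap<>();
        overrides.put("bc764cd8ddf7a0cff126f51c16239658", 4);
        overrides.put("0a448493b4782967b150582570326227", 2);
        System.out.println(
                "pipeline.jobvertex-parallelism-overrides: " + toConfigValue(overrides));
    }
}
```

A `LinkedHashMap` keeps the entries in insertion order, which makes the generated value stable and easy to compare between runs.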
[PR] [FLINK-34535] Support JobPlanInfo for the explain result [flink]
huyuanfeng2018 opened a new pull request, #24411: URL: https://github.com/apache/flink/pull/24411 ## What is the purpose of the change This PR adds support for showing the job's JSON plan in the result of a Flink SQL EXPLAIN statement. ## Brief change log *(for example:)* - Add an enum JSON_JOB_PLAN to ExplainDetail - Update all implementations of Planner#explain including streaming and batch - Append the job plan in JSON format to the result when Planner#explain is executed with the parameter ExplainDetail.JSON_JOB_PLAN. ## Verifying this change Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn,
ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Commented] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch
[ https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821972#comment-17821972 ] Matthias Pohl commented on FLINK-29114: --- * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57940=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=11858 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57956=logs=a9db68b9-a7e0-54b6-0f98-010e0aff39e2=cdd32e0b-6047-565b-c58f-14054472f1be=11598 * https://github.com/apache/flink/actions/runs/8079627963/job/22074788185#step:10:11525 * https://github.com/apache/flink/actions/runs/8081916042/job/22082067075#step:10:11603 * https://github.com/apache/flink/actions/runs/8089966279/job/22107054311#step:10:11704 > TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with > result mismatch > -- > > Key: FLINK-29114 > URL: https://issues.apache.org/jira/browse/FLINK-29114 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Tests >Affects Versions: 1.15.0, 1.19.0, 1.20.0 >Reporter: Sergey Nuyanzin >Assignee: Jane Chan >Priority: Major > Labels: auto-deprioritized-major, pull-request-available, > test-stability > Attachments: FLINK-29114.log, image-2024-02-27-15-23-49-494.png, > image-2024-02-27-15-26-07-657.png, image-2024-02-27-15-32-48-317.png > > > It could be reproduced locally by repeating tests. Usually about 100 > iterations are enough to have several failed tests > {noformat} > [ERROR] Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: > 1.664 s <<< FAILURE! - in > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase > [ERROR] > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse > Time elapsed: 0.108 s <<< FAILURE! 
> java.lang.AssertionError: expected: 3,2,Hello world, 3,2,Hello world, 3,2,Hello world)> but was: 2,2,Hello, 2,2,Hello, 3,2,Hello world, 3,2,Hello world)> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:120) > at org.junit.Assert.assertEquals(Assert.java:146) > at > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse(TableSourceITCase.scala:428) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > at >
[jira] [Updated] (FLINK-26515) RetryingExecutorTest. testDiscardOnTimeout failed on azure
[ https://issues.apache.org/jira/browse/FLINK-26515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-26515: -- Priority: Critical (was: Minor) > RetryingExecutorTest. testDiscardOnTimeout failed on azure > -- > > Key: FLINK-26515 > URL: https://issues.apache.org/jira/browse/FLINK-26515 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.14.3, 1.17.0, 1.16.1, 1.18.0, 1.19.0 >Reporter: Yun Gao >Priority: Critical > Labels: auto-deprioritized-major, pull-request-available, > test-stability > > {code:java} > Mar 06 01:20:29 [ERROR] Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, > Time elapsed: 1.941 s <<< FAILURE! - in > org.apache.flink.changelog.fs.RetryingExecutorTest > Mar 06 01:20:29 [ERROR] testTimeout Time elapsed: 1.934 s <<< FAILURE! > Mar 06 01:20:29 java.lang.AssertionError: expected:<500.0> but > was:<1922.869766> > Mar 06 01:20:29 at org.junit.Assert.fail(Assert.java:89) > Mar 06 01:20:29 at org.junit.Assert.failNotEquals(Assert.java:835) > Mar 06 01:20:29 at org.junit.Assert.assertEquals(Assert.java:555) > Mar 06 01:20:29 at org.junit.Assert.assertEquals(Assert.java:685) > Mar 06 01:20:29 at > org.apache.flink.changelog.fs.RetryingExecutorTest.testTimeout(RetryingExecutorTest.java:145) > Mar 06 01:20:29 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Mar 06 01:20:29 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Mar 06 01:20:29 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Mar 06 01:20:29 at java.lang.reflect.Method.invoke(Method.java:498) > Mar 06 01:20:29 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > Mar 06 01:20:29 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > Mar 06 01:20:29 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > Mar 
06 01:20:29 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Mar 06 01:20:29 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > Mar 06 01:20:29 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > Mar 06 01:20:29 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) > Mar 06 01:20:29 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > Mar 06 01:20:29 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > Mar 06 01:20:29 at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43) > Mar 06 01:20:29 at > java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > Mar 06 01:20:29 at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > Mar 06 01:20:29 at > java.util.Iterator.forEachRemaining(Iterator.java:116) > Mar 06 01:20:29 at > java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) > Mar 06 01:20:29 at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > Mar 06 01:20:29 at > 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32569=logs=f450c1a5-64b1-5955-e215-49cb1ad5ec88=cc452273-9efa-565d-9db8-ef62a38a0c10=22554
[jira] [Commented] (FLINK-26515) RetryingExecutorTest. testDiscardOnTimeout failed on azure
[ https://issues.apache.org/jira/browse/FLINK-26515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821971#comment-17821971 ] Matthias Pohl commented on FLINK-26515: --- https://github.com/apache/flink/actions/runs/8089966419/job/22106994368#step:10:10565 > RetryingExecutorTest. testDiscardOnTimeout failed on azure > -- > > Key: FLINK-26515 > URL: https://issues.apache.org/jira/browse/FLINK-26515 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.14.3, 1.17.0, 1.16.1, 1.18.0, 1.19.0 >Reporter: Yun Gao >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available, > test-stability
[jira] [Commented] (FLINK-31472) AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread
[ https://issues.apache.org/jira/browse/FLINK-31472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821970#comment-17821970 ] Matthias Pohl commented on FLINK-31472: --- * https://github.com/apache/flink/actions/runs/8079627963/job/22074788571#step:10:10480 * https://github.com/apache/flink/actions/runs/8089966279/job/22106940120#step:10:10483 > AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread > > > Key: FLINK-31472 > URL: https://issues.apache.org/jira/browse/FLINK-31472 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.17.0, 1.16.1, 1.18.0, 1.19.0, 1.20.0 >Reporter: Ran Tao >Assignee: Ahmed Hamdy >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.19.0 > > > when run mvn clean test, this case failed occasionally. > {noformat} > [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.955 > s <<< FAILURE! - in > org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest > [ERROR] > org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize > Time elapsed: 0.492 s <<< ERROR! > java.lang.IllegalStateException: Illegal thread detected. This method must be > called from inside the mailbox thread! 
> at > org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkIsMailboxThread(TaskMailboxImpl.java:262) > at > org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:137) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:84) > at > org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.flush(AsyncSinkWriter.java:367) > at > org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.lambda$registerCallback$3(AsyncSinkWriter.java:315) > at > org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$CallbackTask.onProcessingTime(TestProcessingTimeService.java:199) > at > org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService.setCurrentTime(TestProcessingTimeService.java:76) > at > org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize(AsyncSinkWriterThrottlingTest.java:64) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) > at > org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) > at > org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147) > at >
[jira] [Commented] (FLINK-26644) python StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies failed on azure
[ https://issues.apache.org/jira/browse/FLINK-26644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821968#comment-17821968 ] Matthias Pohl commented on FLINK-26644: --- https://github.com/apache/flink/actions/runs/8081916042/job/22082066739#step:10:24325 > python > StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies > failed on azure > --- > > Key: FLINK-26644 > URL: https://issues.apache.org/jira/browse/FLINK-26644 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.14.4, 1.15.0, 1.16.0, 1.19.0 >Reporter: Yun Gao >Priority: Minor > Labels: auto-deprioritized-major, test-stability > > {code:java} > 2022-03-14T18:50:24.6842853Z Mar 14 18:50:24 > === FAILURES > === > 2022-03-14T18:50:24.6844089Z Mar 14 18:50:24 _ > StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies _ > 2022-03-14T18:50:24.6844846Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6846063Z Mar 14 18:50:24 self = > testMethod=test_generate_stream_graph_with_dependencies> > 2022-03-14T18:50:24.6847104Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6847766Z Mar 14 18:50:24 def > test_generate_stream_graph_with_dependencies(self): > 2022-03-14T18:50:24.6848677Z Mar 14 18:50:24 python_file_dir = > os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4())) > 2022-03-14T18:50:24.6849833Z Mar 14 18:50:24 os.mkdir(python_file_dir) > 2022-03-14T18:50:24.6850729Z Mar 14 18:50:24 python_file_path = > os.path.join(python_file_dir, "test_stream_dependency_manage_lib.py") > 2022-03-14T18:50:24.6852679Z Mar 14 18:50:24 with > open(python_file_path, 'w') as f: > 2022-03-14T18:50:24.6853646Z Mar 14 18:50:24 f.write("def > add_two(a):\nreturn a + 2") > 2022-03-14T18:50:24.6854394Z Mar 14 18:50:24 env = self.env > 2022-03-14T18:50:24.6855019Z Mar 14 18:50:24 > env.add_python_file(python_file_path) > 2022-03-14T18:50:24.6855519Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6856254Z Mar 14 18:50:24 def plus_two_map(value): > 
2022-03-14T18:50:24.6857045Z Mar 14 18:50:24 from > test_stream_dependency_manage_lib import add_two > 2022-03-14T18:50:24.6857865Z Mar 14 18:50:24 return value[0], > add_two(value[1]) > 2022-03-14T18:50:24.6858466Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6858924Z Mar 14 18:50:24 def add_from_file(i): > 2022-03-14T18:50:24.6859806Z Mar 14 18:50:24 with > open("data/data.txt", 'r') as f: > 2022-03-14T18:50:24.6860266Z Mar 14 18:50:24 return i[0], > i[1] + int(f.read()) > 2022-03-14T18:50:24.6860879Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6862022Z Mar 14 18:50:24 from_collection_source = > env.from_collection([('a', 0), ('b', 0), ('c', 1), ('d', 1), > 2022-03-14T18:50:24.6863259Z Mar 14 18:50:24 > ('e', 2)], > 2022-03-14T18:50:24.6864057Z Mar 14 18:50:24 > type_info=Types.ROW([Types.STRING(), > 2022-03-14T18:50:24.6864651Z Mar 14 18:50:24 > Types.INT()])) > 2022-03-14T18:50:24.6865150Z Mar 14 18:50:24 > from_collection_source.name("From Collection") > 2022-03-14T18:50:24.6866212Z Mar 14 18:50:24 keyed_stream = > from_collection_source.key_by(lambda x: x[1], key_type=Types.INT()) > 2022-03-14T18:50:24.6867083Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6867793Z Mar 14 18:50:24 plus_two_map_stream = > keyed_stream.map(plus_two_map).name("Plus Two Map").set_parallelism(3) > 2022-03-14T18:50:24.6868620Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6869412Z Mar 14 18:50:24 add_from_file_map = > plus_two_map_stream.map(add_from_file).name("Add From File Map") > 2022-03-14T18:50:24.6870239Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6870883Z Mar 14 18:50:24 test_stream_sink = > add_from_file_map.add_sink(self.test_sink).name("Test Sink") > 2022-03-14T18:50:24.6871803Z Mar 14 18:50:24 > test_stream_sink.set_parallelism(4) > 2022-03-14T18:50:24.6872291Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6872756Z Mar 14 18:50:24 archive_dir_path = > os.path.join(self.tempdir, "archive_" + str(uuid.uuid4())) > 2022-03-14T18:50:24.6873557Z Mar 14 18:50:24 > os.mkdir(archive_dir_path) > 
2022-03-14T18:50:24.6874817Z Mar 14 18:50:24 with > open(os.path.join(archive_dir_path, "data.txt"), 'w') as f: > 2022-03-14T18:50:24.6875414Z Mar 14 18:50:24 f.write("3") > 2022-03-14T18:50:24.6875906Z Mar 14 18:50:24 archive_file_path = \
[jira] [Updated] (FLINK-34449) Flink build took too long
[ https://issues.apache.org/jira/browse/FLINK-34449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-34449: -- Affects Version/s: 1.18.1 > Flink build took too long > - > > Key: FLINK-34449 > URL: https://issues.apache.org/jira/browse/FLINK-34449 > Project: Flink > Issue Type: Bug > Components: Build System / CI, Test Infrastructure >Affects Versions: 1.17.2, 1.18.1 >Reporter: Matthias Pohl >Priority: Major > Labels: test-stability > > We saw a timeout when building Flink in e2e1 stage. No logs are available to > investigate the issue: > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57551=logs=bbb1e2a2-a43c-55c8-fb48-5cfe7a8a0ca6 > {code} > Nothing to show. Final logs are missing. This can happen when the job is > cancelled or times out. > {code} > I'd consider this an infrastructure issue but created the Jira issue for > documentation purposes. Let's see whether that pops up again.
[jira] [Commented] (FLINK-34449) Flink build took too long
[ https://issues.apache.org/jira/browse/FLINK-34449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821967#comment-17821967 ] Matthias Pohl commented on FLINK-34449: --- This time, it happened in the caching step: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57957=logs=87489130-75dc-54e4-1f45-80c30aa367a3=5f5e3bcf-c82b-57ca-7f80-f293d0ad4448=1 > Flink build took too long > - > > Key: FLINK-34449 > URL: https://issues.apache.org/jira/browse/FLINK-34449 > Project: Flink > Issue Type: Bug > Components: Build System / CI, Test Infrastructure >Affects Versions: 1.17.2 >Reporter: Matthias Pohl >Priority: Major > Labels: test-stability > > We saw a timeout when building Flink in e2e1 stage. No logs are available to > investigate the issue: > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57551=logs=bbb1e2a2-a43c-55c8-fb48-5cfe7a8a0ca6 > {code} > Nothing to show. Final logs are missing. This can happen when the job is > cancelled or times out. > {code} > I'd consider this an infrastructure issue but created the Jira issue for > documentation purposes. Let's see whether that pops up again.
[jira] [Resolved] (FLINK-34499) Configuration#toString should hide sensitive values
[ https://issues.apache.org/jira/browse/FLINK-34499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl resolved FLINK-34499. --- Fix Version/s: 1.19.0 1.17.3 1.18.2 (was: 1.20.0) Resolution: Fixed master: [9e81177e44d63501b360b1a246a16ea3faecb548|https://github.com/apache/flink/commit/9e81177e44d63501b360b1a246a16ea3faecb548] 1.19: [5016325b6ec930a3f5cd3b186c185e004ede8691|https://github.com/apache/flink/commit/5016325b6ec930a3f5cd3b186c185e004ede8691] 1.18: [e770cef5d1df282fe28deb5f1309873b8342d046|https://github.com/apache/flink/commit/e770cef5d1df282fe28deb5f1309873b8342d046] 1.17: [72fbc89777286d9cda46a80231b2db11c21ced0d|https://github.com/apache/flink/commit/72fbc89777286d9cda46a80231b2db11c21ced0d] > Configuration#toString should hide sensitive values > --- > > Key: FLINK-34499 > URL: https://issues.apache.org/jira/browse/FLINK-34499 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Affects Versions: 1.17.2, 1.19.0, 1.18.1, 1.20.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0, 1.17.3, 1.18.2 > > > Time and time again people log the entire Flink configuration for no reason, > risking that sensitive values are logged in plain text. > We should make this harder by changing {{Configuration#toString}} to > automatically hide sensitive values, for example like this: > {code} > @Override > public String toString() { > return ConfigurationUtils > .hideSensitiveValues(this.confData.entrySet().stream().collect( > Collectors.toMap(Map.Entry::getKey, entry -> > entry.getValue().toString()))) > .toString(); > } > {code}
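The idea in FLINK-34499 can be tried out without any Flink dependency. The following is only a minimal sketch under stated assumptions: `RedactingConfig`, `SENSITIVE_FRAGMENTS`, `HIDDEN` and `isSensitive` are hypothetical names invented for illustration, and the matching rule (a key containing "password", "secret" or "token") is an assumption, not necessarily the rule Flink applies.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative stand-in for a configuration holder; this is NOT Flink's Configuration class. */
class RedactingConfig {
    // Hypothetical key fragments treated as sensitive (an assumption of this sketch).
    private static final String[] SENSITIVE_FRAGMENTS = {"password", "secret", "token"};
    private static final String HIDDEN = "******";

    // Sorted map so that the toString() output is deterministic.
    private final Map<String, Object> confData = new TreeMap<>();

    RedactingConfig set(String key, Object value) {
        confData.put(key, value);
        return this;
    }

    /** Returns true if the key contains one of the sensitive fragments (case-insensitive). */
    static boolean isSensitive(String key) {
        String lower = key.toLowerCase(Locale.ROOT);
        for (String fragment : SENSITIVE_FRAGMENTS) {
            if (lower.contains(fragment)) {
                return true;
            }
        }
        return false;
    }

    /** Builds a masked copy of the entries, so the real values never reach the log line. */
    @Override
    public String toString() {
        Map<String, String> printable = new TreeMap<>();
        for (Map.Entry<String, Object> entry : confData.entrySet()) {
            String key = entry.getKey();
            printable.put(key, isSensitive(key) ? HIDDEN : String.valueOf(entry.getValue()));
        }
        return printable.toString();
    }
}
```

With this shape, a statement such as `LOGGER.info("config: {}", config)` would print the masked map rather than the raw values, because the masking happens inside `toString()` itself and does not rely on every caller remembering to redact.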
Re: [PR] [BP-1.17][FLINK-34499] Configuration#toString hides sensitive values [flink]
XComp merged PR #24406: URL: https://github.com/apache/flink/pull/24406
Re: [PR] [BP-1.18][FLINK-34499] Configuration#toString hides sensitive values [flink]
XComp merged PR #24405: URL: https://github.com/apache/flink/pull/24405
Re: [PR] [BP-1.19][FLINK-34499] Configuration#toString hides sensitive values [flink]
XComp merged PR #24404: URL: https://github.com/apache/flink/pull/24404
Re: [PR] [FLINK-34499] Configuration#toString hides sensitive values [flink]
XComp merged PR #24370: URL: https://github.com/apache/flink/pull/24370
[jira] [Updated] (FLINK-34498) GSFileSystemFactory logs full Flink config
[ https://issues.apache.org/jira/browse/FLINK-34498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-34498: -- Fix Version/s: (was: 1.20.0) > GSFileSystemFactory logs full Flink config > -- > > Key: FLINK-34498 > URL: https://issues.apache.org/jira/browse/FLINK-34498 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.18.1 >Reporter: Chesnay Schepler >Assignee: Jeyhun Karimov >Priority: Blocker > Labels: pull-request-available > Fix For: 1.19.0 > > > This can cause secrets from the config to be logged. > {code} > @Override > public void configure(Configuration flinkConfig) { > LOGGER.info("Configuring GSFileSystemFactory with Flink configuration > {}", flinkConfig); > {code}
[jira] [Resolved] (FLINK-34498) GSFileSystemFactory logs full Flink config
[ https://issues.apache.org/jira/browse/FLINK-34498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl resolved FLINK-34498. --- Fix Version/s: (was: 1.18.2) Assignee: Jeyhun Karimov Resolution: Fixed master: [8bb4457b6cf13d85639e914a8d959e39c2257d66|https://github.com/apache/flink/commit/8bb4457b6cf13d85639e914a8d959e39c2257d66] 1.19: [628ae78ab304a7543a8d5566a7dd6cbcb336d61b|https://github.com/apache/flink/commit/628ae78ab304a7543a8d5566a7dd6cbcb336d61b] > GSFileSystemFactory logs full Flink config > -- > > Key: FLINK-34498 > URL: https://issues.apache.org/jira/browse/FLINK-34498 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.18.1 >Reporter: Chesnay Schepler >Assignee: Jeyhun Karimov >Priority: Blocker > Labels: pull-request-available > Fix For: 1.19.0, 1.20.0 > > > This can cause secrets from the config to be logged. > {code} > @Override > public void configure(Configuration flinkConfig) { > LOGGER.info("Configuring GSFileSystemFactory with Flink configuration > {}", flinkConfig); > {code}
Re: [PR] [BP-1.19][FLINK-34498] GSFileSystemFactory should not log full Flink config [flink]
XComp merged PR #24408: URL: https://github.com/apache/flink/pull/24408
Re: [PR] [FLINK-34498][filesystem] GSFileSystemFactory should not log full Flink log [flink]
XComp merged PR #24372: URL: https://github.com/apache/flink/pull/24372
[jira] [Created] (FLINK-34544) The release check bug in tiered memory manager of hybrid shuffle
Yuxin Tan created FLINK-34544: - Summary: The release check bug in tiered memory manager of hybrid shuffle Key: FLINK-34544 URL: https://issues.apache.org/jira/browse/FLINK-34544 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.19.0 Reporter: Yuxin Tan Assignee: Yuxin Tan The `TieredStorageMemoryManagerImpl` has a `numRequestedBuffers` check when releasing resources. However, this check is performed in the task thread, while the buffer recycle may occur in the Netty thread. As a result, it may incorrectly throw an exception when the release is too quick for the vertex, which has almost no data. We should fix it.
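The race described in FLINK-34544 (one thread checks a buffer counter on release while another thread is still recycling buffers) can be illustrated with plain JDK classes. The following is only a sketch under stated assumptions and is not the fix that was merged into Flink: `GuardedBufferAccounting` and all of its methods are hypothetical names, and waiting with a timeout for outstanding recycles is one possible design, chosen here purely for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a buffer-accounting component; NOT TieredStorageMemoryManagerImpl. */
class GuardedBufferAccounting {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition allRecycled = lock.newCondition();
    private int numRequestedBuffers; // guarded by lock
    private boolean released;        // guarded by lock

    /** Called by the producing ("task") thread. */
    void requestBuffer() {
        lock.lock();
        try {
            if (released) {
                throw new IllegalStateException("Already released.");
            }
            numRequestedBuffers++;
        } finally {
            lock.unlock();
        }
    }

    /** May be called from a different ("network") thread. */
    void recycleBuffer() {
        lock.lock();
        try {
            numRequestedBuffers--;
            if (numRequestedBuffers == 0) {
                allRecycled.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Waits up to the timeout for in-flight recycles, then runs the leak check under the same lock. */
    void release(long timeoutMillis) {
        lock.lock();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (numRequestedBuffers > 0 && remainingNanos > 0) {
                remainingNanos = allRecycled.awaitNanos(remainingNanos);
            }
            if (numRequestedBuffers != 0) {
                throw new IllegalStateException("Leaking buffers: " + numRequestedBuffers);
            }
            released = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for buffers.", e);
        } finally {
            lock.unlock();
        }
    }

    boolean isReleased() {
        lock.lock();
        try {
            return released;
        } finally {
            lock.unlock();
        }
    }

    /** Small demo: the release starts before the other thread has recycled its buffer. */
    static boolean demo() {
        GuardedBufferAccounting accounting = new GuardedBufferAccounting();
        accounting.requestBuffer();
        Thread recycler = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            accounting.recycleBuffer();
        });
        recycler.start();
        accounting.release(5_000);
        return accounting.isReleased();
    }
}
```

Because the counter updates and the release check share one lock, the check can no longer observe a half-finished recycle. Without the wait, a quick release in the demo would see one outstanding buffer and throw, which mirrors the spurious exception reported in the issue.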
[jira] [Commented] (FLINK-34529) Projection cannot be pushed down through rank operator.
[ https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821949#comment-17821949 ] Benchao Li commented on FLINK-34529: [~nilerzhou] Thanks for reporting this, it sounds good to me. In the description, the expected plan seems already pushing the {{Calc}} through {{Rank}} (but not through {{Join}}), why is that? Besides, I noted that there is a {{CalcRankTransposeRule}} in Flink already, why does it not work as expected? > Projection cannot be pushed down through rank operator. > --- > > Key: FLINK-34529 > URL: https://issues.apache.org/jira/browse/FLINK-34529 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: yisha zhou >Priority: Major > > When there is a rank/deduplicate operator, the projection based on output of > this operator cannot be pushed down to the input of it. > The following code can help reproducing the issue: > {code:java} > val util = streamTestUtil() > util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c) > util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f) > val sql = > """ > |SELECT a FROM ( > | SELECT a, f, > | ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num > | FROM T1, T2 > | WHERE T1.a = T2.d > |) > |WHERE rank_num = 1 > """.stripMargin > util.verifyPlan(sql){code} > The plan is expected to be: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) >+- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], > leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- Calc(select=[a, c]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- 
Exchange(distribution=[hash[d]]) >+- Calc(select=[d, f]) > +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) > {code} > Notice that the 'select' of Join operator is [a, c, d, f]. However the actual > plan is: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) >+- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, > e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) >+- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) > {code} > the 'select' of Join operator is [a, b, c, d, e, f], which means the > projection in the final Calc is not passed through the Rank. > And I think an easy way to fix this issue is to add > org.apache.calcite.rel.rules.ProjectWindowTransposeRule into > FlinkStreamRuleSets.LOGICAL_OPT_RULES. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch
[ https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821947#comment-17821947 ] Feng Jin commented on FLINK-29114: -- [~qingyue] Thank you for your reply. Indeed, as you said, this is not an issue with unit testing but a problem with the generation of the staging directory. > TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with > result mismatch > -- > > Key: FLINK-29114 > URL: https://issues.apache.org/jira/browse/FLINK-29114 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Tests >Affects Versions: 1.15.0, 1.19.0, 1.20.0 >Reporter: Sergey Nuyanzin >Assignee: Jane Chan >Priority: Major > Labels: auto-deprioritized-major, pull-request-available, > test-stability > Attachments: FLINK-29114.log, image-2024-02-27-15-23-49-494.png, > image-2024-02-27-15-26-07-657.png, image-2024-02-27-15-32-48-317.png > > > It could be reproduced locally by repeating tests. Usually about 100 > iterations are enough to have several failed tests > {noformat} > [ERROR] Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: > 1.664 s <<< FAILURE! - in > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase > [ERROR] > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse > Time elapsed: 0.108 s <<< FAILURE! 
> java.lang.AssertionError: expected: 3,2,Hello world, 3,2,Hello world, 3,2,Hello world)> but was: 2,2,Hello, 2,2,Hello, 3,2,Hello world, 3,2,Hello world)> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:120) > at org.junit.Assert.assertEquals(Assert.java:146) > at > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse(TableSourceITCase.scala:428) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) > at > org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) > at > org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) > at >
[jira] [Commented] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch
[ https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821941#comment-17821941 ] Jane Chan commented on FLINK-29114: --- [~hackergin] Different sink paths could avoid the unstable case. However, the problem lies in the way of generating the staging dir path. It's unreliable to rely solely on the timestamp as a path postfix.
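The concern about timestamp-only suffixes can be illustrated with a small sketch. The class `StagingDirs`, its method names, and the `.staging_` prefix are hypothetical and chosen only for illustration; appending a random UUID is one possible way to make the path unique and is not necessarily the fix adopted in the PR.

```java
import java.nio.file.Path;
import java.util.UUID;

/** Hypothetical helper for illustration; not the actual Flink staging-directory code. */
final class StagingDirs {
    private StagingDirs() {}

    /** Timestamp-only suffix: two writers asking within the same millisecond get the same path. */
    static Path timestampOnly(Path base, long nowMillis) {
        return base.resolve(".staging_" + nowMillis);
    }

    /** Timestamp plus random UUID: the timestamp stays readable, the UUID separates concurrent writers. */
    static Path timestampAndUuid(Path base, long nowMillis) {
        return base.resolve(".staging_" + nowMillis + "_" + UUID.randomUUID());
    }
}
```

With the first variant, two inserts into the same sink table that start in the same millisecond would share one staging directory, which matches the kind of overwrite described in the comment.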
Re: [PR] [FLINK-34484][state] Split 'state.backend.local-recovery' into two options for checkpointing and recovery [flink]
Zakelly commented on code in PR #24402:
URL: https://github.com/apache/flink/pull/24402#discussion_r1507058342

## flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java: ##
@@ -169,6 +173,26 @@ public class CheckpointingOptions {
                             + "deactivated. Local recovery currently only covers keyed state backends "
                             + "(including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).");
 
+    /**
+     * This option configures local backup for the state backend, which indicates whether to make
+     * backup checkpoint on local disk. If not configured, fallback to {@link
+     * StateRecoveryOptions#LOCAL_RECOVERY}. By default, local backup is deactivated. Local backup
+     * currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend
+     * and the HashMapStateBackend).
+     */
+    public static final ConfigOption<Boolean> LOCAL_BACKUP_ENABLED =
+            ConfigOptions.key("execution.checkpointing.local-backup.enabled")
+                    .booleanType()
+                    .defaultValue(StateRecoveryOptions.LOCAL_RECOVERY.defaultValue())
+                    .withFallbackKeys(StateRecoveryOptions.LOCAL_RECOVERY.key())

Review Comment:
Would it make sense to make `LOCAL_BACKUP_ENABLED` the fallback key of the new `LOCAL_RECOVERY` instead of the other way around? Whether to recover from local state is typically set in line with whether to make a local backup. Both options should also have the same deprecated key, 'state.backend.local-recovery'.

## flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalBackupAndRecoveryDirectoryProvider.java: ##
@@ -48,7 +48,7 @@
  *
  *
  */
-public interface LocalRecoveryDirectoryProvider extends Serializable {
+public interface LocalBackupAndRecoveryDirectoryProvider extends Serializable {

Review Comment:
I'd suggest naming it 'LocalBackupDirectoryProvider' or keeping the original name. WDYT?
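To make the fallback-direction question concrete, here is a minimal, self-contained sketch of how a key with fallback keys could be resolved. `FallbackLookup` and `resolve` are hypothetical names, and the lookup order (main key first, then fallbacks in the order given) is an assumption for illustration; this is not Flink's `ConfigOption` implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of key/fallback-key resolution; not Flink's ConfigOption code. */
final class FallbackLookup {
    private FallbackLookup() {}

    /** Returns the value of the main key if set, otherwise the first fallback key that is set. */
    static Optional<String> resolve(
            Map<String, String> conf, String key, List<String> fallbackKeys) {
        String value = conf.get(key);
        if (value != null) {
            return Optional.of(value);
        }
        for (String fallback : fallbackKeys) {
            String fallbackValue = conf.get(fallback);
            if (fallbackValue != null) {
                return Optional.of(fallbackValue);
            }
        }
        return Optional.empty();
    }
}
```

Under the direction proposed in the review, the recovery option would list 'execution.checkpointing.local-backup.enabled' and the shared deprecated key 'state.backend.local-recovery' as its fallbacks, so a job that only sets the backup option would get the same value for recovery.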
Re: [PR] [FLINK-34483][docs] Improve the documentation of 'state.checkpoints.dir' and 'state.checkpoint-storage' [flink]
Zakelly commented on code in PR #24401:
URL: https://github.com/apache/flink/pull/24401#discussion_r1507047875

## flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java: ##
@@ -76,6 +76,39 @@ public class CheckpointingOptions {
      * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called.
      *
      * Recognized shortcut names are 'jobmanager' and 'filesystem'.
+     *
+     * {@link #CHECKPOINT_STORAGE} and {@link #CHECKPOINTS_DIRECTORY} are usually combined to
+     * configure the checkpoint location. The behaviors of different combinations are as follows:

Review Comment:
Instead of combining these two options, I'd suggest describing the behaviors for CHECKPOINT_STORAGE='jobmanager' and CHECKPOINT_STORAGE='filesystem' separately. WDYT?
Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]
LadyForest commented on PR #24390:
URL: https://github.com/apache/flink/pull/24390#issuecomment-1970441742

Hi @snuyanzin @XComp, thanks for your review comments. I updated the test (it is still not perfect) and tried my best to verify the case. Would you mind taking another look?
Re: [PR] [FLINK-33930] Canceled stop job status exception [flink-kubernetes-operator]
hunter-cloud09 closed pull request #740: [FLINK-33930] Canceled stop job status exception
URL: https://github.com/apache/flink-kubernetes-operator/pull/740
Re: [PR] [FLINK-33930] Canceled stop job status exception [flink-kubernetes-operator]
hunter-cloud09 commented on PR #740:
URL: https://github.com/apache/flink-kubernetes-operator/pull/740#issuecomment-1970439822

> Should we close this? @hunter-cloud09

Yep! If you do not want to do this, I can solve it locally.
Re: [PR] [FLINK-34372][table-api] Support DESCRIBE CATALOG [flink]
hackergin commented on code in PR #24275:
URL: https://github.com/apache/flink/pull/24275#discussion_r1506965964

## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java: ##
@@ -47,4 +51,31 @@ public String getName() {
     public String getDefaultDatabase() {
         return defaultDatabase;
     }
+
+    @Override
+    public String explainCatalog() {
+        StringBuilder sb = new StringBuilder();
+        Map extraExplainInfo = extraExplainInfo();
+        sb.append("default database: ").append(defaultDatabase);
+        if (!extraExplainInfo.isEmpty()) {
+            sb.append("\n");
+            sb.append(
+                    extraExplainInfo.entrySet().stream()
+                            .map(entry -> entry.getKey() + ": " + entry.getValue())
+                            .collect(Collectors.joining("\n")));
+        }
+
+        // put the class name at the end
+        sb.append("\n").append(Catalog.super.explainCatalog());
+        return sb.toString();
+    }
+
+    /**
+     * Extra explain information used to print by DESCRIBE CATALOG catalogName statement.
+     *
+     * Note: The class name and default database name of this catalog are no need to be added.
+     */
+    protected Map extraExplainInfo() {

Review Comment:
Can we directly retrieve parameter information from the catalog store instead of introducing a new interface?
Re: [PR] [FLINK-32261][table] Add built-in MAP_UNION function. [flink]
hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1506962857

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java: ##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter keyElementGetter;
+    private final ArrayData.ElementGetter valueElementGetter;
+
+    private final SpecializedFunction.ExpressionEvaluator keyEqualityEvaluator;
+    private transient MethodHandle keyEqualityHandle;
+
+    public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.MAP_UNION, context);
+        KeyValueDataType outputType =
+                ((KeyValueDataType) context.getCallContext().getOutputDataType().get());
+        final DataType keyDataType = outputType.getKeyDataType();
+        final DataType valueDataType = outputType.getValueDataType();
+        keyElementGetter =
+                ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+        valueElementGetter =
+                ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+        keyEqualityEvaluator =
+                context.createEvaluator(
+                        $("element1").isEqual($("element2")),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.FIELD("element1", keyDataType.notNull().toInternal()),
+                        DataTypes.FIELD("element2", keyDataType.notNull().toInternal()));
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        keyEqualityHandle = keyEqualityEvaluator.open(context);
+    }
+
+    public @Nullable MapData eval(@Nullable MapData... maps) {
+        try {
+            if (maps == null || maps.length == 0) {
+                return null;
+            }
+            if (maps.length == 1) {
+                return maps[0];
+            }
+            MapData result = maps[0];
+            for (int i = 1; i < maps.length; ++i) {
+                MapData map = maps[i];
+                if (map != null && map.size() > 0) {

Review Comment:
Yes, if map == null, we should return null.
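The null handling agreed on in this comment (any null input makes the whole result null, rather than the null map being skipped) can be sketched with plain `java.util.Map` instead of Flink's `MapData`. `MapUnionSketch` is a hypothetical name, and the rule that later maps overwrite earlier ones on duplicate keys is an assumption made for this illustration; the PR's actual implementation works on internal data structures and a generated key-equality evaluator.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the null semantics; not the PR's MapUnionFunction. */
final class MapUnionSketch {
    private MapUnionSketch() {}

    /** Merges maps left to right; returns null if there is no input or any input map is null. */
    @SafeVarargs
    static <K, V> Map<K, V> mapUnion(Map<K, V>... maps) {
        if (maps == null || maps.length == 0) {
            return null;
        }
        Map<K, V> result = new LinkedHashMap<>();
        for (Map<K, V> map : maps) {
            if (map == null) {
                // A null argument makes the whole result null instead of being skipped.
                return null;
            }
            // Assumption: on duplicate keys the later map wins.
            result.putAll(map);
        }
        return result;
    }
}
```

Compared with the condition under review, `map != null && map.size() > 0`, the difference is that a null argument short-circuits to a null result instead of silently contributing nothing.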
Re: [PR] [FLINK-32261][table] Add built-in MAP_UNION function. [flink]
hanyuzheng7 commented on code in PR #22842:
URL: https://github.com/apache/flink/pull/22842#discussion_r1506962507

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapUnionFunction.java: ##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_UNION}. */
+@Internal
+public class MapUnionFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter keyElementGetter;
+    private final ArrayData.ElementGetter valueElementGetter;
+
+    private final SpecializedFunction.ExpressionEvaluator keyEqualityEvaluator;
+    private transient MethodHandle keyEqualityHandle;
+
+    public MapUnionFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.MAP_UNION, context);
+        KeyValueDataType outputType =
+                ((KeyValueDataType) context.getCallContext().getOutputDataType().get());
+        final DataType keyDataType = outputType.getKeyDataType();
+        final DataType valueDataType = outputType.getValueDataType();
+        keyElementGetter =
+                ArrayData.createElementGetter(outputType.getKeyDataType().getLogicalType());
+        valueElementGetter =
+                ArrayData.createElementGetter(outputType.getValueDataType().getLogicalType());
+        keyEqualityEvaluator =
+                context.createEvaluator(
+                        $("element1").isEqual($("element2")),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.FIELD("element1", keyDataType.notNull().toInternal()),
+                        DataTypes.FIELD("element2", keyDataType.notNull().toInternal()));
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        keyEqualityHandle = keyEqualityEvaluator.open(context);
+    }
+
+    public @Nullable MapData eval(@Nullable MapData... maps) {
+        try {
+            if (maps == null || maps.length == 0) {
+                return null;
+            }
+            if (maps.length == 1) {
+                return maps[0];
+            }
+            MapData result = maps[0];

Review Comment:
Ok, I will fix it.
Re: [PR] [FLINK-34522] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]
flinkbot commented on PR #24410:
URL: https://github.com/apache/flink/pull/24410#issuecomment-1970317964

## CI report:

* f377be1a0e1e7006199f0bca31be1dd8cb330acd UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-34522] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]
qinf opened a new pull request, #24410:
URL: https://github.com/apache/flink/pull/24410
[jira] [Commented] (FLINK-34522) StateTtlConfig#cleanupInRocksdbCompactFilter still uses the deprecated Time class
[ https://issues.apache.org/jira/browse/FLINK-34522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821921#comment-17821921 ]

lincoln lee commented on FLINK-34522:

[~mapohl] Thanks for resolving it! Back to this issue: [~dianfu] & [~hxb] have made some attempts, but there are no clear leads yet, so it looks like it will take some time.

> StateTtlConfig#cleanupInRocksdbCompactFilter still uses the deprecated Time class
>
> Key: FLINK-34522
> URL: https://issues.apache.org/jira/browse/FLINK-34522
> Project: Flink
> Issue Type: Improvement
> Components: API / Core
> Affects Versions: 1.19.0
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.19.0, 1.20.0
>
> FLINK-32570 deprecated the Time class and refactored all Public or PublicEvolving APIs to use Java's Duration.
> StateTtlConfig.Builder#cleanupInRocksdbCompactFilter is still using the Time class. In general, we expect:
> * Mark {{cleanupInRocksdbCompactFilter(long, Time)}} as {{@Deprecated}}
> * Provide a new cleanupInRocksdbCompactFilter(long, Duration)
> Note: This is exactly what FLINK-32570 does, so I guess FLINK-32570 missed cleanupInRocksdbCompactFilter.
> But I found this method was introduced in 1.19 (FLINK-30854), so a better solution may be: only provide cleanupInRocksdbCompactFilter(long, Duration) and don't use Time.
> A deprecated API should be kept for 2 minor versions. IIUC, we cannot remove Time-related classes in Flink 2.0 if we don't deprecate it in 1.19. If so, I think it's better to merge this JIRA in 1.19.0 as well.
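The first option in the issue description (deprecate the Time-based overload and add a Duration-based one) can be sketched as follows. `LegacyTime` and `TtlBuilderSketch` are hypothetical stand-ins written for this example; they are not Flink's `Time` class or `StateTtlConfig.Builder`, and the field names are illustrative only.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a legacy time type; not Flink's Time class. */
final class LegacyTime {
    private final long size;
    private final TimeUnit unit;

    private LegacyTime(long size, TimeUnit unit) {
        this.size = size;
        this.unit = unit;
    }

    static LegacyTime of(long size, TimeUnit unit) {
        return new LegacyTime(size, unit);
    }

    long toMilliseconds() {
        return unit.toMillis(size);
    }
}

/** Hypothetical builder sketch; not StateTtlConfig.Builder. */
final class TtlBuilderSketch {
    long queryTimeAfterNumEntries;
    Duration periodicCompactionTime;

    /** Old overload kept for compatibility; it only converts and delegates. */
    @Deprecated
    TtlBuilderSketch cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, LegacyTime period) {
        return cleanupInRocksdbCompactFilter(
                queryTimeAfterNumEntries, Duration.ofMillis(period.toMilliseconds()));
    }

    /** New overload based on java.time.Duration. */
    TtlBuilderSketch cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration period) {
        this.queryTimeAfterNumEntries = queryTimeAfterNumEntries;
        this.periodicCompactionTime = period;
        return this;
    }
}
```

The second option mentioned in the description, shipping only the Duration overload because the method is new in 1.19, would simply drop the deprecated method from this sketch.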
[jira] [Assigned] (FLINK-34184) Update copyright and license file
[ https://issues.apache.org/jira/browse/FLINK-34184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren reassigned FLINK-34184: - Assignee: Hang Ruan > Update copyright and license file > - > > Key: FLINK-34184 > URL: https://issues.apache.org/jira/browse/FLINK-34184 > Project: Flink > Issue Type: Sub-task > Components: Flink CDC >Reporter: Leonard Xu >Assignee: Hang Ruan >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34183) Add NOTICE files for Flink CDC project
[ https://issues.apache.org/jira/browse/FLINK-34183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren reassigned FLINK-34183: - Assignee: Hang Ruan > Add NOTICE files for Flink CDC project > -- > > Key: FLINK-34183 > URL: https://issues.apache.org/jira/browse/FLINK-34183 > Project: Flink > Issue Type: Sub-task > Components: Flink CDC >Reporter: Leonard Xu >Assignee: Hang Ruan >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34188) Setup release infrastructure for Flink CDC project
[ https://issues.apache.org/jira/browse/FLINK-34188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren reassigned FLINK-34188: - Assignee: LvYanquan > Setup release infrastructure for Flink CDC project > -- > > Key: FLINK-34188 > URL: https://issues.apache.org/jira/browse/FLINK-34188 > Project: Flink > Issue Type: Sub-task > Components: Flink CDC >Reporter: Leonard Xu >Assignee: LvYanquan >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
JustinLeesin commented on code in PR #24397:
URL: https://github.com/apache/flink/pull/24397#discussion_r1506925414

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java: ##
@@ -138,6 +137,14 @@ private Object[] getConvertedArgumentValues(
         return argumentVal;
     }
 
+    private ProcedureContext getProcedureContext(TableConfig tableConfig) {
+        Configuration configuration = (Configuration) tableConfig.getRootConfiguration();

Review Comment:
Yes, that's better, but the Configuration class doesn't provide a constructor of the form Configuration(ReadableConfig config), so it has to be:
Configuration configuration = new Configuration((Configuration) tableConfig.getRootConfiguration());
Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
JustinLeesin commented on code in PR #24397:
URL: https://github.com/apache/flink/pull/24397#discussion_r1506923378

## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java: ##
@@ -210,6 +214,31 @@ void testNamedArgumentsWithOptionalArguments() {
                 ResolvedSchema.of(Column.physical("result", DataTypes.STRING(;
     }
 
+    @Test
+    void testEnvironmentConf() throws DatabaseAlreadyExistException {
+        // root conf should work
+        Configuration configuration = new Configuration();
+        configuration.setString("key1", "value1");
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+        TestProcedureCatalogFactory.CatalogWithBuiltInProcedure procedureCatalog =
+                new TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog");
+        procedureCatalog.createDatabase(
+                "system", new CatalogDatabaseImpl(Collections.emptyMap(), null), true);
+        tableEnv.registerCatalog("test_p", procedureCatalog);
+        tableEnv.useCatalog("test_p");

Review Comment:
If we also add a property with key1, it will overwrite the same key again. Do you mean adding a property ("key2", "value22") to the table config and then checking it?
[jira] [Commented] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch
[ https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821920#comment-17821920 ] Feng Jin commented on FLINK-29114: -- [~qingyue] Can we fix this issue by creating different Sink Tables? If so, I am willing to help fix it.
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
hanyuzheng7 commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1506918553 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Expressions; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.lang.invoke.MethodHandle; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.flink.table.api.Expressions.$; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. */ +@Internal +public class ArrayExceptFunction extends BuiltInScalarFunction { +private final ArrayData.ElementGetter elementGetter; +private final SpecializedFunction.ExpressionEvaluator hashcodeEvaluator; +private final SpecializedFunction.ExpressionEvaluator equalityEvaluator; +private transient MethodHandle hashcodeHandle; + +private transient MethodHandle equalityHandle; + +public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context); +final DataType dataType = +((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) +.getElementDataType() +.toInternal(); +elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType()); +hashcodeEvaluator = +context.createEvaluator( +Expressions.call("$HASHCODE$1", $("element1")), +DataTypes.INT(), +DataTypes.FIELD("element1", dataType.notNull().toInternal())); +equalityEvaluator = +context.createEvaluator( +$("element1").isEqual($("element2")), 
+DataTypes.BOOLEAN(), +DataTypes.FIELD("element1", dataType.notNull().toInternal()), +DataTypes.FIELD("element2", dataType.notNull().toInternal())); +} + +@Override +public void open(FunctionContext context) throws Exception { +hashcodeHandle = hashcodeEvaluator.open(context); +equalityHandle = equalityEvaluator.open(context); +} + +public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) { +try { +if (arrayOne == null) { +return null; +} + +List list = new ArrayList<>(); +Set seen = new HashSet<>(); + +boolean isNullPresentInArrayTwo = false; +if (arrayTwo != null) { +for (int pos = 0; pos < arrayTwo.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayTwo, pos); +if (element == null) { +isNullPresentInArrayTwo = true; +} else { +ObjectContainer objectContainer = new ObjectContainer(element); +seen.add(objectContainer); +} +} +} +boolean isNullPresentInArrayOne = false; +for (int pos = 0; pos < arrayOne.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayOne, pos); +if (element == null) { +isNullPresentInArrayOne = true; +} else { +ObjectContainer
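For readers following the truncated diff above, here is a minimal plain-Java sketch of the approach visible in `eval`: collect the elements of the second array into a set, track NULL separately with a flag, then walk the first array. It is not the PR's code. It assumes `java.util.List` in place of `ArrayData` and ordinary `equals`/`hashCode` in place of the generated `hashcodeHandle`/`equalityHandle` wrapped by `ObjectContainer`. The class name `ArrayExceptSketch` is hypothetical, and keeping duplicates and order from the first array is an assumption, since the quoted diff is cut off before that part.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not the class from PR #23173.
final class ArrayExceptSketch {

    /** Returns the elements of arrayOne that do not occur in arrayTwo; null if arrayOne is null. */
    static List<Object> arrayExcept(List<Object> arrayOne, List<Object> arrayTwo) {
        if (arrayOne == null) {
            return null;
        }
        Set<Object> seen = new HashSet<>();
        boolean nullInArrayTwo = false;
        if (arrayTwo != null) {
            for (Object element : arrayTwo) {
                if (element == null) {
                    nullInArrayTwo = true; // NULL is tracked by a flag, not stored in the set
                } else {
                    seen.add(element);
                }
            }
        }
        List<Object> result = new ArrayList<>();
        for (Object element : arrayOne) {
            if (element == null) {
                if (!nullInArrayTwo) {
                    result.add(null);
                }
            } else if (!seen.contains(element)) {
                result.add(element); // assumption: duplicates in arrayOne are kept
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Object> one = Arrays.<Object>asList(1, 2, null, 2, 3);
        List<Object> two = Arrays.<Object>asList(2, null);
        System.out.println(arrayExcept(one, two)); // prints [1, 3]
    }
}
```

The real function cannot rely on `equals`/`hashCode` of internal data structures, which is presumably why the diff creates expression evaluators for hashing and equality.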
[jira] [Created] (FLINK-34543) Support Full Partition Processing On Non-keyed DataStream
Wencong Liu created FLINK-34543: --- Summary: Support Full Partition Processing On Non-keyed DataStream Key: FLINK-34543 URL: https://issues.apache.org/jira/browse/FLINK-34543 Project: Flink Issue Type: Improvement Components: API / DataStream Affects Versions: 1.20.0 Reporter: Wencong Liu Fix For: 1.20.0 1. Introduce MapPartition, SortPartition, Aggregate, Reduce API in DataStream. 2. Introduce SortPartition API in KeyedStream. The related FLIP can be found in [FLIP-380|https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream].
[jira] [Updated] (FLINK-34542) Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives
[ https://issues.apache.org/jira/browse/FLINK-34542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lennon Yu updated FLINK-34542: -- Description: This is a ticket of misc. improvements on the build.gradle script provided at {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> {{Getting Started:}} * Add {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block ** Absence of this will cause class-not-found errors in SPI related class loading if the user has multiple connectors/formats in their implementation. * Move the top level {{mainClassName}} project property setting into application \{ mainClass = 'foo.Bar' } ** This is because the top-level mainClassName property will be deprecated in Gradle 9.0+ * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} properties with java \{ toolchain \{ languageVersion = JavaLanguageVersion.of(17) } } ** This is the recommended way by Gradle to streamline language version configuration. was: This is a ticket of misc. improvements on the build.gradle script provided at {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> {{Getting Started:}} * {{{}Add {{mergeServiceFiles(){} call to the {{shadowJar}} configuration block ** Absence of this will cause class-not-found errors in SPI related class loading if the user has multiple connectors/formats in their implementation. * Move the top level {{mainClassName}} project property setting into application \{ mainClass = 'foo.Bar' } ** This is because the top-level mainClassName property will be deprecated in Gradle 9.0+ * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} properties with java \{ toolchain { languageVersion = JavaLanguageVersion.of(17) } } ** This is the recommended way by Gradle to streamline language version configuration. 
> Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives > --- > > Key: FLINK-34542 > URL: https://issues.apache.org/jira/browse/FLINK-34542 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Lennon Yu >Priority: Minor > > This is a ticket of misc. improvements on the build.gradle script provided at > {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> > {{Getting Started:}} > * Add {{mergeServiceFiles()}} call to the {{shadowJar}} configuration block > ** Absence of this will cause class-not-found errors in SPI related class > loading if the user has multiple connectors/formats in their implementation. > * Move the top level {{mainClassName}} project property setting into > application \{ mainClass = 'foo.Bar' } > ** This is because the top-level mainClassName property will be deprecated > in Gradle 9.0+ > * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} > properties with java \{ toolchain \{ languageVersion = > JavaLanguageVersion.of(17) } } > ** This is the recommended way by Gradle to streamline language version > configuration.
[jira] [Created] (FLINK-34542) Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives
Lennon Yu created FLINK-34542: - Summary: Improve Gradle Quick Start build.gradle with Better Gradle API Alternatives Key: FLINK-34542 URL: https://issues.apache.org/jira/browse/FLINK-34542 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Lennon Yu This is a ticket of misc. improvements on the build.gradle script provided at {{Application Development}} >> {{Project Configuration}} >> {{Overview}} >> {{Getting Started:}} * {{{}Add {{mergeServiceFiles(){} call to the {{shadowJar}} configuration block ** Absence of this will cause class-not-found errors in SPI related class loading if the user has multiple connectors/formats in their implementation. * Move the top level {{mainClassName}} project property setting into application \{ mainClass = 'foo.Bar' } ** This is because the top-level mainClassName property will be deprecated in Gradle 9.0+ * Replace the use of {{sourceCompatibility}} and {{targetCompatibility}} properties with java \{ toolchain { languageVersion = JavaLanguageVersion.of(17) } } ** This is the recommended way by Gradle to streamline language version configuration.
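To illustrate the first bullet of FLINK-34542, here is a small Java sketch of why service files need merging. Java SPI providers are listed in `META-INF/services/<interface name>` files, one provider class per line. When several connector or format jars each ship such a file under the same name and a fat jar keeps only one copy, the providers listed in the other copies can no longer be discovered. The sketch models each service file as a list of lines and merges them. The class name `ServiceFileMergeSketch`, the `com.example.*` provider names, and the de-duplication step are assumptions made for illustration; this is not how the Shadow plugin is implemented.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of merging same-named META-INF/services files.
final class ServiceFileMergeSketch {

    /** Merges provider entries from several service files, dropping comments, blanks and duplicates. */
    static Set<String> mergeServiceFiles(List<List<String>> serviceFiles) {
        Set<String> providers = new LinkedHashSet<>(); // keeps first-seen order
        for (List<String> file : serviceFiles) {
            for (String line : file) {
                int hash = line.indexOf('#'); // '#' starts a comment in a service file
                String entry = (hash >= 0 ? line.substring(0, hash) : line).trim();
                if (!entry.isEmpty()) {
                    providers.add(entry);
                }
            }
        }
        return providers;
    }

    public static void main(String[] args) {
        List<String> connectorJar = Arrays.asList("# from a connector jar", "com.example.KafkaFactory");
        List<String> formatJar = Arrays.asList("com.example.JsonFactory", "", "com.example.KafkaFactory");
        // Without merging, only one of the two files would end up in the fat jar.
        System.out.println(mergeServiceFiles(Arrays.asList(connectorJar, formatJar)));
        // prints [com.example.KafkaFactory, com.example.JsonFactory]
    }
}
```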
Re: [PR] [hotfix][doc] Update the Kafka DDL example in the hive catalog page to the new parameter style [flink]
hackergin commented on PR #24409: URL: https://github.com/apache/flink/pull/24409#issuecomment-1970258704 @flinkbot run azure
Re: [PR] [hotfix][doc] Update the Kafka DDL example in the hive catalog page to the new parameter style [flink]
hackergin commented on PR #24409: URL: https://github.com/apache/flink/pull/24409#issuecomment-1970258454 CI failed because of https://issues.apache.org/jira/browse/FLINK-29114
[jira] [Commented] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch
[ https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821914#comment-17821914 ] Feng Jin commented on FLINK-29114: -- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57954=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4 > TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with > result mismatch > -- > > Key: FLINK-29114 > URL: https://issues.apache.org/jira/browse/FLINK-29114 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Tests >Affects Versions: 1.15.0, 1.19.0, 1.20.0 >Reporter: Sergey Nuyanzin >Assignee: Jane Chan >Priority: Major > Labels: auto-deprioritized-major, pull-request-available, > test-stability > Attachments: FLINK-29114.log, image-2024-02-27-15-23-49-494.png, > image-2024-02-27-15-26-07-657.png, image-2024-02-27-15-32-48-317.png > > > It could be reproduced locally by repeating tests. Usually about 100 > iterations are enough to have several failed tests > {noformat} > [ERROR] Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: > 1.664 s <<< FAILURE! - in > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase > [ERROR] > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse > Time elapsed: 0.108 s <<< FAILURE! 
Re: [PR] [FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
luoyuxia commented on code in PR #24397: URL: https://github.com/apache/flink/pull/24397#discussion_r1506889418 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java: ## @@ -210,6 +214,31 @@ void testNamedArgumentsWithOptionalArguments() { ResolvedSchema.of(Column.physical("result", DataTypes.STRING(; } +@Test +void testEnvironmentConf() throws DatabaseAlreadyExistException { +// root conf should work +Configuration configuration = new Configuration(); +configuration.setString("key1", "value1"); +StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); +StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); +TestProcedureCatalogFactory.CatalogWithBuiltInProcedure procedureCatalog = +new TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog"); +procedureCatalog.createDatabase( +"system", new CatalogDatabaseImpl(Collections.emptyMap(), null), true); +tableEnv.registerCatalog("test_p", procedureCatalog); +tableEnv.useCatalog("test_p"); Review Comment: Can you please also add a property ("key1", "value22") to the table config, and then call get_env_conf to make sure we can get both the table configuration and the root configuration? ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java: ## @@ -138,6 +137,14 @@ private Object[] getConvertedArgumentValues( return argumentVal; } +private ProcedureContext getProcedureContext(TableConfig tableConfig) { +Configuration configuration = (Configuration) tableConfig.getRootConfiguration(); Review Comment: nit: Configuration configuration = new Configuration(tableConfig.getRootConfiguration());
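The two review comments above touch on how table-level options layer over the root configuration and on copying the root configuration instead of casting it. The following is a minimal sketch of both ideas with plain Java maps, not Flink's `TableConfig` or `Configuration` classes. The class `LayeredConfSketch` and its methods are hypothetical, and the lookup order (table-level value first, then root) is an assumption about the intended behaviour.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; plain maps stand in for Flink's configuration classes.
final class LayeredConfSketch {
    private final Map<String, String> rootConf;
    private final Map<String, String> tableConf = new HashMap<>();

    LayeredConfSketch(Map<String, String> rootConf) {
        // Defensive copy: later changes to the caller's map do not leak in,
        // and no cast to a concrete type is needed.
        this.rootConf = new HashMap<>(rootConf);
    }

    /** Sets a table-level option that shadows a root option with the same key. */
    void set(String key, String value) {
        tableConf.put(key, value);
    }

    /** Looks up the table-level value first and falls back to the root configuration. */
    String get(String key) {
        String value = tableConf.get(key);
        return value != null ? value : rootConf.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> root = new HashMap<>();
        root.put("key1", "value1");
        root.put("key2", "value2");

        LayeredConfSketch conf = new LayeredConfSketch(root);
        conf.set("key1", "value22");      // table-level override
        root.put("key2", "changed-later"); // not visible because of the copy

        System.out.println(conf.get("key1")); // prints value22
        System.out.println(conf.get("key2")); // prints value2
    }
}
```

A test along the lines the reviewer asks for would assert on both kinds of keys: one that exists only in the root configuration and one that is overridden at table level.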
[jira] [Updated] (FLINK-34491) Move from experimental support to production support for Java 17
[ https://issues.apache.org/jira/browse/FLINK-34491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruv Patel updated FLINK-34491: Description: This task is to move away from experimental support for Java 17 to production support so that teams running Flink in production can migrate to Java 17 successfully *Background:* Flink supports protobuf dataformat to exchange messages between different operators and the serialization and deserialization of those protobufs are performed by library called "Kryo". In order to move away from experimental support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not support Java 17. This improvement plan is tracked as part of this ticket [https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0] All Flink applications using protobuf currently generate state with Kryo v2. Once the above improvement plan is complete all Flink applications will fully support reading that state and write newer state with Kryo v5. However, latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent applications which are using snapshot mechanism to deploy their jobs to latest Flink version with Kryo v5 support without a bridge version running on Java 11. Applications will have to run on a bridge release version that will read their state with Kryo v2 data and write it with Kryo v5 data before upgrading to a future version of Flink that completely drops support for Kryo v2. Basically, Flink applications using protobuf dataformat cannot move directly from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. Applications will need to first move to Java 11 (bridging version) and then move to Java 17 to have a safe deployment. 
*Blocker for this task:* Upgrade to Kryo 5.5.0 which supports Java 17 and a path for snapshot migration https://issues.apache.org/jira/browse/FLINK-3154. was: This task is to move away from experimental support for Java 17 to production support so that teams running Flink in production can migrate to Java 17 successfully *Background:* Flink supports protobuf dataformat to exchange messages between different operators and the serialization and deserialization of those protobufs are performed by library called "Kryo". In order to move away from experimental support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not support Java 17. This improvement plan is tracked as part of this ticket https://issues.apache.org/jira/browse/FLINK-3154. All Flink applications using protobuf currently generate state with Kryo v2. Once the above improvement plan is complete all Flink applications will fully support reading that state and write newer state with Kryo v5. However, latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent applications which are using snapshot mechanism to deploy their jobs to latest Flink version with Kryo v5 support without a bridge version running on Java 11. Applications will have to run on a bridge release version that will read their state with Kryo v2 data and write it with Kryo v5 data before upgrading to a future version of Flink that completely drops support for Kryo v2. Basically, Flink applications using protobuf dataformat cannot move directly from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. Applications will need to first move to Java 11 (bridging version) and then move to Java 17 to have a safe deployment. 
Migration Plan is documented here: [https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0] *Blocker for this task:* Upgrade to Kryo 5.5.0 which supports Java 17 and a path for snapshot migration https://issues.apache.org/jira/browse/FLINK-3154. > Move from experimental support to production support for Java 17 > > > Key: FLINK-34491 > URL: https://issues.apache.org/jira/browse/FLINK-34491 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.18.1 >Reporter: Dhruv Patel >Priority: Major > > This task is to move away from experimental support for Java 17 to production > support so that teams running Flink in production can migrate to Java 17 > successfully > *Background:* > Flink supports protobuf dataformat to exchange messages between different > operators and the serialization and deserialization of those protobufs are > performed by library called "Kryo". In order to move away from experimental > support of Java 17 released as part of Flink 1.18.1, the Kryo library in > Flink 1.18.1 needs to be
[jira] [Updated] (FLINK-34491) Move from experimental support to production support for Java 17
[ https://issues.apache.org/jira/browse/FLINK-34491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruv Patel updated FLINK-34491: Description: This task is to move away from experimental support for Java 17 to production support so that teams running Flink in production can migrate to Java 17 successfully *Background:* Flink supports protobuf dataformat to exchange messages between different operators and the serialization and deserialization of those protobufs are performed by library called "Kryo". In order to move away from experimental support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not support Java 17. This improvement plan is tracked as part of this ticket https://issues.apache.org/jira/browse/FLINK-3154. All Flink applications using protobuf currently generate state with Kryo v2. Once the above improvement plan is complete all Flink applications will fully support reading that state and write newer state with Kryo v5. However, latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent applications which are using snapshot mechanism to deploy their jobs to latest Flink version with Kryo v5 support without a bridge version running on Java 11. Applications will have to run on a bridge release version that will read their state with Kryo v2 data and write it with Kryo v5 data before upgrading to a future version of Flink that completely drops support for Kryo v2. Basically, Flink applications using protobuf dataformat cannot move directly from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. Applications will need to first move to Java 11 (bridging version) and then move to Java 17 to have a safe deployment. 
Migration Plan is documented here: [https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0] *Blocker for this task:* Upgrade to Kryo 5.5.0 which supports Java 17 and a path for snapshot migration https://issues.apache.org/jira/browse/FLINK-3154. was: This task is to move away from experimental support for Java 17 to production support so that teams running Flink in production can migrate to Java 17 successfully *Background:* Flink supports protobuf dataformat to exchange messages between different operators and the serialization and deserialization of those protobufs are performed by library called "Kryo". In order to move away from experimental support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not support Java 17. This improvement plan is tracked as part of this ticket https://issues.apache.org/jira/browse/FLINK-3154. All Flink applications using protobuf currently generate state with Kryo v2. Once the above improvement plan is complete all Flink applications will fully support reading that state and write newer state with Kryo v5. However, latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent applications which are using snapshot mechanism to deploy their jobs to latest Flink version with Kryo v5 support without a bridge version running on Java 11. Applications will have to run on a bridge release version that will read their state with Kryo v2 data and write it with Kryo v5 data before upgrading to a future version of Flink that completely drops support for Kryo v2. Basically, Flink applications using protobuf dataformat cannot move directly from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. Applications will need to first move to Java 11 (bridging version) and then move to Java 17 to have a safe deployment. 
Migration Plan is documented here: [https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0] *Blocker for this task:* Upgrade to Kryo 5.5.0 which supports Java 17 and snapshot migration https://issues.apache.org/jira/browse/FLINK-3154. > Move from experimental support to production support for Java 17 > > > Key: FLINK-34491 > URL: https://issues.apache.org/jira/browse/FLINK-34491 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.18.1 >Reporter: Dhruv Patel >Priority: Major > > This task is to move away from experimental support for Java 17 to production > support so that teams running Flink in production can migrate to Java 17 > successfully > *Background:* > Flink supports protobuf dataformat to exchange messages between different > operators and the serialization and deserialization of those protobufs are > performed by library called "Kryo". In order to move away from experimental > support of Java 17 released
[jira] [Updated] (FLINK-34491) Move from experimental support to production support for Java 17
[ https://issues.apache.org/jira/browse/FLINK-34491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruv Patel updated FLINK-34491: Description: This task is to move away from experimental support for Java 17 to production support so that teams running Flink in production can migrate to Java 17 successfully *Background:* Flink supports protobuf dataformat to exchange messages between different operators and the serialization and deserialization of those protobufs are performed by library called "Kryo". In order to move away from experimental support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not support Java 17. This improvement plan is tracked as part of this ticket https://issues.apache.org/jira/browse/FLINK-3154. All Flink applications using protobuf currently generate state with Kryo v2. Once the above improvement plan is complete all Flink applications will fully support reading that state and write newer state with Kryo v5. However, latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent applications which are using snapshot mechanism to deploy their jobs to latest Flink version with Kryo v5 support without a bridge version running on Java 11. Applications will have to run on a bridge release version that will read their state with Kryo v2 data and write it with Kryo v5 data before upgrading to a future version of Flink that completely drops support for Kryo v2. Basically, Flink applications using protobuf dataformat cannot move directly from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. Applications will need to first move to Java 11 (bridging version) and then move to Java 17 to have a safe deployment. 
Migration Plan is documented here: [https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0] *Blocker for this task:* Upgrade to Kryo 5.5.0 which supports Java 17 and snapshot migration https://issues.apache.org/jira/browse/FLINK-3154. was: This task is to move away from experimental support for Java 17 to production support so that teams running Flink in production can migrate to Java 17 successfully *Background:* Flink supports protobuf dataformat to exchange messages between different operators and the serialization and deserialization of those protobufs are performed by library called "Kryo". In order to move away from experimental support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not support Java 17. This improvement plan is tracked as part of this ticket https://issues.apache.org/jira/browse/FLINK-3154. All Flink applications using protobuf currently generate state with Kryo v2. Once the above improvement plan is complete all Flink applications will fully support reading that state and write newer state with Kryo v5. However, latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent applications which are using snapshot mechanism to deploy their jobs to latest Flink version with Kryo v5 support without a bridge version running on Java 11. Applications will have to run on a bridge release version that will read their state with Kryo v2 data and write it with Kryo v5 data before upgrading to a future version of Flink that completely drops support for Kryo v2. Basically, Flink applications using protobuf dataformat cannot move directly from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. Applications will need to first move to Java 11 (bridging version) and then move to Java 17 to have a safe deployment. 
Migration Plan is documented here: https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0 *Blocker for this task:* Upgrade to Kryo 5.5.0 which supports Java 17 https://issues.apache.org/jira/browse/FLINK-3154. > Move from experimental support to production support for Java 17 > > > Key: FLINK-34491 > URL: https://issues.apache.org/jira/browse/FLINK-34491 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.18.1 >Reporter: Dhruv Patel >Priority: Major > > This task is to move away from experimental support for Java 17 to production > support so that teams running Flink in production can migrate to Java 17 > successfully > *Background:* > Flink supports protobuf dataformat to exchange messages between different > operators and the serialization and deserialization of those protobufs are > performed by library called "Kryo". In order to move away from experimental > support of Java 17 released as part of Flink 1.18.1, the Kryo
[jira] (FLINK-34491) Move from experimental support to production support for Java 17
[ https://issues.apache.org/jira/browse/FLINK-34491 ] Dhruv Patel deleted comment on FLINK-34491: - was (Author: JIRAUSER289387): The following issue has been observed after enabling SSL in Flink, since Flink uses TLS 1.3 by default after the migration. |Change |Description | | |SSL / TLS v1.3|The handshake between the Flink components now uses TLS v1.3 with cipher TLS_AES_256_GCM_SHA384, which is causing SSL handshake failures. {code:java} SSL3 alert read:fatal:handshake failure SSL_connect:error in error 409B7454F87F:error:0A000410:SSL routines:ssl3_read_bytes:sslv3 alert handshake failure:ssl/record/rec_layer_s3.c:1586:SSL alert number 40 --- Server Temp Key: ECDH, prime256v1, 256 bits --- SSL handshake has read 470 bytes and written 730 bytes Verification: OK --- New, TLSv1.3, Cipher is TLS_AES_256_GCM_SHA384 This TLS version forbids renegotiation.| {code}| | > Move from experimental support to production support for Java 17 > > > Key: FLINK-34491 > URL: https://issues.apache.org/jira/browse/FLINK-34491 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.18.1 >Reporter: Dhruv Patel >Priority: Major > > This task is to move away from experimental support for Java 17 to production > support so that teams running Flink in production can migrate to Java 17 > successfully > *Background:* > Flink supports protobuf dataformat to exchange messages between different > operators and the serialization and deserialization of those protobufs are > performed by library called "Kryo". In order to move away from experimental > support of Java 17 released as part of Flink 1.18.1, the Kryo library in > Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 > does not support Java 17. This improvement plan is tracked as part of this > ticket https://issues.apache.org/jira/browse/FLINK-3154. > All Flink applications using protobuf currently generate state with Kryo v2. 
> Once the above improvement plan is complete all Flink applications will fully > support reading that state and write newer state with Kryo v5. However, > latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This > will prevent applications which are using snapshot mechanism to deploy their > jobs to latest Flink version with Kryo v5 support without a bridge version > running on Java 11. Applications will have to run on a bridge release version > that will read their state with Kryo v2 data and write it with Kryo v5 data > before upgrading to a future version of Flink that completely drops support > for Kryo v2. > Basically, Flink applications using protobuf dataformat cannot move directly > from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. > Applications will need to first move to Java 11 (bridging version) and then > move to Java 17 to have a safe deployment. > Migration Plan is documented here: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0 > *Blocker for this task:* > Upgrade to Kryo 5.5.0 which supports Java 17 > https://issues.apache.org/jira/browse/FLINK-3154.
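The bridge-release idea in the description above (read state written in the old format, always write it back in the new one) can be illustrated with a minimal sketch. This is not Flink or Kryo code: the class `BridgeStateCodec`, the two byte layouts, and the version tags 2 and 5 are assumptions made up for this illustration, loosely mirroring "Kryo v2" and "Kryo v5".

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical codec for a bridge release: reads both layouts, writes only the new one. */
final class BridgeStateCodec {
    static final byte LEGACY_FORMAT = 2;  // assumed tag for the old layout
    static final byte CURRENT_FORMAT = 5; // assumed tag for the new layout

    /** Old layout (assumption): tag byte, 16-bit length, UTF-8 payload. */
    static byte[] writeLegacy(String value) {
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        if (payload.length > 0xFFFF) {
            throw new IllegalArgumentException("Payload too large for the legacy layout");
        }
        return ByteBuffer.allocate(1 + Short.BYTES + payload.length)
                .put(LEGACY_FORMAT)
                .putShort((short) payload.length)
                .put(payload)
                .array();
    }

    /** New layout (assumption): tag byte, 32-bit length, UTF-8 payload. */
    static byte[] writeCurrent(String value) {
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + Integer.BYTES + payload.length)
                .put(CURRENT_FORMAT)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Dispatches on the tag byte so that either layout can be restored. */
    static String read(byte[] snapshot) {
        ByteBuffer buffer = ByteBuffer.wrap(snapshot);
        byte version = buffer.get();
        int length;
        if (version == LEGACY_FORMAT) {
            length = buffer.getShort() & 0xFFFF; // length was stored as unsigned 16 bits
        } else if (version == CURRENT_FORMAT) {
            length = buffer.getInt();
        } else {
            throw new IllegalStateException("Unknown snapshot format: " + version);
        }
        byte[] payload = new byte[length];
        buffer.get(payload);
        return new String(payload, StandardCharsets.UTF_8);
    }

    /** The bridge step: restore from whichever layout was found, persist in the new one. */
    static byte[] migrate(byte[] snapshot) {
        return writeCurrent(read(snapshot));
    }
}
```

Once every snapshot has passed through `migrate`, a later release could drop the legacy branch of `read`, which corresponds to the "completely drops support for Kryo v2" step in the description.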
[jira] [Updated] (FLINK-34491) Move from experimental support to production support for Java 17
[ https://issues.apache.org/jira/browse/FLINK-34491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruv Patel updated FLINK-34491: Description: This task is to move away from experimental support for Java 17 to production support so that teams running Flink in production can migrate to Java 17 successfully *Background:* Flink supports protobuf dataformat to exchange messages between different operators and the serialization and deserialization of those protobufs are performed by library called "Kryo". In order to move away from experimental support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not support Java 17. This improvement plan is tracked as part of this ticket https://issues.apache.org/jira/browse/FLINK-3154. All Flink applications using protobuf currently generate state with Kryo v2. Once the above improvement plan is complete all Flink applications will fully support reading that state and write newer state with Kryo v5. However, latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent applications which are using snapshot mechanism to deploy their jobs to latest Flink version with Kryo v5 support without a bridge version running on Java 11. Applications will have to run on a bridge release version that will read their state with Kryo v2 data and write it with Kryo v5 data before upgrading to a future version of Flink that completely drops support for Kryo v2. Basically, Flink applications using protobuf dataformat cannot move directly from Java 8 to Java 17 without downtime after the kryo v5 release in Flink. Applications will need to first move to Java 11 (bridging version) and then move to Java 17 to have a safe deployment. 
Migration Plan is documented here: https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0 *Blocker for this task:* Upgrade to Kryo 5.5.0 which supports Java 17 https://issues.apache.org/jira/browse/FLINK-3154. was: This task is to move away from experimental support for Java 17 to production support so that teams running Flink in production can migrate to Java 17 successfully *Background:* Flink supports protobuf dataformat to exchange messages between different operators and the serialization and deserialization of those protobufs are performed by library called "Kryo". In order to move away from experimental support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not support Java 17. This improvement plan is tracked as part of this ticket https://issues.apache.org/jira/browse/FLINK-3154. All Flink applications using protobuf currently generate state with Kryo v2. Once the above improvement plan is complete all Flink applications will fully support reading that state and write newer state with Kryo v5. However, latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent applications which are using snapshot mechanism to deploy their jobs to latest Flink version with Kryo v5 support without a bridge version running on Java 11. Applications will have to run on a bridge release version that will read their state with Kryo v2 data and write it with Kryo v5 data before upgrading to a future version of Flink that completely drops support for Kryo v2. Basically, Flink applications using protobuf dataformat cannot move directly from Java 8 to Java 17 without downtime. Applications will need to first move to Java 11 (bridging version) and then move to Java 17 to have a safe deployment. *Blocker for this task:* Upgrade to Kryo 5.5.0 which supports Java 17 https://issues.apache.org/jira/browse/FLINK-3154. 
> Move from experimental support to production support for Java 17 > > > Key: FLINK-34491 > URL: https://issues.apache.org/jira/browse/FLINK-34491 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.18.1 >Reporter: Dhruv Patel >Priority: Major > > This task is to move away from experimental support for Java 17 to production > support so that teams running Flink in production can migrate to Java 17 > successfully > *Background:* > Flink supports protobuf dataformat to exchange messages between different > operators and the serialization and deserialization of those protobufs are > performed by library called "Kryo". In order to move away from experimental > support of Java 17 released as part of Flink 1.18.1, the Kryo library in > Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 > does not support Java 17. This improvement plan is tracked as part of this > ticket
[jira] [Updated] (FLINK-34491) Move from experimental support to production support for Java 17
[ https://issues.apache.org/jira/browse/FLINK-34491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruv Patel updated FLINK-34491: Description: This task is to move away from experimental support for Java 17 to production support so that teams running Flink in production can migrate to Java 17 successfully *Background:* Flink supports protobuf dataformat to exchange messages between different operators and the serialization and deserialization of those protobufs are performed by library called "Kryo". In order to move away from experimental support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not support Java 17. This improvement plan is tracked as part of this ticket https://issues.apache.org/jira/browse/FLINK-3154. All Flink applications using protobuf currently generate state with Kryo v2. Once the above improvement plan is complete all Flink applications will fully support reading that state and write newer state with Kryo v5. However, latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent applications which are using snapshot mechanism to deploy their jobs to latest Flink version with Kryo v5 support without a bridge version running on Java 11. Applications will have to run on a bridge release version that will read their state with Kryo v2 data and write it with Kryo v5 data before upgrading to a future version of Flink that completely drops support for Kryo v2. Basically, Flink applications using protobuf dataformat cannot move directly from Java 8 to Java 17 without downtime. Applications will need to first move to Java 11 (bridging version) and then move to Java 17 to have a safe deployment. *Blocker for this task:* Upgrade to Kryo 5.5.0 which supports Java 17 https://issues.apache.org/jira/browse/FLINK-3154. 
was: This task is to move away from experimental support for Java 17 to production support so that teams running Flink in production can migrate to Java 17 successfully *Background:* Flink supports protobuf dataformat to exchange messages between different operators and the serialization and deserialization of those protobufs are performed by library called "Kryo". In order to move away from experimental support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not support Java 17. This improvement plan is tracked as part of this ticket https://issues.apache.org/jira/browse/FLINK-3154. All Flink applications using protobuf currently generate state with Kryo v2. Once the above improvement plan is complete all Flink applications will fully support reading that state and write newer state with Kryo v5. However, latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent applications which are using snapshot mechanism to deploy their jobs to latest Flink version with Kryo v5 support without a bridge version running on Java 11. Applications will have to run on a bridge release version that will read their state with Kryo v2 data and write it with Kryo v5 data before upgrading to a future version of Flink that completely drops support for Kryo v2. *Blocker for this task:* Below are some of the blocker task which prevent certain Flink applications which use protobuf from upgrading to Java 17 *Blocker 1:* Upgrade to Kryo 5.5.0 which supports Java 17 https://issues.apache.org/jira/browse/FLINK-3154. *Blocker 2:* Validate whether a snapshot with Java 8 is compatible with Java 17 without using a bridge version running on Java 11. 
https://issues.apache.org/jira/browse/FLINK-33707 > Move from experimental support to production support for Java 17 > > > Key: FLINK-34491 > URL: https://issues.apache.org/jira/browse/FLINK-34491 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.18.1 >Reporter: Dhruv Patel >Priority: Major > > This task is to move away from experimental support for Java 17 to production > support so that teams running Flink in production can migrate to Java 17 > successfully > *Background:* > Flink supports protobuf dataformat to exchange messages between different > operators and the serialization and deserialization of those protobufs are > performed by library called "Kryo". In order to move away from experimental > support of Java 17 released as part of Flink 1.18.1, the Kryo library in > Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 > does not support Java 17. This improvement plan is tracked as part of this > ticket https://issues.apache.org/jira/browse/FLINK-3154. > All Flink applications using protobuf currently generate
Re: [PR] [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]
RocMarshal commented on PR #24407: URL: https://github.com/apache/flink/pull/24407#issuecomment-1970101645 @flinkbot run azure
Re: [PR] [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter [flink]
RocMarshal commented on PR #24407: URL: https://github.com/apache/flink/pull/24407#issuecomment-1970101117 There's an unstable test case that has nothing to do with the current PR, so I'll trigger another CI run.
[jira] [Updated] (FLINK-34491) Move from experimental support to production support for Java 17
[ https://issues.apache.org/jira/browse/FLINK-34491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruv Patel updated FLINK-34491: Description: This task is to move away from experimental support for Java 17 to production support so that teams running Flink in production can migrate to Java 17 successfully *Background:* Flink supports protobuf dataformat to exchange messages between different operators and the serialization and deserialization of those protobufs are performed by library called "Kryo". In order to move away from experimental support of Java 17 released as part of Flink 1.18.1, the Kryo library in Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 does not support Java 17. This improvement plan is tracked as part of this ticket https://issues.apache.org/jira/browse/FLINK-3154. All Flink applications using protobuf currently generate state with Kryo v2. Once the above improvement plan is complete all Flink applications will fully support reading that state and write newer state with Kryo v5. However, latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This will prevent applications which are using snapshot mechanism to deploy their jobs to latest Flink version with Kryo v5 support without a bridge version running on Java 11. Applications will have to run on a bridge release version that will read their state with Kryo v2 data and write it with Kryo v5 data before upgrading to a future version of Flink that completely drops support for Kryo v2. *Blocker for this task:* Below are some of the blocker task which prevent certain Flink applications which use protobuf from upgrading to Java 17 *Blocker 1:* Upgrade to Kryo 5.5.0 which supports Java 17 https://issues.apache.org/jira/browse/FLINK-3154. *Blocker 2:* Validate whether a snapshot with Java 8 is compatible with Java 17 without using a bridge version running on Java 11. 
https://issues.apache.org/jira/browse/FLINK-33707 was: This task is to move away from experimental support for Java 17 to production support so that teams running Flink in production can migrate to Java 17 successfully *Blocker for this task:* Savepoint migration is not supported with Java 17 and Flink 1.18.1 as mentioned in this ticket https://issues.apache.org/jira/browse/FLINK-33707 > Move from experimental support to production support for Java 17 > > > Key: FLINK-34491 > URL: https://issues.apache.org/jira/browse/FLINK-34491 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.18.1 >Reporter: Dhruv Patel >Priority: Major > > This task is to move away from experimental support for Java 17 to production > support so that teams running Flink in production can migrate to Java 17 > successfully > *Background:* > Flink supports protobuf dataformat to exchange messages between different > operators and the serialization and deserialization of those protobufs are > performed by library called "Kryo". In order to move away from experimental > support of Java 17 released as part of Flink 1.18.1, the Kryo library in > Flink 1.18.1 needs to be updated from 2.24.0 to 5.5.0 because Kryo 2.24.0 > does not support Java 17. This improvement plan is tracked as part of this > ticket https://issues.apache.org/jira/browse/FLINK-3154. > All Flink applications using protobuf currently generate state with Kryo v2. > Once the above improvement plan is complete all Flink applications will fully > support reading that state and write newer state with Kryo v5. However, > latest Kryo v5 doesn't support snapshots made by a previous Kryo v2. This > will prevent applications which are using snapshot mechanism to deploy their > jobs to latest Flink version with Kryo v5 support without a bridge version > running on Java 11. 
Applications will have to run on a bridge release version > that will read their state with Kryo v2 data and write it with Kryo v5 data > before upgrading to a future version of Flink that completely drops support > for Kryo v2. > *Blocker for this task:* > Below are some of the blocker task which prevent certain Flink applications > which use protobuf from upgrading to Java 17 > *Blocker 1:* Upgrade to Kryo 5.5.0 which supports Java 17 > https://issues.apache.org/jira/browse/FLINK-3154. > *Blocker 2:* Validate whether a snapshot with Java 8 is compatible with Java > 17 without using a bridge version running on Java 11. > https://issues.apache.org/jira/browse/FLINK-33707
[jira] [Updated] (FLINK-34541) Flink uses insecure http confluent endpoint in its build
[ https://issues.apache.org/jira/browse/FLINK-34541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-34541: --- Priority: Minor (was: Major) > Flink uses insecure http confluent endpoint in its build > > > Key: FLINK-34541 > URL: https://issues.apache.org/jira/browse/FLINK-34541 > Project: Flink > Issue Type: Technical Debt > Components: Build System >Reporter: PJ Fanning >Priority: Minor > > See > https://github.com/apache/flink/blob/641f4f4d0d0156b84bdb9ba528b1dd96f7ae9d9c/flink-end-to-end-tests/test-scripts/kafka-common.sh#L55 > Please use https instead.
[jira] [Updated] (FLINK-34541) Flink uses insecure http confluent endpoint in its build
[ https://issues.apache.org/jira/browse/FLINK-34541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-34541: --- Issue Type: Technical Debt (was: Bug) > Flink uses insecure http confluent endpoint in its build > > > Key: FLINK-34541 > URL: https://issues.apache.org/jira/browse/FLINK-34541 > Project: Flink > Issue Type: Technical Debt > Components: Build System >Reporter: PJ Fanning >Priority: Major > > See > https://github.com/apache/flink/blob/641f4f4d0d0156b84bdb9ba528b1dd96f7ae9d9c/flink-end-to-end-tests/test-scripts/kafka-common.sh#L55 > Please use https instead.
[jira] [Updated] (FLINK-34541) Flink uses insecure http confluent endpoint in its build
[ https://issues.apache.org/jira/browse/FLINK-34541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-34541: --- Component/s: Tests (was: Build System) > Flink uses insecure http confluent endpoint in its build > > > Key: FLINK-34541 > URL: https://issues.apache.org/jira/browse/FLINK-34541 > Project: Flink > Issue Type: Technical Debt > Components: Tests >Reporter: PJ Fanning >Priority: Minor > > See > https://github.com/apache/flink/blob/641f4f4d0d0156b84bdb9ba528b1dd96f7ae9d9c/flink-end-to-end-tests/test-scripts/kafka-common.sh#L55 > Please use https instead.
[jira] [Commented] (FLINK-34541) Flink uses insecure http confluent endpoint in its build
[ https://issues.apache.org/jira/browse/FLINK-34541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821886#comment-17821886 ] Martijn Visser commented on FLINK-34541: There's only one test that still uses this; I don't think it's worthy of a major priority. If anything, it's tech debt. Updating the ticket. > Flink uses insecure http confluent endpoint in its build > > > Key: FLINK-34541 > URL: https://issues.apache.org/jira/browse/FLINK-34541 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: PJ Fanning >Priority: Major > > See > https://github.com/apache/flink/blob/641f4f4d0d0156b84bdb9ba528b1dd96f7ae9d9c/flink-end-to-end-tests/test-scripts/kafka-common.sh#L55 > Please use https instead.
[jira] [Updated] (FLINK-34541) Flink uses insecure http confluent endpoint in its build
[ https://issues.apache.org/jira/browse/FLINK-34541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] PJ Fanning updated FLINK-34541: --- Issue Type: Bug (was: Improvement) > Flink uses insecure http confluent endpoint in its build > > > Key: FLINK-34541 > URL: https://issues.apache.org/jira/browse/FLINK-34541 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: PJ Fanning >Priority: Major > > See > https://github.com/apache/flink/blob/641f4f4d0d0156b84bdb9ba528b1dd96f7ae9d9c/flink-end-to-end-tests/test-scripts/kafka-common.sh#L55 > Please use https instead.
Re: [PR] [FLINK-34469][table] Implement TableDistribution toString [flink]
JingGe commented on code in PR #24338: URL: https://github.com/apache/flink/pull/24338#discussion_r1498379848 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableDistribution.java: ## @@ -142,4 +142,9 @@ && getBucketCount().get() != 0) { sb.append("\n"); return sb.toString(); } + +@Override +public String toString() { +return asSerializableString(); Review Comment: 1. If this is the requirement for the `toString()` method, we should consider marking `asSerializableString()` as private and replacing any external call of `asSerializableString()` with `toString()`. 2. Could you also add a test for it?
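The shape suggested in the review comment above can be sketched as follows. This is only an illustration on a hypothetical stand-in class, `DistributionSketch`, not Flink's `TableDistribution`; the fields and the string format are assumptions chosen for the example.

```java
import java.util.List;

/** Hypothetical stand-in used to illustrate the review suggestion; not the Flink class. */
final class DistributionSketch {
    private final String kind;
    private final int bucketCount;
    private final List<String> bucketKeys;

    DistributionSketch(String kind, int bucketCount, List<String> bucketKeys) {
        this.kind = kind;
        this.bucketCount = bucketCount;
        this.bucketKeys = List.copyOf(bucketKeys);
    }

    // Private, so that external callers have a single entry point: toString().
    // The output format here is invented for the sketch.
    private String asSerializableString() {
        return "DISTRIBUTED BY " + kind + "(" + String.join(", ", bucketKeys) + ")"
                + " INTO " + bucketCount + " BUCKETS";
    }

    @Override
    public String toString() {
        return asSerializableString();
    }
}
```

A test for it, as requested in point 2, would then only need to compare `toString()` against the expected string, for example `new DistributionSketch("HASH", 4, List.of("a", "b")).toString()`.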
[jira] [Created] (FLINK-34541) Flink uses insecure http confluent endpoint in its build
PJ Fanning created FLINK-34541: -- Summary: Flink uses insecure http confluent endpoint in its build Key: FLINK-34541 URL: https://issues.apache.org/jira/browse/FLINK-34541 Project: Flink Issue Type: Improvement Components: Build System Reporter: PJ Fanning See https://github.com/apache/flink/blob/641f4f4d0d0156b84bdb9ba528b1dd96f7ae9d9c/flink-end-to-end-tests/test-scripts/kafka-common.sh#L55 Please use https instead.
Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]
morazow commented on code in PR #182: URL: https://github.com/apache/flink-docker/pull/182#discussion_r1506657180 ## .github/workflows/ci.yml: ## @@ -17,18 +17,19 @@ name: "CI" on: [push, pull_request] +env: + TAR_URL: "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz" + jobs: ci: -name: CI using JDK ${{ matrix.java_version }} runs-on: ubuntu-latest -strategy: - fail-fast: false - matrix: Review Comment: Yes, agreed; it also isolates the tests in their own environment, which is good. There was a Docker issue with the matrix setup; I will update it again and check that the tests run successfully.
Re: [PR] [FLINK-34419][docker] Add tests for JDK 17 & 21 [flink-docker]
morazow commented on PR #182: URL: https://github.com/apache/flink-docker/pull/182#issuecomment-1969906057 Thanks @XComp, I'll ping you again once I update it
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
snuyanzin commented on code in PR #23173: URL: https://github.com/apache/flink/pull/23173#discussion_r1506638912 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Expressions; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.lang.invoke.MethodHandle; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.flink.table.api.Expressions.$; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. */ +@Internal +public class ArrayExceptFunction extends BuiltInScalarFunction { +private final ArrayData.ElementGetter elementGetter; +private final SpecializedFunction.ExpressionEvaluator hashcodeEvaluator; +private final SpecializedFunction.ExpressionEvaluator equalityEvaluator; +private transient MethodHandle hashcodeHandle; + +private transient MethodHandle equalityHandle; + +public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context); +final DataType dataType = +((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) +.getElementDataType() +.toInternal(); +elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType()); +hashcodeEvaluator = +context.createEvaluator( +Expressions.call("$HASHCODE$1", $("element1")), +DataTypes.INT(), +DataTypes.FIELD("element1", dataType.notNull().toInternal())); +equalityEvaluator = +context.createEvaluator( +$("element1").isEqual($("element2")), 
+DataTypes.BOOLEAN(), +DataTypes.FIELD("element1", dataType.notNull().toInternal()), +DataTypes.FIELD("element2", dataType.notNull().toInternal())); +} + +@Override +public void open(FunctionContext context) throws Exception { +hashcodeHandle = hashcodeEvaluator.open(context); +equalityHandle = equalityEvaluator.open(context); +} + +public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) { +try { +if (arrayOne == null) { +return null; +} + +List list = new ArrayList<>(); +Set seen = new HashSet<>(); + +boolean isNullPresentInArrayTwo = false; +if (arrayTwo != null) { +for (int pos = 0; pos < arrayTwo.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayTwo, pos); +if (element == null) { +isNullPresentInArrayTwo = true; +} else { +ObjectContainer objectContainer = new ObjectContainer(element); +seen.add(objectContainer); +} +} +} +boolean isNullPresentInArrayOne = false; +for (int pos = 0; pos < arrayOne.size(); pos++) { +final Object element = elementGetter.getElementOrNull(arrayOne, pos); +if (element == null) { +isNullPresentInArrayOne = true; +} else { +ObjectContainer
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
MartijnVisser commented on code in PR #23173:
URL: https://github.com/apache/flink/pull/23173#discussion_r1506585367

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java: ##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. */
+@Internal
+public class ArrayExceptFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+    private final SpecializedFunction.ExpressionEvaluator hashcodeEvaluator;
+    private final SpecializedFunction.ExpressionEvaluator equalityEvaluator;
+    private transient MethodHandle hashcodeHandle;
+
+    private transient MethodHandle equalityHandle;
+
+    public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context);
+        final DataType dataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType()
+                        .toInternal();
+        elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+        hashcodeEvaluator =
+                context.createEvaluator(
+                        Expressions.call("$HASHCODE$1", $("element1")),
+                        DataTypes.INT(),
+                        DataTypes.FIELD("element1", dataType.notNull().toInternal()));
+        equalityEvaluator =
+                context.createEvaluator(
+                        $("element1").isEqual($("element2")),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.FIELD("element1", dataType.notNull().toInternal()),
+                        DataTypes.FIELD("element2", dataType.notNull().toInternal()));
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        hashcodeHandle = hashcodeEvaluator.open(context);
+        equalityHandle = equalityEvaluator.open(context);
+    }
+
+    public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+        try {
+            if (arrayOne == null) {
+                return null;
+            }
+
+            List list = new ArrayList<>();
+            Set seen = new HashSet<>();
+
+            boolean isNullPresentInArrayTwo = false;
+            if (arrayTwo != null) {
+                for (int pos = 0; pos < arrayTwo.size(); pos++) {
+                    final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
+                    if (element == null) {
+                        isNullPresentInArrayTwo = true;
+                    } else {
+                        ObjectContainer objectContainer = new ObjectContainer(element);
+                        seen.add(objectContainer);
+                    }
+                }
+            }
+            boolean isNullPresentInArrayOne = false;
+            for (int pos = 0; pos < arrayOne.size(); pos++) {
+                final Object element = elementGetter.getElementOrNull(arrayOne, pos);
+                if (element == null) {
+                    isNullPresentInArrayOne = true;
+                } else {
+                    ObjectContainer
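The quoted diff is cut off inside the second loop, so the rest of eval is not visible here. As a rough illustration only, the set-difference idea it starts to build could look like the sketch below on plain java.util collections. Everything in it is an assumption rather than the PR's code: the class name ArrayExceptSketch, the method except, the choice to drop duplicates, and the choice to append a single null at the end when the first array contains null and the second does not. It also relies on Java equals/hashCode instead of the hashcode and equality evaluators the PR wires up for Flink's internal data structures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of an "array except" on plain Java lists; not Flink API. */
public final class ArrayExceptSketch {
    private ArrayExceptSketch() {}

    /** Returns the elements of first that do not occur in second, or null if first is null. */
    public static <T> List<T> except(List<T> first, List<T> second) {
        if (first == null) {
            return null;
        }
        // Collect the non-null elements of the second list and remember whether it holds a null.
        Set<T> seen = new HashSet<>();
        boolean nullInSecond = false;
        if (second != null) {
            for (T element : second) {
                if (element == null) {
                    nullInSecond = true;
                } else {
                    seen.add(element);
                }
            }
        }
        // Keep elements of the first list that were not seen, in first-occurrence order.
        // Assumption: duplicates are dropped; the truncated diff does not confirm this.
        Set<T> kept = new LinkedHashSet<>();
        boolean nullInFirst = false;
        for (T element : first) {
            if (element == null) {
                nullInFirst = true;
            } else if (!seen.contains(element)) {
                kept.add(element);
            }
        }
        List<T> result = new ArrayList<>(kept);
        // Assumption: a null survives only if the second list has no null.
        if (nullInFirst && !nullInSecond) {
            result.add(null);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(except(Arrays.asList(1, 2, 2, null, 3), Arrays.asList(2, 4)));
    }
}
```

Tracking null with two flags instead of storing it in the set mirrors the isNullPresentInArrayOne and isNullPresentInArrayTwo variables visible in the diff.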
Re: [PR] [FLINK-34509] [docs] add missing "url" option for Debezium Avro [flink]
JingGe merged PR #24395: URL: https://github.com/apache/flink/pull/24395 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34152) Tune TaskManager memory
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Summary: Tune TaskManager memory (was: Tune TaskManager memory of autoscaled jobs) > Tune TaskManager memory > --- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their disposal. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / CPU ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
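The issue above asks for heap memory to be sized from measured usage, in the same way that CPU usage drives parallelism. One simple way to picture that is the sketch below. It is not the autoscaler's algorithm, which the ticket does not spell out. The class HeapTuningSketch, the method recommendHeapBytes and all of its parameters (observed peak usage, a relative headroom, lower and upper bounds) are hypothetical names chosen for illustration.

```java
/** Hypothetical sketch of usage-based heap sizing; not part of Flink or the operator. */
public final class HeapTuningSketch {
    private HeapTuningSketch() {}

    /**
     * Recommends a heap size as observed peak usage plus a relative headroom,
     * clamped to the range [minBytes, maxBytes].
     */
    public static long recommendHeapBytes(
            long observedPeakBytes, double headroomFraction, long minBytes, long maxBytes) {
        if (observedPeakBytes < 0 || headroomFraction < 0 || minBytes > maxBytes) {
            throw new IllegalArgumentException("invalid tuning inputs");
        }
        // Scale the measured peak by the headroom, rounding up to whole bytes.
        long target = (long) Math.ceil(observedPeakBytes * (1.0 + headroomFraction));
        // Clamp so a noisy measurement cannot shrink or grow the heap without bound.
        return Math.max(minBytes, Math.min(maxBytes, target));
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        long recommended = recommendHeapBytes(600 * mib, 0.25, 256 * mib, 4096 * mib);
        System.out.println(recommended / mib + " MiB");
    }
}
```

The clamp is there because both failure modes in the description matter: the lower bound guards against underprovisioning on scale down, and the upper bound keeps scale up from adding more memory than is actually needed.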
[jira] [Updated] (FLINK-34152) Tune TaskManager memory of austoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Summary: Tune TaskManager memory of austoscaled jobs (was: Tune TaskManager memory) > Tune TaskManager memory of austoscaled jobs > --- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their disposal. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / CPU ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34540) Tune number of task slots
[ https://issues.apache.org/jira/browse/FLINK-34540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reassigned FLINK-34540: -- Assignee: (was: Maximilian Michels) > Tune number of task slots > - > > Key: FLINK-34540 > URL: https://issues.apache.org/jira/browse/FLINK-34540 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34152) Tune TaskManager memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Summary: Tune TaskManager memory of autoscaled jobs (was: Tune TaskManager memory of austoscaled jobs) > Tune TaskManager memory of autoscaled jobs > -- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their disposal. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / CPU ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34540) Tune number of task slots
[ https://issues.apache.org/jira/browse/FLINK-34540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34540: --- Description: (was: Adjustments similar to FLINK-34152, but simpler because we only need to adjust heap memory and metaspace for the JobManager.) > Tune number of task slots > - > > Key: FLINK-34540 > URL: https://issues.apache.org/jira/browse/FLINK-34540 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34539) Tune JobManager memory
[ https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34539: --- Summary: Tune JobManager memory (was: Tune JobManager memory of autoscaled jobs) > Tune JobManager memory > -- > > Key: FLINK-34539 > URL: https://issues.apache.org/jira/browse/FLINK-34539 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.8.0 > > > Adjustments similar to FLINK-34152, but simpler because we only need to > adjust heap memory and metaspace for the JobManager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34540) Tune number of task slots
Maximilian Michels created FLINK-34540: -- Summary: Tune number of task slots Key: FLINK-34540 URL: https://issues.apache.org/jira/browse/FLINK-34540 Project: Flink Issue Type: Sub-task Components: Autoscaler, Kubernetes Operator Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: kubernetes-operator-1.8.0 Adjustments similar to FLINK-34152, but simpler because we only need to adjust heap memory and metaspace for the JobManager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34538: --- Description: Umbrella issue to tackle tuning the Flink configuration as part of Flink Autoscaling. (was: Umbrella issue to tackle) > Tune Flink config of autoscaled jobs > > > Key: FLINK-34538 > URL: https://issues.apache.org/jira/browse/FLINK-34538 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > > Umbrella issue to tackle tuning the Flink configuration as part of Flink > Autoscaling. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34538: --- Labels: (was: pull-request-available) > Tune Flink config of autoscaled jobs > > > Key: FLINK-34538 > URL: https://issues.apache.org/jira/browse/FLINK-34538 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > > Umbrella issue to tackle -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34538: --- Description: Umbrella issue to tackle (was: The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their disposal. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory / CPU ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. [https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit]) > Tune Flink config of autoscaled jobs > > > Key: FLINK-34538 > URL: https://issues.apache.org/jira/browse/FLINK-34538 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > Umbrella issue to tackle -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34538: --- Fix Version/s: (was: kubernetes-operator-1.8.0) > Tune Flink config of autoscaled jobs > > > Key: FLINK-34538 > URL: https://issues.apache.org/jira/browse/FLINK-34538 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > > Umbrella issue to tackle -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34538: --- Summary: Tune Flink config of autoscaled jobs (was: Tune memory of autoscaled jobs) > Tune Flink config of autoscaled jobs > > > Key: FLINK-34538 > URL: https://issues.apache.org/jira/browse/FLINK-34538 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their disposal. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / CPU ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > [https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34016) Janino compile failed when watermark with column by udf
[ https://issues.apache.org/jira/browse/FLINK-34016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821751#comment-17821751 ] Sebastien Pereira commented on FLINK-34016: --- Hi [~xuyangzhong] I have tested changes from your [PR|https://github.com/apache/flink/pull/24280] with a patch on 1.18.0 and it fixes the issue (I previously reported the issue on 1.18.0 in [FLINK-28693|https://issues.apache.org/jira/browse/FLINK-28693?focusedCommentId=17815957=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17815957] > Janino compile failed when watermark with column by udf > --- > > Key: FLINK-34016 > URL: https://issues.apache.org/jira/browse/FLINK-34016 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.15.0, 1.18.0 >Reporter: ude >Priority: Major > Labels: pull-request-available > Attachments: image-2024-01-25-11-53-06-158.png, > image-2024-01-25-11-54-54-381.png, image-2024-01-25-12-57-21-318.png, > image-2024-01-25-12-57-34-632.png > > > Submitting the following Flink SQL with sql-client.sh throws an exception: > {code:java} > Caused by: java.lang.RuntimeException: Could not instantiate generated class > 'WatermarkGenerator$0' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74) > at > org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:69) > at > org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:109) > at > org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:462) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:438) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:414) > at > 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) > ... 16 more > Caused by: > org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863) > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) > ... 18 more > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107) > at > org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868) > at >
[jira] [Comment Edited] (FLINK-34016) Janino compile failed when watermark with column by udf
[ https://issues.apache.org/jira/browse/FLINK-34016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821751#comment-17821751 ] Sebastien Pereira edited comment on FLINK-34016 at 2/28/24 4:28 PM: Hi [~xuyangzhong] I have tested changes from your [PR|https://github.com/apache/flink/pull/24280] with a patch on 1.18.0 and it fixes the issue (I previously reported the issue on 1.18.0 in [FLINK-28693|https://issues.apache.org/jira/browse/FLINK-28693?focusedCommentId=17815957=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17815957]) was (Author: JIRAUSER302567): Hi [~xuyangzhong] I have tested changes from your [PR|https://github.com/apache/flink/pull/24280] with a patch on 1.18.0 and it fixes the issue (I previously reported the issue on 1.18.0 in [FLINK-28693|https://issues.apache.org/jira/browse/FLINK-28693?focusedCommentId=17815957=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17815957] > Janino compile failed when watermark with column by udf > --- > > Key: FLINK-34016 > URL: https://issues.apache.org/jira/browse/FLINK-34016 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.15.0, 1.18.0 >Reporter: ude >Priority: Major > Labels: pull-request-available > Attachments: image-2024-01-25-11-53-06-158.png, > image-2024-01-25-11-54-54-381.png, image-2024-01-25-12-57-21-318.png, > image-2024-01-25-12-57-34-632.png > > > Submitting the following Flink SQL with sql-client.sh throws an exception: > {code:java} > Caused by: java.lang.RuntimeException: Could not instantiate generated class > 'WatermarkGenerator$0' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74) > at > org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:69) > at > 
org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:109) > at > org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:462) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:438) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:414) > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) > ... 
16 more > Caused by: > org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863) > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) > ... 18 more > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be
[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34539: --- Labels: (was: pull-request-available) > Tune JobManager memory of autoscaled jobs > - > > Key: FLINK-34539 > URL: https://issues.apache.org/jira/browse/FLINK-34539 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.8.0 > > > Adjustments similar to FLINK-34152, but simpler because we only need to > adjust heap memory and metaspace for the JobManager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34539: --- Description: Adjustments similar to FLINK-34152, but simpler because we only need to adjust heap memory and metaspace for the JobManager. (was: Similarly to) > Tune JobManager memory of autoscaled jobs > - > > Key: FLINK-34539 > URL: https://issues.apache.org/jira/browse/FLINK-34539 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > Adjustments similar to FLINK-34152, but simpler because we only need to > adjust heap memory and metaspace for the JobManager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34538) Tune memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34538: --- Description: The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their disposal. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory/CPU ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. [https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit] > Tune memory of autoscaled jobs > -- > > Key: FLINK-34538 > URL: https://issues.apache.org/jira/browse/FLINK-34538 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their disposal. However, > there are some problems with this. 
> # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory/CPU ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > [https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit]
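The FLINK-34538 description proposes sizing heap memory from measured usage, in the same way that CPU usage drives parallelism. A minimal sketch of that idea follows, assuming a simple "observed peak plus headroom, clamped to bounds" rule. The class `HeapTuningSketch`, the method `recommendHeapBytes`, and all of its parameters are hypothetical names chosen for illustration; they are not taken from the autoscaler or the linked design document.

```java
// Hypothetical illustration only: not the Flink autoscaler's actual logic or API.
final class HeapTuningSketch {

    /**
     * Recommends a heap size from an observed peak usage.
     *
     * @param observedPeakHeapBytes highest heap usage seen in the metric window (assumed input)
     * @param headroom fraction added on top of the peak, e.g. 0.25 for 25% (assumed policy)
     * @param minBytes lower bound, protects stateful jobs from underprovisioning
     * @param maxBytes upper bound, limits overprovisioning
     */
    static long recommendHeapBytes(
            long observedPeakHeapBytes, double headroom, long minBytes, long maxBytes) {
        if (observedPeakHeapBytes < 0 || headroom < 0 || minBytes > maxBytes) {
            throw new IllegalArgumentException("invalid tuning parameters");
        }
        // Scale the measured usage by the headroom factor and round up.
        long target = (long) Math.ceil(observedPeakHeapBytes * (1.0 + headroom));
        // Clamp into [minBytes, maxBytes] so a noisy measurement cannot produce extreme sizes.
        return Math.max(minBytes, Math.min(maxBytes, target));
    }
}
```

The lower bound addresses the underprovisioning risk named in the description (OutOfMemoryErrors on scale down), while the upper bound addresses overprovisioning on scale up.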
Re: [PR] [FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
hanyuzheng7 commented on code in PR #23173:
URL: https://github.com/apache/flink/pull/23173#discussion_r1506235081

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayExceptFunction.java: ##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_EXCEPT}. */
+@Internal
+public class ArrayExceptFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+    private final SpecializedFunction.ExpressionEvaluator hashcodeEvaluator;
+    private final SpecializedFunction.ExpressionEvaluator equalityEvaluator;
+    private transient MethodHandle hashcodeHandle;
+
+    private transient MethodHandle equalityHandle;
+
+    public ArrayExceptFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_EXCEPT, context);
+        final DataType dataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType()
+                        .toInternal();
+        elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+        hashcodeEvaluator =
+                context.createEvaluator(
+                        Expressions.call("$HASHCODE$1", $("element1")),
+                        DataTypes.INT(),
+                        DataTypes.FIELD("element1", dataType.notNull().toInternal()));
+        equalityEvaluator =
+                context.createEvaluator(
+                        $("element1").isEqual($("element2")),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.FIELD("element1", dataType.notNull().toInternal()),
+                        DataTypes.FIELD("element2", dataType.notNull().toInternal()));
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        hashcodeHandle = hashcodeEvaluator.open(context);
+        equalityHandle = equalityEvaluator.open(context);
+    }
+
+    public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+        try {
+            if (arrayOne == null) {
+                return null;
+            }
+
+            List list = new ArrayList<>();
+            Set seen = new HashSet<>();
+
+            boolean isNullPresentInArrayTwo = false;
+            if (arrayTwo != null) {
+                for (int pos = 0; pos < arrayTwo.size(); pos++) {
+                    final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
+                    if (element == null) {
+                        isNullPresentInArrayTwo = true;
+                    } else {
+                        ObjectContainer objectContainer = new ObjectContainer(element);
+                        seen.add(objectContainer);
+                    }
+                }
+            }
+            boolean isNullPresentInArrayOne = false;
+            for (int pos = 0; pos < arrayOne.size(); pos++) {
+                final Object element = elementGetter.getElementOrNull(arrayOne, pos);
+                if (element == null) {
+                    isNullPresentInArrayOne = true;
+                } else {
+                    ObjectContainer
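The visible part of the quoted diff collects the non-null elements of the second array into a hash set, records separately whether it contains a null, and then starts scanning the first array; the rest of the hunk is cut off in this message. The following is only a minimal plain-Java sketch of that set-difference idea, not the PR's code. The class `ArrayExceptSketch` and the method `except` are hypothetical, it relies on the elements' own `equals`/`hashCode` instead of Flink's generated evaluators, and keeping duplicates and input order in the result is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical standalone sketch; it does not use any Flink classes.
final class ArrayExceptSketch {

    /** Returns the elements of arrayOne that do not occur in arrayTwo, or null if arrayOne is null. */
    static <T> List<T> except(List<T> arrayOne, List<T> arrayTwo) {
        if (arrayOne == null) {
            return null;
        }
        // First pass: index the second array; nulls are tracked with a flag, as in the diff.
        Set<T> seen = new HashSet<>();
        boolean nullInArrayTwo = false;
        if (arrayTwo != null) {
            for (T element : arrayTwo) {
                if (element == null) {
                    nullInArrayTwo = true;
                } else {
                    seen.add(element);
                }
            }
        }
        // Second pass: keep whatever the second array does not contain (assumed semantics).
        List<T> result = new ArrayList<>();
        for (T element : arrayOne) {
            if (element == null) {
                if (!nullInArrayTwo) {
                    result.add(null);
                }
            } else if (!seen.contains(element)) {
                result.add(element);
            }
        }
        return result;
    }
}
```

Building the set from the second array first keeps the whole operation linear in the combined length of both arrays, which is presumably why the diff introduces a hash-based `ObjectContainer` wrapper instead of comparing elements pairwise.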
[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34539: --- Description: Similarly to (was: The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their disposal. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory/CPU ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit ) > Tune JobManager memory of autoscaled jobs > - > > Key: FLINK-34539 > URL: https://issues.apache.org/jira/browse/FLINK-34539 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > Similarly to