[jira] [Updated] (FLINK-31436) Remove schemaId from constructor of FileStoreCommitImpl and ManifestFile in Table Store
[ https://issues.apache.org/jira/browse/FLINK-31436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-31436:
-----------------------------------
    Labels: pull-request-available  (was: )

> Remove schemaId from constructor of FileStoreCommitImpl and ManifestFile in Table Store
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-31436
> URL: https://issues.apache.org/jira/browse/FLINK-31436
> Project: Flink
> Issue Type: Sub-task
> Components: Table Store
> Affects Versions: table-store-0.4.0
> Reporter: Caizhi Weng
> Assignee: Caizhi Weng
> Priority: Major
> Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> As schema may change during a CTAS streaming job, the schema ID of snapshots and manifest files may also change. We should remove {{schemaId}} from their constructor and calculate the real {{schemaId}} on the fly.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31437) 'lookup.cache.caching-missing-key' change should be configured as 'lookup.partial-cache.caching-missing-key'
[ https://issues.apache.org/jira/browse/FLINK-31437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699962#comment-17699962 ]

Zhimin Geng commented on FLINK-31437:
-------------------------------------

ok

> 'lookup.cache.caching-missing-key' change should be configured as 'lookup.partial-cache.caching-missing-key'
> ------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-31437
> URL: https://issues.apache.org/jira/browse/FLINK-31437
> Project: Flink
> Issue Type: Bug
> Reporter: Zhimin Geng
> Priority: Blocker
> Attachments: image-2023-03-14-05-45-06-230.png, image-2023-03-14-05-45-44-616.png
>
>
> 'lookup.cache.caching-missing-key' change should be configured as 'lookup.partial-cache.caching-missing-key'.
> An error occurred when I configured a dimension table.
> The configuration given by the official website is not available.
> !image-2023-03-14-05-45-06-230.png!
> !image-2023-03-14-05-45-44-616.png!
[GitHub] [flink-table-store] tsreaper opened a new pull request, #598: [FLINK-31436] Remove schemaId from constructor of FileStoreCommitImpl and ManifestFile in Table Store
tsreaper opened a new pull request, #598:
URL: https://github.com/apache/flink-table-store/pull/598

As schema may change during a CTAS streaming job, the schema ID of snapshots and manifest files may also change. We should remove `schemaId` from their constructor and calculate the real `schemaId` on the fly.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
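To illustrate the idea in the PR description, here is a minimal sketch of looking up the schema ID at commit time instead of fixing it in a constructor. It is not the Table Store code: `SchemaIdOnTheFlySketch`, `Committer`, `CommitRecord` and the `LongSupplier` parameter are hypothetical names chosen for this example, and the real `FileStoreCommitImpl` and `ManifestFile` may resolve the ID differently.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical sketch; none of these types exist in Flink Table Store. */
final class SchemaIdOnTheFlySketch {

    /** What a commit records: the snapshot ID and the schema ID valid at commit time. */
    static final class CommitRecord {
        final long snapshotId;
        final long schemaId;

        CommitRecord(long snapshotId, long schemaId) {
            this.snapshotId = snapshotId;
            this.schemaId = schemaId;
        }
    }

    /** Takes a supplier of the latest schema ID rather than a fixed long (assumption). */
    static final class Committer {
        private final LongSupplier latestSchemaId;
        private long nextSnapshotId = 1;

        Committer(LongSupplier latestSchemaId) {
            this.latestSchemaId = latestSchemaId;
        }

        CommitRecord commit() {
            // The schema ID is read here, on every commit, so a schema change
            // made while the job is running is picked up by the next snapshot.
            return new CommitRecord(nextSnapshotId++, latestSchemaId.getAsLong());
        }
    }

    public static void main(String[] args) {
        AtomicLong currentSchema = new AtomicLong(0);
        Committer committer = new Committer(currentSchema::get);

        System.out.println(committer.commit().schemaId); // 0
        currentSchema.set(1); // the schema evolves mid-job
        System.out.println(committer.commit().schemaId); // 1
    }
}
```

With a constructor-supplied `long`, the second commit in `main` would still have recorded schema ID 0.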
[jira] [Commented] (FLINK-31437) 'lookup.cache.caching-missing-key' change should be configured as 'lookup.partial-cache.caching-missing-key'
[ https://issues.apache.org/jira/browse/FLINK-31437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699960#comment-17699960 ]

Junrui Li commented on FLINK-31437:
-----------------------------------

[~gaara] Thanks for creating this issue, and *[xuzhiwen1255|https://github.com/xuzhiwen1255]* has proposed a hot-fix pr to fix this bug: https://github.com/apache/flink/pull/22167

> 'lookup.cache.caching-missing-key' change should be configured as 'lookup.partial-cache.caching-missing-key'
> ------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-31437
> URL: https://issues.apache.org/jira/browse/FLINK-31437
> Project: Flink
> Issue Type: Bug
> Reporter: Zhimin Geng
> Priority: Blocker
> Attachments: image-2023-03-14-05-45-06-230.png, image-2023-03-14-05-45-44-616.png
>
>
> 'lookup.cache.caching-missing-key' change should be configured as 'lookup.partial-cache.caching-missing-key'.
> An error occurred when I configured a dimension table.
> The configuration given by the official website is not available.
> !image-2023-03-14-05-45-06-230.png!
> !image-2023-03-14-05-45-44-616.png!
[jira] [Updated] (FLINK-31436) Remove schemaId from constructor of FileStoreCommitImpl and ManifestFile in Table Store
[ https://issues.apache.org/jira/browse/FLINK-31436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Caizhi Weng updated FLINK-31436:
--------------------------------
    Summary: Remove schemaId from constructor of FileStoreCommitImpl and ManifestFile in Table Store  (was: Remove schemaId from constructor of Snapshot and ManifestFile in Table Store)

> Remove schemaId from constructor of FileStoreCommitImpl and ManifestFile in Table Store
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-31436
> URL: https://issues.apache.org/jira/browse/FLINK-31436
> Project: Flink
> Issue Type: Sub-task
> Components: Table Store
> Affects Versions: table-store-0.4.0
> Reporter: Caizhi Weng
> Assignee: Caizhi Weng
> Priority: Major
> Fix For: table-store-0.4.0
>
>
> As schema may change during a CTAS streaming job, the schema ID of snapshots and manifest files may also change. We should remove {{schemaId}} from their constructor and calculate the real {{schemaId}} on the fly.
[jira] [Created] (FLINK-31437) 'lookup.cache.caching-missing-key' change should be configured as 'lookup.partial-cache.caching-missing-key'
Zhimin Geng created FLINK-31437:
-----------------------------------

    Summary: 'lookup.cache.caching-missing-key' change should be configured as 'lookup.partial-cache.caching-missing-key'
    Key: FLINK-31437
    URL: https://issues.apache.org/jira/browse/FLINK-31437
    Project: Flink
    Issue Type: Bug
    Reporter: Zhimin Geng
    Attachments: image-2023-03-14-05-45-06-230.png, image-2023-03-14-05-45-44-616.png

'lookup.cache.caching-missing-key' change should be configured as 'lookup.partial-cache.caching-missing-key'.
An error occurred when I configured a dimension table.
The configuration given by the official website is not available.
!image-2023-03-14-05-45-06-230.png!
!image-2023-03-14-05-45-44-616.png!
[GitHub] [flink] liuyongvs commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.
liuyongvs commented on code in PR #22144:
URL: https://github.com/apache/flink/pull/22144#discussion_r1135000970

##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##
@@ -178,6 +181,137 @@ Stream getTestSetSpecs() {
 null
 },
 DataTypes.ARRAY(
-DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE();
+DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE(,
+TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_REMOVE)
+        .onFieldsWithData(
+                new Integer[] {1, 2, 2},
+                null,
+                new String[] {"Hello", "World"},
+                new Row[] {
+                    Row.of(true, LocalDate.of(2022, 4, 20)),
+                    Row.of(true, LocalDate.of(1990, 10, 14)),
+                    null
+                },
+                new Integer[] {null, null, 1},
+                new Integer[][] {
+                    new Integer[] {1, null, 3}, new Integer[] {0}, new Integer[] {1}
+                },
+                new Map[] {
+                    CollectionUtil.map(entry(1, "a"), entry(2, "b")),
+                    CollectionUtil.map(entry(3, "c"), entry(4, "d")),
+                    null
+                })
+        .andDataTypes(
+                DataTypes.ARRAY(DataTypes.INT()),
+                DataTypes.ARRAY(DataTypes.INT()),
+                DataTypes.ARRAY(DataTypes.STRING()).notNull(),
+                DataTypes.ARRAY(
+                        DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE())),
+                DataTypes.ARRAY(DataTypes.INT()),
+                DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+                DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())))
+        // ARRAY
+        .testResult(
+                $("f0").arrayRemove(2),
+                "ARRAY_REMOVE(f0, 2)",
+                new Integer[] {1},
+                DataTypes.ARRAY(DataTypes.INT()).nullable())
+        .testResult(
+                $("f0").arrayRemove(42),
+                "ARRAY_REMOVE(f0, 42)",
+                new Integer[] {1, 2, 2},
+                DataTypes.ARRAY(DataTypes.INT()).nullable())
+        .testResult(
+                $("f0").arrayRemove(
+                        lit(null, DataTypes.SMALLINT())
+                                .cast(DataTypes.INT())),
+                "ARRAY_REMOVE(f0, cast(NULL AS INT))",
+                new Integer[] {1, 2, 2},
+                DataTypes.ARRAY(DataTypes.INT()).nullable())
+        // ARRAY of null value
+        .testResult(
+                $("f1").arrayRemove(12),
+                "ARRAY_REMOVE(f1, 12)",
+                null,
+                DataTypes.ARRAY(DataTypes.INT()).nullable())
+        .testResult(
+                $("f1").arrayRemove(null),
+                "ARRAY_REMOVE(f1, NULL)",
+                null,
+                DataTypes.ARRAY(DataTypes.INT()).nullable())
+        // ARRAY NOT NULL
+        .testResult(
+                $("f2").arrayRemove("Hello"),
+                "ARRAY_REMOVE(f2, 'Hello')",
+                new String[] {"World"},
+                DataTypes.ARRAY(DataTypes.STRING()).notNull())
+        .testResult(
+                $("f2").arrayRemove(
+                        lit(null, DataTypes.STRING())
+                                .cast(DataTypes.STRING())),
+                "ARRAY_REMOVE(f2, cast(NULL AS VARCHAR))",
+                new String[] {"Hello", "World"},
+                DataTypes.ARRAY(DataTypes.STRING()).notNull())
+        // ARRAY>
+        .testResult(
[jira] [Commented] (FLINK-30863) Register local recovery files of changelog before notifyCheckpointComplete()
[ https://issues.apache.org/jira/browse/FLINK-30863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699956#comment-17699956 ]

jinghaihang commented on FLINK-30863:
-------------------------------------

More to the point, I'm also unsure whether it's really necessary to call localChangelogRegistry.discardUpToCheckpoint(checkpointId) in the confirm() method. I think there is no need to clean up the changelog files between two checkpoints; cleanup should only be needed after the materialization is over.

> Register local recovery files of changelog before notifyCheckpointComplete()
> ----------------------------------------------------------------------------
>
> Key: FLINK-30863
> URL: https://issues.apache.org/jira/browse/FLINK-30863
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.17.0
> Reporter: Yanfei Lei
> Assignee: Yanfei Lei
> Priority: Major
> Labels: pull-request-available
> Attachments: tm-log_fail_cl_local_recovery.txt
>
>
> If TM is materialized before receiving confirm(), the previously uploaded queue in `FsStateChangelogWriter` will be cleared, so the local files of the completed checkpoint will not be registered again, while the JM owned files are registered before confirm(), and do not depend on the uploaded queue, so the local files are deleted, and the DFS files are still there.
>
> We have encountered the following situation, the job cannot find the local recovery files, but can restore from the DFS files:
> {code:java}
> 2023-01-18 17:21:13,412 [SlidingProcessingTimeWindows (37/48)#1] INFO org.apache.flink.runtime.taskmanager.Task [] - SlidingProcessingTimeWindows (37/48)#1 #1 (fa12cfa3b811a351e031b036b0e85d91) switched from DEPLOYING to INITIALIZING.
> 2023-01-18 17:21:13,440 [SlidingProcessingTimeWindows (37/48)#1] INFO org.apache.flink.runtime.state.TaskLocalStateStoreImpl [] - Found registered local state for checkpoint 11599 in subtask (2daf1d9bc9ed40ecb191303db813b0de - 0a448493b4782967b150582570326227 - 36) : TaskOperatorSubtaskStates{subtaskStatesByOperatorID={0a448493b4782967b150582570326227=SubtaskState{operatorStateFromBackend=StateObjectCollection{[]}, operatorStateFromStream=StateObjectCollection{[]}, keyedStateFromBackend=StateObjectCollection{[org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle@38aa46db]}, keyedStateFromStream=StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]}, resultSubpartitionState=StateObjectCollection{[]}, stateSize=1764644202, checkpointedSize=1997682}}, isTaskDeployedAsFinished=false, isTaskFinished=false}
> 2023-01-18 17:21:13,442 [SlidingProcessingTimeWindows (37/48)#1] INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Getting managed memory shared cache for RocksDB.
> 2023-01-18 17:21:13,446 [SlidingProcessingTimeWindows (37/48)#1] INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Obtained shared RocksDB cache of size 1438814063 bytes
> 2023-01-18 17:21:13,447 [SlidingProcessingTimeWindows (37/48)#1] INFO org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation [] - Starting to restore from state handle: IncrementalLocalKeyedStateHandle{metaDataState=File State: file:/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/0d082666-bd31-4ebe-9977-545c0d9b18a5 [1187 bytes]} DirectoryKeyedStateHandle{directoryStateHandle=DirectoryStateHandle{directory=/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/b3e1d20f164d4c5baed291f5d1224183}, keyGroupRange=KeyGroupRange{startKeyGroup=96, endKeyGroup=98}} without rescaling.
> 2023-01-18 17:21:13,495 [SlidingProcessingTimeWindows (37/48)#1] INFO org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation [] - Finished restoring from state handle: IncrementalLocalKeyedStateHandle{metaDataState=File State: file:/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/0d082666-bd31-4ebe-9977-545c0d9b18a5 [1187 bytes]}
>
[jira] [Created] (FLINK-31436) Remove schemaId from constructor of Snapshot and ManifestFile in Table Store
Caizhi Weng created FLINK-31436:
-----------------------------------

    Summary: Remove schemaId from constructor of Snapshot and ManifestFile in Table Store
    Key: FLINK-31436
    URL: https://issues.apache.org/jira/browse/FLINK-31436
    Project: Flink
    Issue Type: Sub-task
    Components: Table Store
    Affects Versions: table-store-0.4.0
    Reporter: Caizhi Weng
    Assignee: Caizhi Weng
    Fix For: table-store-0.4.0

As schema may change during a CTAS streaming job, the schema ID of snapshots and manifest files may also change. We should remove {{schemaId}} from their constructor and calculate the real {{schemaId}} on the fly.
[GitHub] [flink] liuyongvs commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.
liuyongvs commented on code in PR #22144:
URL: https://github.com/apache/flink/pull/22144#discussion_r1134999542

##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayRemoveFunction.java:
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_REMOVE}. */
+@Internal
+public class ArrayRemoveFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+    private final SpecializedFunction.ExpressionEvaluator equalityEvaluator;
+    private transient MethodHandle equalityHandle;
+
+    public ArrayRemoveFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_REMOVE, context);
+        final DataType elementDataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType();
+        final DataType needleDataType = context.getCallContext().getArgumentDataTypes().get(1);
+        elementGetter = ArrayData.createElementGetter(elementDataType.getLogicalType());
+        equalityEvaluator =
+                context.createEvaluator(
+                        $("element").isEqual($("needle")),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.FIELD("element", elementDataType.notNull().toInternal()),
+                        DataTypes.FIELD("needle", needleDataType.notNull().toInternal()));
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        equalityHandle = equalityEvaluator.open(context);
+    }
+
+    public @Nullable ArrayData eval(ArrayData haystack, Object needle) {
+        try {
+            if (haystack == null) {
+                return null;
+            }
+
+            List list = new ArrayList();
+            final int size = haystack.size();
+            for (int pos = 0; pos < size; pos++) {
+                final Object element = elementGetter.getElementOrNull(haystack, pos);
+                if ((element == null && needle != null)
+                        || (element != null && needle == null)
+                        || (element != null
+                                && needle != null
+                                && !(boolean) equalityHandle.invoke(element, needle))) {

Review Comment:
   have done @snuyanzin
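The null handling in the quoted `eval` loop can be sketched without any Flink dependency. The block below is only an illustration: `ArrayRemoveSketch` is a hypothetical class, `Objects.equals` stands in for the generated equality evaluator, and it assumes that the branch body cut off in the quoted diff appends the element to the result list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Plain-Java illustration of the keep/drop rule; not the Flink implementation. */
final class ArrayRemoveSketch {

    /**
     * Returns a copy of haystack without the elements equal to needle.
     * A null haystack yields null; a null needle removes only null elements.
     */
    static <T> List<T> arrayRemove(List<T> haystack, T needle) {
        if (haystack == null) {
            return null;
        }
        List<T> kept = new ArrayList<>();
        for (T element : haystack) {
            // Keep the element when exactly one side is null, or when both are
            // non-null and unequal. Objects.equals covers all three cases and is
            // an assumption standing in for equalityHandle.invoke(element, needle).
            if (!Objects.equals(element, needle)) {
                kept.add(element);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Integer> f0 = Arrays.asList(1, 2, 2);
        List<Integer> withNulls = Arrays.asList(null, null, 1);
        List<Integer> nullArray = null;

        System.out.println(arrayRemove(f0, 2));           // [1]
        System.out.println(arrayRemove(f0, 42));          // [1, 2, 2]
        System.out.println(arrayRemove(f0, null));        // [1, 2, 2]
        System.out.println(arrayRemove(withNulls, null)); // [1]
        System.out.println(arrayRemove(nullArray, 12));   // null
    }
}
```

The first three calls and the last one match the expectations of the ARRAY_REMOVE test cases in the same PR; the `withNulls` case is what the condition implies and is not taken from a quoted test.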
[jira] [Comment Edited] (FLINK-31144) Slow scheduling on large-scale batch jobs
[ https://issues.apache.org/jira/browse/FLINK-31144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699955#comment-17699955 ]

Zhu Zhu edited comment on FLINK-31144 at 3/14/23 5:41 AM:
----------------------------------------------------------

IIRC, the parallelism of that benchmark job (startScheduling.STREAMING) is 4000, which is relatively large. So the scheduling time can be significantly reduced with this improvement.

was (Author: zhuzh):
IIRC, the parallelism of that benchmark job is 4000, which is relatively large. So the scheduling time can be significantly reduced with this improvement.

> Slow scheduling on large-scale batch jobs
> ------------------------------------------
>
> Key: FLINK-31144
> URL: https://issues.apache.org/jira/browse/FLINK-31144
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.17.0, 1.15.3, 1.16.1
> Reporter: Julien Tournay
> Assignee: Junrui Li
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: Screenshot 2023-03-13 at 14.22.27.png, flink-1.17-snapshot-1676473798013.nps, image-2023-02-21-10-29-49-388.png
>
>
> When executing a complex job graph at high parallelism, `DefaultPreferredLocationsRetriever.getPreferredLocationsBasedOnInputs` can get slow and cause long pauses where the JobManager becomes unresponsive and all the taskmanagers just wait. I've attached a VisualVM snapshot to illustrate the problem.[^flink-1.17-snapshot-1676473798013.nps]
> At Spotify we have complex jobs where this issue can cause batch "pauses" of 40+ minutes and make the overall execution 30% slower or more.
> More importantly, this prevents us from running said jobs on larger clusters, as adding resources to the cluster worsens the issue.
> We have successfully tested a modified Flink version where `DefaultPreferredLocationsRetriever.getPreferredLocationsBasedOnInputs` was completely commented out and simply returns an empty collection, and confirmed that it solves the issue.
> In the same spirit as a recent change ([https://github.com/apache/flink/blob/43f419d0eccba86ecc8040fa6f521148f1e358ff/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java#L98-L102]), there could be a mechanism in place to detect when Flink runs into this specific issue and just skip the call to `getInputLocationFutures` [https://github.com/apache/flink/blob/43f419d0eccba86ecc8040fa6f521148f1e358ff/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java#L105-L108].
> I'm not familiar enough with the internals of Flink to propose a more advanced fix; however, it seems like a configurable threshold on the number of consumer vertices above which the preferred location is not computed would do. If this solution is good enough, I'd be happy to submit a PR.
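The threshold proposed in the issue description could look roughly like the sketch below. This is only an illustration of the idea under stated assumptions: `PreferredLocationThresholdSketch`, `maxConsumers` and the `Supplier` parameter are hypothetical names, the code does not touch `DefaultPreferredLocationsRetriever`, and the fix that was eventually merged may work differently.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Supplier;

/** Hypothetical sketch of a consumer-count threshold; not Flink code. */
final class PreferredLocationThresholdSketch {

    /** Assumed configurable limit on the number of consumer vertices. */
    private final int maxConsumers;

    PreferredLocationThresholdSketch(int maxConsumers) {
        this.maxConsumers = maxConsumers;
    }

    /**
     * Returns no preference when there are too many consumers; otherwise runs
     * the (potentially expensive) input-based computation passed by the caller.
     */
    <L> Collection<L> preferredLocations(
            int consumerCount, Supplier<Collection<L>> inputBasedLocations) {
        if (consumerCount > maxConsumers) {
            // Skip the computation entirely: the supplier is never invoked.
            return Collections.emptyList();
        }
        return inputBasedLocations.get();
    }

    public static void main(String[] args) {
        PreferredLocationThresholdSketch retriever = new PreferredLocationThresholdSketch(100);

        // Above the threshold: no preferred location is computed.
        System.out.println(
                retriever.<String>preferredLocations(4000, () -> Arrays.asList("tm-1")));
        // Below the threshold: the input-based result is returned unchanged.
        System.out.println(
                retriever.<String>preferredLocations(8, () -> Arrays.asList("tm-1")));
    }
}
```

Taking the computation as a `Supplier` keeps the expensive call lazy, which is the point of the proposal: above the threshold the cost is not paid at all.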
[GitHub] [flink] swuferhong commented on a diff in pull request #22049: [FLINK-31273][table-planner] Fix left join with IS_NULL filter be wrongly pushed down and get wrong join results
swuferhong commented on code in PR #22049:
URL: https://github.com/apache/flink/pull/22049#discussion_r1134947973

##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java:
##
@@ -386,6 +387,21 @@ private void pushFiltersToAnotherSide(
 }
 }

+private boolean isSuitableFilterToPush(RexNode filter, JoinRelType joinType) {
+    if (filter.isAlwaysTrue()) {
+        return false;
+    }
+    // For left/right outer join, we cannot push down IS_NULL filter to other side. Take left
+    // outer join as an example, If the join right side contains an IS_NULL filter, while we try
+    // to push it to the join left side and the left side have any other filter on this column,
+    // which will conflict and generate wrong plan.
+    if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.RIGHT)

Review Comment:
   > for left outer join, only the equal conditions from right side can be push to the left side. and vice versa for right outer join.
   >
   > please add some non equal conditions (such as a2 <> 10 ) in the IT case

   Done!
[jira] [Closed] (FLINK-30748) Translate "Overview" page of "Querys" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-30748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu closed FLINK-30748.
---------------------------
    Fix Version/s: 1.18.0
    Assignee: chenhaiyang
    Resolution: Fixed

Fixed in master: 63fd315fdc9bef29e37021855860468bb093...5d15191bbd9d3e9c357c3d3d3d2e410361510022

> Translate "Overview" page of "Querys" into Chinese
> --------------------------------------------------
>
> Key: FLINK-30748
> URL: https://issues.apache.org/jira/browse/FLINK-30748
> Project: Flink
> Issue Type: Sub-task
> Components: chinese-translation, Documentation
> Reporter: chenhaiyang
> Assignee: chenhaiyang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The page url is [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/overview]
>
> The markdown file is located in docs/content.zh/docs/dev/table/sql/queries/overview.md
[GitHub] [flink] wuchong merged pull request #21726: [FLINK-30748][docs]Translate "Overview" page of "Querys" into Chinese
wuchong merged PR #21726:
URL: https://github.com/apache/flink/pull/21726
[jira] [Closed] (FLINK-30694) Translate "Windowing TVF" page of "Querys" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu closed FLINK-30694.
---------------------------
    Fix Version/s: 1.18.0
                       (was: 1.16.2)
    Resolution: Fixed

Fixed in master: a94b556e6a02dcc9220a9e19acd62c230550bf27...7b03113461c36d06c0643d9bdca7055071d41429

> Translate "Windowing TVF" page of "Querys" into Chinese
> -------------------------------------------------------
>
> Key: FLINK-30694
> URL: https://issues.apache.org/jira/browse/FLINK-30694
> Project: Flink
> Issue Type: Sub-task
> Components: chinese-translation, Documentation
> Affects Versions: 1.16.0
> Reporter: chenhaiyang
> Assignee: chenhaiyang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The page url is [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/]
>
> The markdown file is located in docs/content.zh/docs/dev/table/sql/queries/window-tvf.md
[GitHub] [flink] wuchong merged pull request #21823: [FLINK-30694][docs]Translate "Windowing TVF" page of "Querys" into Ch…
wuchong merged PR #21823:
URL: https://github.com/apache/flink/pull/21823
[GitHub] [flink] WencongLiu commented on pull request #21645: [FLINK-30556] Improve the logic for enumerating splits for Hive source to avoid potential OOM
WencongLiu commented on PR #21645:
URL: https://github.com/apache/flink/pull/21645#issuecomment-1467302863

@flinkbot run azure
[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode
[ https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699929#comment-17699929 ]

Jane Chan commented on FLINK-26051:
-----------------------------------

[~KristoffSC], Sorry for the late reply. Feel free to go ahead. Here are some investigations I made before; I hope they help.

By changing CommonCalc#computeSelfCost to
{code:scala}
planner.getCostFactory.makeCost(newRowCnt, newRowCnt * (compCnt + 1), 0)
{code}
this issue can be fixed. Nonetheless, it will cause ~100 test plans to change. In over 50% of those plans, the Calc node that contains only a projection is removed. A few plans failed after applying this change. I think the leading blocker is evaluating the impact of this change to ensure there will be no performance regression for the affected tests.

> one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode
> ----------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-26051
> URL: https://issues.apache.org/jira/browse/FLINK-26051
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.2, 1.14.4
> Reporter: chuncheng wu
> Assignee: Jane Chan
> Priority: Major
> Attachments: image-2022-02-10-20-13-14-424.png, image-2022-02-11-11-18-20-594.png, image-2022-06-17-21-28-54-886.png
>
>
> hello,
> I have two SQL statements. One (sql0) is "select xx from ( ROW_NUMBER statement) where rn=1" and the other (sql1) is "select ${fields} from result where ${filter_conditions}". The fields selected in sql1 include one "case when" field. The two statements work well separately, but when they are combined, the exception below is thrown. It happens when the logical plan is turned into a physical plan:
>
> {code:java}
> org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode.
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
> at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
> at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
> at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
> at com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> at
[jira] [Commented] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI
[ https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699930#comment-17699930 ] Yuxin Tan commented on FLINK-31418: --- I have created a PR for master and 1.17, could you help take a look? [~kevin.cyj] > SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on > CI > --- > > Key: FLINK-31418 > URL: https://issues.apache.org/jira/browse/FLINK-31418 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.17.0 >Reporter: Qingsheng Ren >Assignee: Yuxin Tan >Priority: Critical > Labels: pull-request-available > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756] > Error message: > {code:java} > Mar 13 05:22:10 [ERROR] Failures: > Mar 13 05:22:10 [ERROR] > SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 > Mar 13 05:22:10 Expecting value to be true but was false{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] godfreyhe commented on a diff in pull request #22049: [FLINK-31273][table-planner] Fix left join with IS_NULL filter be wrongly pushed down and get wrong join results
godfreyhe commented on code in PR #22049: URL: https://github.com/apache/flink/pull/22049#discussion_r1134883506 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java: ## @@ -386,6 +387,21 @@ private void pushFiltersToAnotherSide( } } +private boolean isSuitableFilterToPush(RexNode filter, JoinRelType joinType) { +if (filter.isAlwaysTrue()) { +return false; +} +// For left/right outer join, we cannot push down IS_NULL filter to other side. Take left +// outer join as an example, If the join right side contains an IS_NULL filter, while we try +// to push it to the join left side and the left side have any other filter on this column, +// which will conflict and generate wrong plan. +if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.RIGHT) Review Comment: For a left outer join, only the equality conditions from the right side can be pushed to the left side, and vice versa for a right outer join. Please add some non-equality conditions (such as a2 <> 10) to the IT case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
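The concern behind the IS_NULL restriction discussed in this review can be illustrated with a small standalone simulation. This is only a sketch, not Flink planner code: the tables A and B, the columns a1 and b1, and both method names are hypothetical. It evaluates `A LEFT JOIN B ON a1 = b1 WHERE b1 IS NULL` as written, and then again after an `a1 IS NULL` predicate has been wrongly derived for the left side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OuterJoinIsNullPushdownSketch {

    /** SELECT a1 FROM A LEFT JOIN B ON a1 = b1 WHERE b1 IS NULL, evaluated as written. */
    static List<Integer> filterAfterLeftJoin(List<Integer> a, List<Integer> b) {
        List<Integer> result = new ArrayList<>();
        for (Integer a1 : a) {
            boolean matched = false;
            for (Integer b1 : b) {
                if (a1 != null && a1.equals(b1)) {
                    matched = true; // joined row has a non-null b1, so it fails "b1 IS NULL"
                }
            }
            if (!matched) {
                result.add(a1); // null-padded row: b1 is NULL, so it passes the filter
            }
        }
        return result;
    }

    /** Same query after wrongly deriving "a1 IS NULL" from "a1 = b1 AND b1 IS NULL". */
    static List<Integer> isNullPushedToLeft(List<Integer> a, List<Integer> b) {
        List<Integer> filteredA = new ArrayList<>();
        for (Integer a1 : a) {
            if (a1 == null) {
                filteredA.add(a1); // only NULL keys survive the pushed-down filter
            }
        }
        return filterAfterLeftJoin(filteredA, b);
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 2, 3);
        List<Integer> b = Arrays.asList(2);
        System.out.println(filterAfterLeftJoin(a, b)); // prints [1, 3]
        System.out.println(isNullPushedToLeft(a, b));  // prints []
    }
}
```

The two results differ because `b1 IS NULL` after a left outer join is satisfied by null-padded rows, which do not exist yet on the left input before the join.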
[jira] [Commented] (FLINK-31414) exceptions in the alignment timer are ignored
[ https://issues.apache.org/jira/browse/FLINK-31414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699928#comment-17699928 ] Feifan Wang commented on FLINK-31414: - Thanks for the reply, [~pnowojski]. Sorry for the lack of clarity in the previous description; let me answer your questions first: {quote}the stack trace doesn't match to the master code, so I'm not sure what Flink version you are using? {quote} It is based on *release-1.16.1*, with some bug fixes cherry-picked. {quote}doesn't the error message "switched from RUNNING to FAILED" refer to actually subtask/task switching to FAILED state, contradicting your statement that the exception is being ignored? {quote} Yes, it is a subtask switching to the FAILED state. I mean that the exception thrown in the alignment timer task is being ignored, so the subtask thread keeps executing and then triggers the exception I posted above. Here is a more complete log (I changed some log levels from debug to info): {code:java} 2023-03-10 12:09:42,416 INFO [MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1] org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler - MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1 (cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1): Received barrier from channel InputChannelInfo{gateIdx=1, inputChannelIdx=586} @ 17. 
2023-03-10 12:09:42,673 INFO [MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1] org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl - MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1 starting checkpoint 17 (CheckpointOptions{checkpointType=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}, targetLocation=(default), alignmentType=UNALIGNED, alignedCheckpointTimeout=9223372036854775807}) 2023-03-10 12:09:42,673 INFO [MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1] org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl - MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1 put ChannelStateWriteResult : 17 2023-03-10 12:09:42,675 INFO [MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1] org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler - MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1 (cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1): Triggering checkpoint 17 on the barrier announcement at 1678421367671. 
2023-03-10 12:09:42,675 INFO [MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1] org.apache.flink.streaming.runtime.tasks.StreamTask - triggerCheckpointOnBarrier Starting checkpoint 17 CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD} on task MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1 2023-03-10 12:09:42,675 INFO [MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1] org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint 17 CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD} on task MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1 2023-03-10 12:09:42,675 INFO [MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1] org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl - MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1 requested write result, checkpoint 17 2023-03-10 12:09:42,676 INFO [MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1] org.apache.flink.state.changelog.ChangelogKeyedStateBackend - snapshot of MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1 for checkpoint 17, change range: 39..46, materialization ID 4 2023-03-10 12:09:42,677 INFO [MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1] org.apache.flink.streaming.runtime.tasks.RegularOperatorChain - Could not complete snapshot 17 for operator MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1. Failure reason: Checkpoint was declined. org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 17 for operator MV_J_PV - mv-join-after-operator - extract-event-identifier (4517/4800)#1. Failure reason: Checkpoint was declined. 
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345) at
[GitHub] [flink-table-store] zhuangchong opened a new pull request, #597: [hotfix] Change the variable name `sekableInputStream` to `seekableInputStream`
zhuangchong opened a new pull request, #597: URL: https://github.com/apache/flink-table-store/pull/597 Change the variable name `sekableInputStream` to `seekableInputStream`
[jira] [Assigned] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI
[ https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yingjie Cao reassigned FLINK-31418: --- Assignee: Yuxin Tan > SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on > CI > --- > > Key: FLINK-31418 > URL: https://issues.apache.org/jira/browse/FLINK-31418 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.17.0 >Reporter: Qingsheng Ren >Assignee: Yuxin Tan >Priority: Critical > Labels: pull-request-available > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756] > Error message: > {code:java} > Mar 13 05:22:10 [ERROR] Failures: > Mar 13 05:22:10 [ERROR] > SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 > Mar 13 05:22:10 Expecting value to be true but was false{code}
[jira] [Commented] (FLINK-31425) Support submitting a job with streamgraph
[ https://issues.apache.org/jira/browse/FLINK-31425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699925#comment-17699925 ] Junrui Li commented on FLINK-31425: --- [~tophei] Thanks for creating this issue. IIUC, according to the description, this issue can be divided into two parts: 1. Submit and run jobs by submitting a StreamGraph. Submitting and running jobs through a StreamGraph can expand the dynamic adjustment capability of the Flink runtime. This is a relatively large and very meaningful change, and we are currently doing related research. 2. Make StreamGraph public so that users can obtain it. I'm not sure this part is valuable enough to justify exposing an internal interface to users, which may bring additional burden. > Support submitting a job with streamgraph > -- > > Key: FLINK-31425 > URL: https://issues.apache.org/jira/browse/FLINK-31425 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Reporter: Jeff >Priority: Major > > Currently, we have a REST API to submit a job via a JobGraph, which matches > the way the Flink CLI runs the entry class locally and submits the compiled > binary to a remote cluster for execution. > This is convenient in its own right. However, it also seems to bring some > confusion and a 'black box' feeling, in that the payload of the REST API is a binary > object and thus not self-descriptive, and it is a relatively low-level > representation of the job execution whose interface is more likely to change > as versions evolve. > Do you think it makes more sense to build an API that accepts a StreamGraph as > input, which may be represented as JSON (just like the visualizer does for > execution plan visualization), plus additional runtime-related configs and > resources? This may make the REST interface more descriptive.
[GitHub] [flink-table-store] JingsongLi merged pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction
JingsongLi merged PR #594: URL: https://github.com/apache/flink-table-store/pull/594
[jira] [Closed] (FLINK-31397) Introduce write-once hash lookup store
[ https://issues.apache.org/jira/browse/FLINK-31397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-31397. Resolution: Fixed master: 0f5744a92bc52f52bc0f6644dc063dcb60fe326c > Introduce write-once hash lookup store > -- > > Key: FLINK-31397 > URL: https://issues.apache.org/jira/browse/FLINK-31397 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0 > > > Introduce an interface for the lookup changelog producer: > {code:java} > /** > * A key-value store for lookup. The store should be a single binary file that is written once and > * is then ready to be used. This factory provides two interfaces: > * > * > * Writer: written once to prepare the binary file. > * Reader: looks up a value by key bytes. > * > */ > public interface LookupStoreFactory { > LookupStoreWriter createWriter(File file) throws IOException; > LookupStoreReader createReader(File file) throws IOException; > } > {code} > We can convert remote columnar data into a local lookup store that is ready > to be used for lookups.
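To make the write-once contract of `LookupStoreFactory` concrete, here is a minimal sketch under stated assumptions. The factory signature is the one quoted in the issue; the `put` and `lookup` methods on the writer and reader are assumptions, since the issue does not show them. The reader below simply loads the file into an in-memory `HashMap`, which is a simplification for illustration and not the file-resident hash format merged for FLINK-31397.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class LookupStoreSketch {

    // Assumed method shapes: the issue names these types but not their methods.
    interface LookupStoreWriter extends Closeable {
        void put(byte[] key, byte[] value) throws IOException;
    }

    interface LookupStoreReader extends Closeable {
        byte[] lookup(byte[] key) throws IOException;
    }

    // Signature as quoted in the issue description.
    interface LookupStoreFactory {
        LookupStoreWriter createWriter(File file) throws IOException;
        LookupStoreReader createReader(File file) throws IOException;
    }

    static final class SimpleFactory implements LookupStoreFactory {
        @Override
        public LookupStoreWriter createWriter(File file) throws IOException {
            DataOutputStream out =
                    new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
            return new LookupStoreWriter() {
                @Override
                public void put(byte[] key, byte[] value) throws IOException {
                    // Length-prefixed records, appended in write order.
                    out.writeInt(key.length);
                    out.write(key);
                    out.writeInt(value.length);
                    out.write(value);
                }

                @Override
                public void close() throws IOException {
                    out.close(); // after this the file is complete and never modified
                }
            };
        }

        @Override
        public LookupStoreReader createReader(File file) throws IOException {
            Map<ByteBuffer, byte[]> index = new HashMap<>();
            try (DataInputStream in =
                    new DataInputStream(new BufferedInputStream(new FileInputStream(file)))) {
                while (true) {
                    int keyLength;
                    try {
                        keyLength = in.readInt();
                    } catch (EOFException endOfFile) {
                        break; // clean end of the record stream
                    }
                    byte[] key = new byte[keyLength];
                    in.readFully(key);
                    byte[] value = new byte[in.readInt()];
                    in.readFully(value);
                    index.put(ByteBuffer.wrap(key), value); // ByteBuffer compares by content
                }
            }
            return new LookupStoreReader() {
                @Override
                public byte[] lookup(byte[] key) {
                    return index.get(ByteBuffer.wrap(key));
                }

                @Override
                public void close() {}
            };
        }
    }

    /** Writes one pair, reopens the file, and looks up {@code probe}; returns null on a miss. */
    static String roundTrip(String key, String value, String probe) {
        try {
            File file = File.createTempFile("lookup-store", ".bin");
            file.deleteOnExit();
            LookupStoreFactory factory = new SimpleFactory();
            try (LookupStoreWriter writer = factory.createWriter(file)) {
                writer.put(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
            }
            try (LookupStoreReader reader = factory.createReader(file)) {
                byte[] found = reader.lookup(probe.getBytes(StandardCharsets.UTF_8));
                return found == null ? null : new String(found, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Separating the writer from the reader keeps the file immutable once the writer is closed, so a reader never has to coordinate with concurrent writes.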
[jira] [Comment Edited] (FLINK-30863) Register local recovery files of changelog before notifyCheckpointComplete()
[ https://issues.apache.org/jira/browse/FLINK-30863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699920#comment-17699920 ] jinghaihang edited comment on FLINK-30863 at 3/14/23 3:16 AM: -- [~Yanfei Lei] Thanks for your reply. I don't think the current pr will fix this, and I verified that it does. The reason is still that the filter operator confirm() method will delete the local changelog file of the window operator, because the window operator file is not registered at this time. For this problem, I think of two possible solutions: Method 1. Change the logic of LocalChangelogRegistryImpl and add a mapping relationship between operator and file. When confirm() cleans up, only the files of its own operator are cleaned up; Method 2. In LocalChangelogRegistryImpl#discardUpToCheckpoint(), change entry.f1 < upTo to entry.f1 < upTo -1. Compare the two solutions: method 1 has relatively large changes; method 2 is a trick method with small changes, but it is not easy to understand. At the same time, there will be one more checkpoint file storage occupation. WDYT? was (Author: assassinj): [~Yanfei Lei] Thanks for your reply. I don't think the current pr will fix this, and I verified that it does. The reason is still that the filter operator confirm() method will delete the local changelog file of the window operator, because the window operator file is not registered at this time. For this problem, I think of two possible solutions“ Method 1. Change the logic of LocalChangelogRegistryImpl and add a mapping relationship between operator and file. When confirm() cleans up, only the files of its own operator are cleaned up; Method 2. In LocalChangelogRegistryImpl#discardUpToCheckpoint(), change entry.f1 < upTo to entry.f1 < upTo -1. Compare the two solutions: method 1 has relatively large changes; method 2 is a trick method with small changes, but it is not easy to understand. 
At the same time, there will be one more checkpoint file storage occupation. WDYT? > Register local recovery files of changelog before notifyCheckpointComplete() > > > Key: FLINK-30863 > URL: https://issues.apache.org/jira/browse/FLINK-30863 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.17.0 >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > Attachments: tm-log_fail_cl_local_recovery.txt > > > If TM is materialized before receiving confirm(), the previously uploaded > queue in `FsStateChangelogWriter` will be cleared, so the local files of the > completed checkpoint will not be registered again, while the JM owned files > are registered before confirm(), and do not depend on the uploaded queue, so > the local files are deleted, and the DFS files are still there. > > We have encountered the following situation, the job cannot find the local > recovery files, but can restore from the DFS files: > {code:java} > 2023-01-18 17:21:13,412 [SlidingProcessingTimeWindows (37/48)#1] INFO > org.apache.flink.runtime.taskmanager.Task [] - > SlidingProcessingTimeWindows (37/48)#1 #1 (fa12cfa3b811a351e031b036b0e85d91) > switched from DEPLOYING to INITIALIZING. 
> 2023-01-18 17:21:13,440 [SlidingProcessingTimeWindows (37/48)#1] INFO > org.apache.flink.runtime.state.TaskLocalStateStoreImpl [] - Found > registered local state for checkpoint 11599 in subtask > (2daf1d9bc9ed40ecb191303db813b0de - 0a448493b4782967b150582570326227 - 36) : > TaskOperatorSubtaskStates{subtaskStatesByOperatorID={0a448493b4782967b150582570326227=SubtaskState{operatorStateFromBackend=StateObjectCollection{[]}, > operatorStateFromStream=StateObjectCollection{[]}, > keyedStateFromBackend=StateObjectCollection{[org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle@38aa46db]}, > keyedStateFromStream=StateObjectCollection{[]}, > inputChannelState=StateObjectCollection{[]}, > resultSubpartitionState=StateObjectCollection{[]}, stateSize=1764644202, > checkpointedSize=1997682}}, isTaskDeployedAsFinished=false, > isTaskFinished=false} > 2023-01-18 17:21:13,442 [SlidingProcessingTimeWindows (37/48)#1] INFO > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - > Getting managed memory shared cache for RocksDB. > 2023-01-18 17:21:13,446 [SlidingProcessingTimeWindows (37/48)#1] INFO > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - > Obtained shared RocksDB cache of size 1438814063 bytes > 2023-01-18 17:21:13,447 [SlidingProcessingTimeWindows (37/48)#1] INFO > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation >
[jira] [Commented] (FLINK-30863) Register local recovery files of changelog before notifyCheckpointComplete()
[ https://issues.apache.org/jira/browse/FLINK-30863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699920#comment-17699920 ] jinghaihang commented on FLINK-30863: - [~Yanfei Lei] Thanks for your reply. I don't think the current pr will fix this, and I verified that it does. The reason is still that the filter operator confirm() method will delete the local changelog file of the window operator, because the window operator file is not registered at this time. For this problem, I think of two possible solutions“ Method 1. Change the logic of LocalChangelogRegistryImpl and add a mapping relationship between operator and file. When confirm() cleans up, only the files of its own operator are cleaned up; Method 2. In LocalChangelogRegistryImpl#discardUpToCheckpoint(), change entry.f1 < upTo to entry.f1 < upTo -1. Compare the two solutions: method 1 has relatively large changes; method 2 is a trick method with small changes, but it is not easy to understand. At the same time, there will be one more checkpoint file storage occupation. WDYT? > Register local recovery files of changelog before notifyCheckpointComplete() > > > Key: FLINK-30863 > URL: https://issues.apache.org/jira/browse/FLINK-30863 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.17.0 >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > Attachments: tm-log_fail_cl_local_recovery.txt > > > If TM is materialized before receiving confirm(), the previously uploaded > queue in `FsStateChangelogWriter` will be cleared, so the local files of the > completed checkpoint will not be registered again, while the JM owned files > are registered before confirm(), and do not depend on the uploaded queue, so > the local files are deleted, and the DFS files are still there. 
> > We have encountered the following situation, the job cannot find the local > recovery files, but can restore from the DFS files: > {code:java} > 2023-01-18 17:21:13,412 [SlidingProcessingTimeWindows (37/48)#1] INFO > org.apache.flink.runtime.taskmanager.Task [] - > SlidingProcessingTimeWindows (37/48)#1 #1 (fa12cfa3b811a351e031b036b0e85d91) > switched from DEPLOYING to INITIALIZING. > 2023-01-18 17:21:13,440 [SlidingProcessingTimeWindows (37/48)#1] INFO > org.apache.flink.runtime.state.TaskLocalStateStoreImpl [] - Found > registered local state for checkpoint 11599 in subtask > (2daf1d9bc9ed40ecb191303db813b0de - 0a448493b4782967b150582570326227 - 36) : > TaskOperatorSubtaskStates{subtaskStatesByOperatorID={0a448493b4782967b150582570326227=SubtaskState{operatorStateFromBackend=StateObjectCollection{[]}, > operatorStateFromStream=StateObjectCollection{[]}, > keyedStateFromBackend=StateObjectCollection{[org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle@38aa46db]}, > keyedStateFromStream=StateObjectCollection{[]}, > inputChannelState=StateObjectCollection{[]}, > resultSubpartitionState=StateObjectCollection{[]}, stateSize=1764644202, > checkpointedSize=1997682}}, isTaskDeployedAsFinished=false, > isTaskFinished=false} > 2023-01-18 17:21:13,442 [SlidingProcessingTimeWindows (37/48)#1] INFO > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - > Getting managed memory shared cache for RocksDB. 
> 2023-01-18 17:21:13,446 [SlidingProcessingTimeWindows (37/48)#1] INFO > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - > Obtained shared RocksDB cache of size 1438814063 bytes > 2023-01-18 17:21:13,447 [SlidingProcessingTimeWindows (37/48)#1] INFO > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation > [] - Starting to restore from state handle: > IncrementalLocalKeyedStateHandle{metaDataState=File State: > file:/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/0d082666-bd31-4ebe-9977-545c0d9b18a5 > [1187 bytes]} > DirectoryKeyedStateHandle{directoryStateHandle=DirectoryStateHandle{directory=/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/b3e1d20f164d4c5baed291f5d1224183}, > keyGroupRange=KeyGroupRange{startKeyGroup=96, endKeyGroup=98}} without > rescaling. > 2023-01-18 17:21:13,495 [SlidingProcessingTimeWindows (37/48)#1] INFO >
[GitHub] [flink-table-store] JingsongLi merged pull request #593: [FLINK-31397] Introduce write-once hash lookup store
JingsongLi merged PR #593: URL: https://github.com/apache/flink-table-store/pull/593
[jira] [Created] (FLINK-31435) Introduce event parser for MySql Debezium JSON format in Table Store
Caizhi Weng created FLINK-31435: --- Summary: Introduce event parser for MySql Debezium JSON format in Table Store Key: FLINK-31435 URL: https://issues.apache.org/jira/browse/FLINK-31435 Project: Flink Issue Type: Sub-task Components: Table Store Affects Versions: table-store-0.4.0 Reporter: Caizhi Weng Assignee: Caizhi Weng Fix For: table-store-0.4.0 MySQL is widely used among Flink CDC connector users. We should first support consuming changes from MySQL.
[jira] [Created] (FLINK-31434) Introduce CDC sink for Table Store
Caizhi Weng created FLINK-31434: --- Summary: Introduce CDC sink for Table Store Key: FLINK-31434 URL: https://issues.apache.org/jira/browse/FLINK-31434 Project: Flink Issue Type: Sub-task Components: Table Store Affects Versions: table-store-0.4.0 Reporter: Caizhi Weng Assignee: Caizhi Weng Fix For: table-store-0.4.0 To directly consume changes from Flink CDC connectors, we need a special CDC sink for Flink Table Store.
[GitHub] [flink] flinkbot commented on pull request #22173: [FLINK-31418][network][tests] Fix buffer request timeout for sort merge result partition test
flinkbot commented on PR #22173: URL: https://github.com/apache/flink/pull/22173#issuecomment-1467279898 ## CI report: * 9cff66bb2cf991967633d0b3aa88377224085b9f UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Created] (FLINK-31433) Make SchemaChange serializable
Caizhi Weng created FLINK-31433: --- Summary: Make SchemaChange serializable Key: FLINK-31433 URL: https://issues.apache.org/jira/browse/FLINK-31433 Project: Flink Issue Type: Sub-task Components: Table Store Affects Versions: table-store-0.4.0 Reporter: Caizhi Weng Assignee: Caizhi Weng Fix For: table-store-0.4.0 To avoid concurrent changes to the table schema, CDC sinks for Flink Table Store should send every {{SchemaChange}} to a special process function. This process function has a parallelism of 1 and is dedicated to schema changes. To pass {{SchemaChange}} over the network, {{SchemaChange}} must be serializable.
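As an illustration of what "serializable" means for an object that has to travel between operators, the sketch below round-trips a schema-change object through Java serialization. The `AddColumn` class and its fields are hypothetical stand-ins, not the Table Store `SchemaChange` classes, and whether the actual change relies on `java.io.Serializable` or another mechanism is an assumption here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializableSchemaChangeSketch {

    /** Hypothetical stand-in for one kind of schema change; not the Table Store class. */
    static final class AddColumn implements Serializable {
        private static final long serialVersionUID = 1L;

        final String fieldName;
        final String dataType;

        AddColumn(String fieldName, String dataType) {
            this.fieldName = fieldName;
            this.dataType = dataType;
        }
    }

    /** Serializes the change to bytes and reads it back, as a network hop would. */
    static AddColumn roundTrip(AddColumn change) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(change);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (AddColumn) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Every field reachable from the change object must itself be serializable, otherwise `writeObject` fails with `NotSerializableException` at runtime.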
[GitHub] [flink-connector-pulsar] syhily commented on pull request #35: [FLINK-31427][Table] Initial Catalog implementation with a new config model and schema conversion.
syhily commented on PR #35: URL: https://github.com/apache/flink-connector-pulsar/pull/35#issuecomment-1467278411 @reswqa Can you help me review this PR?
[jira] [Created] (FLINK-31432) Introduce a special StoreWriteOperator to deal with schema changes
Caizhi Weng created FLINK-31432: --- Summary: Introduce a special StoreWriteOperator to deal with schema changes Key: FLINK-31432 URL: https://issues.apache.org/jira/browse/FLINK-31432 Project: Flink Issue Type: Sub-task Components: Table Store Affects Versions: table-store-0.4.0 Reporter: Caizhi Weng Assignee: Caizhi Weng Fix For: table-store-0.4.0 Currently, {{StoreWriteOperator}} is not able to deal with schema changes. We need to introduce a special {{StoreWriteOperator}} to deal with schema changes.
[GitHub] [flink] TanYuxin-tyx opened a new pull request, #22173: [FLINK-31418][network][tests] Fix buffer request timeout for sort merge result partition test
TanYuxin-tyx opened a new pull request, #22173: URL: https://github.com/apache/flink/pull/22173 ## What is the purpose of the change *Fix buffer request timeout for sort merge result partition read test* ## Brief change log *(for example:)* - *Fix buffer request timeout for sort merge result partition read test* ## Verifying this change This change is a test fixup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
[jira] [Assigned] (FLINK-31427) Pulsar Catalog support with Schema translation
[ https://issues.apache.org/jira/browse/FLINK-31427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-31427: -- Assignee: Yufan Sheng > Pulsar Catalog support with Schema translation > -- > > Key: FLINK-31427 > URL: https://issues.apache.org/jira/browse/FLINK-31427 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Pulsar >Affects Versions: pulsar-4.0.0 >Reporter: Yufan Sheng >Assignee: Yufan Sheng >Priority: Major > Labels: pull-request-available > Fix For: pulsar-4.0.0 > > > This task makes Pulsar serve as a Flink catalog. It exposes a Pulsar > namespace as a Flink database and a topic as a Flink table. > You can easily create a table and database on Pulsar. The table can be > consumed by other clients with a valid schema check.
[jira] [Created] (FLINK-31431) Support copying a FileStoreTable with latest schema
Caizhi Weng created FLINK-31431: --- Summary: Support copying a FileStoreTable with latest schema Key: FLINK-31431 URL: https://issues.apache.org/jira/browse/FLINK-31431 Project: Flink Issue Type: Sub-task Components: Table Store Affects Versions: table-store-0.4.0 Reporter: Caizhi Weng Assignee: Caizhi Weng Fix For: table-store-0.4.0 To capture schema changes, CDC sinks of Flink Table Store should be able to use the latest schema at any time. This requires us to copy a \{{FileStoreTable}} with latest schema so that we can create \{{TableWrite}} with latest schema. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] flinkbot commented on pull request #22172: [FLINK-31418][network][tests] Fix buffer request timeout for sort merge result partition read test
flinkbot commented on PR #22172: URL: https://github.com/apache/flink/pull/22172#issuecomment-1467272865 ## CI report: * 4438179c6b2181fcaec452f16a52a2b9033dc706 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-31430) Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite
Caizhi Weng created FLINK-31430: --- Summary: Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite Key: FLINK-31430 URL: https://issues.apache.org/jira/browse/FLINK-31430 Project: Flink Issue Type: Sub-task Components: Table Store Affects Versions: table-store-0.4.0 Reporter: Caizhi Weng Assignee: Caizhi Weng Fix For: table-store-0.4.0 Currently {{Table}} and {{TableWrite}} in Flink Table Store have a fixed schema. However, to consume schema changes, Flink Table Store CDC sinks should be able to change their schema during a streaming job. This requires us to pause and store the states of a {{TableWrite}}, then create a {{TableWrite}} with the newer schema and recover the states in the new {{TableWrite}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31429) Support CTAS(create table as) streaming job with schema changes in table store
Caizhi Weng created FLINK-31429: --- Summary: Support CTAS(create table as) streaming job with schema changes in table store Key: FLINK-31429 URL: https://issues.apache.org/jira/browse/FLINK-31429 Project: Flink Issue Type: New Feature Components: Table Store Affects Versions: table-store-0.4.0 Reporter: Caizhi Weng Assignee: Caizhi Weng Fix For: table-store-0.4.0 Currently, CDC connectors for Flink can stream out record changes and schema changes of a database table. Flink Table Store should be able to consume these changes directly, including schema changes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI
[ https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31418: --- Labels: pull-request-available (was: ) > SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on > CI > --- > > Key: FLINK-31418 > URL: https://issues.apache.org/jira/browse/FLINK-31418 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.17.0 >Reporter: Qingsheng Ren >Priority: Critical > Labels: pull-request-available > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756] > Error message: > {code:java} > Mar 13 05:22:10 [ERROR] Failures: > Mar 13 05:22:10 [ERROR] > SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 > Mar 13 05:22:10 Expecting value to be true but was false{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] TanYuxin-tyx opened a new pull request, #22172: [FLINK-31418][network][tests] Fix buffer request timeout for sort merge result partition read test
TanYuxin-tyx opened a new pull request, #22172: URL: https://github.com/apache/flink/pull/22172 ## What is the purpose of the change *Fix buffer request timeout for sort merge result partition read test* ## Brief change log *(for example:)* - *Fix buffer request timeout for sort merge result partition read test* ## Verifying this change This change is a test fixup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-31269) Split hive connector to each module of each version
[ https://issues.apache.org/jira/browse/FLINK-31269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Caizhi Weng closed FLINK-31269. --- Assignee: Shammon Resolution: Fixed master: dd2d600f6743ca074a023fdbb1a7a2cbcfbf8ff0 > Split hive connector to each module of each version > --- > > Key: FLINK-31269 > URL: https://issues.apache.org/jira/browse/FLINK-31269 > Project: Flink > Issue Type: Improvement > Components: Table Store >Affects Versions: table-store-0.4.0 >Reporter: Shammon >Assignee: Shammon >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI
[ https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699913#comment-17699913 ] Yingjie Cao commented on FLINK-31418: - It is a test issue, we will fix it soon. > SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on > CI > --- > > Key: FLINK-31418 > URL: https://issues.apache.org/jira/browse/FLINK-31418 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.17.0 >Reporter: Qingsheng Ren >Priority: Critical > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756] > Error message: > {code:java} > Mar 13 05:22:10 [ERROR] Failures: > Mar 13 05:22:10 [ERROR] > SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 > Mar 13 05:22:10 Expecting value to be true but was false{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-table-store] tsreaper merged pull request #569: [FLINK-31269] Split hive connector to each module of each version
tsreaper merged PR #569: URL: https://github.com/apache/flink-table-store/pull/569 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI
[ https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699911#comment-17699911 ] Qingsheng Ren commented on FLINK-31418: --- Thanks for the feedback [~kevin.cyj]. I'll downgrade this to critical as replied. > SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on > CI > --- > > Key: FLINK-31418 > URL: https://issues.apache.org/jira/browse/FLINK-31418 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.17.0 >Reporter: Qingsheng Ren >Priority: Blocker > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756] > Error message: > {code:java} > Mar 13 05:22:10 [ERROR] Failures: > Mar 13 05:22:10 [ERROR] > SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 > Mar 13 05:22:10 Expecting value to be true but was false{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI
[ https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren updated FLINK-31418: -- Priority: Critical (was: Blocker) > SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on > CI > --- > > Key: FLINK-31418 > URL: https://issues.apache.org/jira/browse/FLINK-31418 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.17.0 >Reporter: Qingsheng Ren >Priority: Critical > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756] > Error message: > {code:java} > Mar 13 05:22:10 [ERROR] Failures: > Mar 13 05:22:10 [ERROR] > SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 > Mar 13 05:22:10 Expecting value to be true but was false{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] wuchong commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
wuchong commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1134831904

## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:

## @@ -763,7 +783,7 @@ CatalogBaseTable instantiateCatalogTable(Table hiveTable) {
             tableSchemaProps.putProperties(properties);
             // try to get table schema with both new and old (1.10) key, in order to support tables
             // created in old version
-            tableSchema =
+            TableSchema tableSchema =

Review Comment: `CatalogPropertiesUtil` is the alternative to `DescriptorProperties`. Migrating to it should not be major work.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
Aitozi commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1134809157 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java: ## @@ -157,12 +165,12 @@ public static Operation convertChangeColumn( // disallow changing partition columns throw new ValidationException("CHANGE COLUMN cannot be applied to partition columns"); } -TableSchema oldSchema = catalogTable.getSchema(); +ResolvedSchema oldSchema = catalogTable.getResolvedSchema(); Review Comment: Removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31428) Add user callbacks to PulsarSource and PulsarSink
[ https://issues.apache.org/jira/browse/FLINK-31428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699901#comment-17699901 ] Yufan Sheng commented on FLINK-31428: - I see, the ser/deser issue matters. You want to intercept only on the real instance instead of the Pulsar `Message` instance, right? > Add user callbacks to PulsarSource and PulsarSink > -- > > Key: FLINK-31428 > URL: https://issues.apache.org/jira/browse/FLINK-31428 > Project: Flink > Issue Type: New Feature > Components: Connectors / Pulsar >Reporter: Alpha Diallo >Priority: Major > > We'd like to add support for user callbacks in {{PulsarSource}} and > {{{}PulsarSink{}}}. This enables specific use cases such as event tracing > which requires access to low level message properties such as message IDs > after an event is produced, topic partitions, etc... > The functionality required is similar to {{ConsumerInterceptor}} and > {{ProducerInterceptor}} in the Pulsar Client. However, there is a case to be > made for adding new APIs that would help avoid the extra cost of ser/deser > when getting the message body through the {{Message}} interface in the > interceptors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
Aitozi commented on code in PR #21522: URL: https://github.com/apache/flink/pull/21522#discussion_r1134803012 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java: ## @@ -69,67 +69,75 @@ private OperationConverterUtils() {} public static Operation convertAddReplaceColumns( Review Comment: Oh, I get it. `SqlAddReplaceColumns` is Hive dialect, and is not used now. Will remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-31413) Remove provided dependency of flink-table-planner from Hive connector
[ https://issues.apache.org/jira/browse/FLINK-31413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia reassigned FLINK-31413: Assignee: luoyuxia > Remove provided dependency of flink-table-planner from Hive connector > - > > Key: FLINK-31413 > URL: https://issues.apache.org/jira/browse/FLINK-31413 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > > Can move on after finish FLINK-26603. We still need some follow-up tasks > before removing provided table-planner dependency in hive connector -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-30659) drop flink-sql-parser-hive
[ https://issues.apache.org/jira/browse/FLINK-30659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia reassigned FLINK-30659: Assignee: luoyuxia > drop flink-sql-parser-hive > --- > > Key: FLINK-30659 > URL: https://issues.apache.org/jira/browse/FLINK-30659 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive, Table SQL / Planner >Affects Versions: 1.17.0 >Reporter: Chen Qin >Assignee: luoyuxia >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > The Hive parser should stay with the Hive connector and be maintained together with it. During > runtime, those packages should load/unload together. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-31409) Hive dialect should use public interfaces in Hive connector
[ https://issues.apache.org/jira/browse/FLINK-31409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia reassigned FLINK-31409: Assignee: luoyuxia > Hive dialect should use public interfaces in Hive connector > --- > > Key: FLINK-31409 > URL: https://issues.apache.org/jira/browse/FLINK-31409 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Labels: pull-request-available > > Currently, the Hive dialect part of the Hive connector depends heavily on > internal interfaces in flink-table-planner and other modules. We should avoid > this and use the public interfaces proposed in > [FLIP-216|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-216%3A++Introduce+pluggable+dialect+and+plan+for+migrating+Hive+dialect]] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-17398) Filesystem support flexible path reading
[ https://issues.apache.org/jira/browse/FLINK-17398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia reassigned FLINK-17398: Assignee: Hang Ruan (was: luoyuxia) > Filesystem support flexible path reading > > > Key: FLINK-17398 > URL: https://issues.apache.org/jira/browse/FLINK-17398 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Ecosystem >Reporter: Jingsong Lee >Assignee: Hang Ruan >Priority: Major > > Like: > * Single file reading > * wildcard path reading (regex) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] swuferhong commented on a diff in pull request #22049: [FLINK-31273][table-planner] Fix left join with IS_NULL filter be wrongly pushed down and get wrong join results
swuferhong commented on code in PR #22049: URL: https://github.com/apache/flink/pull/22049#discussion_r1134791918

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java:

## @@ -386,6 +387,21 @@ private void pushFiltersToAnotherSide(
         }
     }

+    private boolean isSuitableFilterToPush(RexNode filter, JoinRelType joinType) {
+        if (filter.isAlwaysTrue()) {
+            return false;
+        }
+        // For left/right outer join, we cannot push down IS_NULL filter to other side. Take left
+        // outer join as an example, If the join right side contains an IS_NULL filter, while we try
+        // to push it to the join left side and the left side have any other filter on this column,
+        // which will conflict and generate wrong plan.
+        if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.RIGHT)

Review Comment: Hi, @herunkang2018. The method `FlinkFilterJoinRule#pushFiltersToAnotherSide` only supports inner, left, and right joins, so there is no need to check for full outer join here.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
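The concern behind this review thread, that an IS NULL filter on one side of an outer join must not be pushed to the other side, can be illustrated with plain Java lists. This is a minimal sketch and not the Calcite/Flink planner logic: the class `IsNullPushDownSketch` and all of its methods are hypothetical names, and rows are reduced to a single nullable join key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IsNullPushDownSketch {

    /** Left outer join on key equality; an unmatched left key is paired with a null right key. */
    public static List<Integer[]> leftOuterJoin(List<Integer> left, List<Integer> right) {
        List<Integer[]> out = new ArrayList<>();
        for (Integer l : left) {
            boolean matched = false;
            for (Integer r : right) {
                if (l != null && l.equals(r)) {
                    out.add(new Integer[] {l, r});
                    matched = true;
                }
            }
            if (!matched) {
                out.add(new Integer[] {l, null});
            }
        }
        return out;
    }

    /** Correct plan: join first, then keep rows whose right key IS NULL. */
    public static long joinThenFilter(List<Integer> left, List<Integer> right) {
        return leftOuterJoin(left, right).stream().filter(row -> row[1] == null).count();
    }

    /** Wrong plan: the IS NULL predicate is also applied to the left input before the join. */
    public static long pushedToLeft(List<Integer> left, List<Integer> right) {
        List<Integer> filteredLeft = new ArrayList<>();
        for (Integer l : left) {
            if (l == null) {
                filteredLeft.add(l);
            }
        }
        return joinThenFilter(filteredLeft, right);
    }

    public static void main(String[] args) {
        List<Integer> left = Arrays.asList(1, 2, 3);
        List<Integer> right = Arrays.asList(2);
        System.out.println(joinThenFilter(left, right)); // rows for left keys 1 and 3 survive
        System.out.println(pushedToLeft(left, right));   // every left row is dropped before the join
    }
}
```

The two plans return different row counts for the same input, which is the kind of wrong join result the PR title describes.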
[jira] [Closed] (FLINK-31415) Support target alias and using ddls to create source
[ https://issues.apache.org/jira/browse/FLINK-31415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-31415. Fix Version/s: table-store-0.4.0 Assignee: yuzelin Resolution: Fixed master: 3fe06f7384bb8ce5fab5997b5d0e81761874f092 > Support target alias and using ddls to create source > > > Key: FLINK-31415 > URL: https://issues.apache.org/jira/browse/FLINK-31415 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Affects Versions: table-store-0.4.0 >Reporter: yuzelin >Assignee: yuzelin >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-table-store] JingsongLi merged pull request #596: [FLINK-31415] Support target alias and using ddls to create source
JingsongLi merged PR #596: URL: https://github.com/apache/flink-table-store/pull/596 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31371) Stream failure if the topic doesn't exist
[ https://issues.apache.org/jira/browse/FLINK-31371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699896#comment-17699896 ] Yufan Sheng commented on FLINK-31371: - Can we change this JIRA into a feature request with a new title: "Support subscribing non-existed topics in Pulsar source"? > Stream failure if the topic doesn't exist > - > > Key: FLINK-31371 > URL: https://issues.apache.org/jira/browse/FLINK-31371 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.15.3 >Reporter: Enzo Dechaene >Priority: Major > > *Describe the bug* > With a Pulsar 2.8.4 server, a Flink stream containing Pulsar sources or sinks > will fail at startup if the topic doesn't exist. > > *To Reproduce* > Create a stream with: > * Flink 1.15.2 > * Pulsar 2.8.4 > * a Pulsar source or sink linked to a nonexistent topic > * Start the stream > > *Expected behavior* > If the topic doesn't exist, it should be created at the first connection of > the source or sink without error. > > *Additional context* > In the TopicListSubscriber class of the connector, the method > getSubscribedTopicPartitions() tries to get the metadata of a topic as > follows: > > {code:java} > TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, topic);{code} > > If the topic doesn't exist, I get a NullPointerException on the metadata. > We created a previous > [ticket|https://github.com/streamnative/pulsar-flink/issues/366] on the > Pulsar connector and it was fixed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] herunkang2018 commented on a diff in pull request #22049: [FLINK-31273][table-planner] Fix left join with IS_NULL filter be wrongly pushed down and get wrong join results
herunkang2018 commented on code in PR #22049: URL: https://github.com/apache/flink/pull/22049#discussion_r1134786876

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java:

## @@ -386,6 +387,21 @@ private void pushFiltersToAnotherSide(
         }
     }

+    private boolean isSuitableFilterToPush(RexNode filter, JoinRelType joinType) {
+        if (filter.isAlwaysTrue()) {
+            return false;
+        }
+        // For left/right outer join, we cannot push down IS_NULL filter to other side. Take left
+        // outer join as an example, If the join right side contains an IS_NULL filter, while we try
+        // to push it to the join left side and the left side have any other filter on this column,
+        // which will conflict and generate wrong plan.
+        if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.RIGHT)

Review Comment: Do we need to check for full outer join here?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction
JingsongLi commented on code in PR #594: URL: https://github.com/apache/flink-table-store/pull/594#discussion_r1134786122

## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java:

## @@ -172,10 +133,30 @@ public MergeFunction create(@Nullable int[][] projection) {
                 fieldTypes = project.project(tableTypes);
             }

-            return new AggregateMergeFunction(
-                    createFieldGetters(fieldTypes),
-                    new AggregateMergeFunction.RowAggregator(
-                            conf, fieldNames, fieldTypes, primaryKeys));
+            FieldAggregator[] fieldAggregators = new FieldAggregator[fieldNames.size()];
+            for (int i = 0; i < fieldNames.size(); i++) {
+                String fieldName = fieldNames.get(i);
+                DataType fieldType = fieldTypes.get(i);
+                // aggregate by primary keys, so they do not aggregate
+                boolean isPrimaryKey = primaryKeys.contains(fieldName);
+                String strAggFunc =
+                        conf.get(
+                                key(FIELDS + "." + fieldName + "." + AGG_FUNCTION)
+                                        .stringType()
+                                        .noDefaultValue()
+                                        .withDescription(
+                                                "Get " + fieldName + "'s aggregate function"));
+                boolean ignoreRetract =
+                        conf.get(
+                                key(FIELDS + "." + fieldName + "." + IGNORE_RETRACT)
+                                        .booleanType()
+                                        .defaultValue(false));

Review Comment: I deleted the description of `AGG_FUNCTION`. The description is useless here because users cannot see it.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
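The per-field option lookup in the diff above builds its keys by string concatenation. A minimal sketch of that key scheme follows, using a plain `Map` instead of the Flink configuration classes. The class `FieldOptionKeySketch`, its methods, and the three constant values are assumptions made for illustration; the diff does not show the real values of `FIELDS`, `AGG_FUNCTION`, or `IGNORE_RETRACT`.

```java
import java.util.HashMap;
import java.util.Map;

public class FieldOptionKeySketch {

    // Assumed constant values, chosen only for illustration.
    public static final String FIELDS = "fields";
    public static final String AGG_FUNCTION = "aggregate-function";
    public static final String IGNORE_RETRACT = "ignore-retract";

    /** Builds the per-field option key the same way the diff does: FIELDS.<field>.<option>. */
    public static String optionKey(String fieldName, String option) {
        return FIELDS + "." + fieldName + "." + option;
    }

    /** Returns the configured aggregate function name, or null when none is set (no default). */
    public static String aggFunction(Map<String, String> conf, String fieldName) {
        return conf.get(optionKey(fieldName, AGG_FUNCTION));
    }

    /** Returns the ignore-retract flag, defaulting to false as in the diff. */
    public static boolean ignoreRetract(Map<String, String> conf, String fieldName) {
        return Boolean.parseBoolean(
                conf.getOrDefault(optionKey(fieldName, IGNORE_RETRACT), "false"));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(optionKey("price", AGG_FUNCTION), "sum");
        System.out.println(aggFunction(conf, "price"));
        System.out.println(ignoreRetract(conf, "price"));
    }
}
```

Because the key is derived from the field name at lookup time, a description attached to it never reaches generated documentation, which is consistent with the reviewer's point that users cannot see it.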
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction
JingsongLi commented on code in PR #594: URL: https://github.com/apache/flink-table-store/pull/594#discussion_r1134784531

## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0Compaction.java:

## @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.compact.CompactUnit;
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link CompactStrategy} to force compacting level 0 files. */
+public class ForceUpLevel0Compaction implements CompactStrategy {
+
+    private final UniversalCompaction universalCompaction;
+
+    public ForceUpLevel0Compaction(UniversalCompaction universalCompaction) {
+        this.universalCompaction = universalCompaction;
+    }
+
+    @Override
+    public Optional pick(int numLevels, List runs) {
+        Optional pick = universalCompaction.pick(numLevels, runs);
+        if (pick.isPresent()) {
+            return pick;
+        }
+
+        if (runs.isEmpty() || runs.get(0).level() > 0) {
+            return Optional.empty();
+        }
+
+        // collect all level 0 files
+        int candidateCount = 1;

Review Comment: I think we can finish this in one loop:

```
// collect all level 0 files
int candidateCount = 0;
for (int i = candidateCount; i < runs.size(); i++) {
    if (runs.get(i).level() > 0) {
        break;
    }
    candidateCount++;
}
return candidateCount == 0
        ? Optional.empty()
        : Optional.of(universal.pickForSizeRatio(numLevels - 1, runs, candidateCount, true));
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
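The single-loop suggestion in this review comment can be tried in isolation. The sketch below is a minimal, self-contained version that assumes each sorted run is represented only by its level number; the class `Level0Counter` and the method `countLeadingLevel0` are hypothetical names and not part of Flink Table Store.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class Level0Counter {

    /**
     * Counts the consecutive level-0 entries at the head of the list.
     * A result of 0 covers both an empty list and a list whose first run is above level 0,
     * so no separate pre-check is needed.
     */
    public static int countLeadingLevel0(List<Integer> runLevels) {
        int candidateCount = 0;
        for (int i = candidateCount; i < runLevels.size(); i++) {
            if (runLevels.get(i) > 0) {
                break;
            }
            candidateCount++;
        }
        return candidateCount;
    }

    public static void main(String[] args) {
        System.out.println(countLeadingLevel0(Arrays.asList(0, 0, 1, 3)));         // 2
        System.out.println(countLeadingLevel0(Arrays.asList(1, 2)));               // 0
        System.out.println(countLeadingLevel0(Collections.<Integer>emptyList())); // 0
    }
}
```

Starting the counter at 0 lets one loop handle the empty case and the "first run is not level 0" case, which is why the reviewer's version can replace the earlier `runs.isEmpty() || runs.get(0).level() > 0` check with a single `candidateCount == 0` test.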
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store
JingsongLi commented on code in PR #593: URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134777880 ## flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java: ## @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* This file is based on source code from the PalDB Project (https://github.com/linkedin/PalDB), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. 
*/ + +package org.apache.flink.table.store.lookup.hash; + +import org.apache.flink.table.store.lookup.LookupStoreWriter; +import org.apache.flink.table.store.utils.MurmurHashUtils; +import org.apache.flink.table.store.utils.VarLengthIntUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +/** Internal write implementation for hash kv store. */ +public class HashLookupStoreWriter implements LookupStoreWriter { + +private static final Logger LOG = +LoggerFactory.getLogger(HashLookupStoreWriter.class.getName()); + +// load factor of hash map, default 0.75 +private final double loadFactor; +// Output +private final File tempFolder; +private final OutputStream outputStream; +// Index stream +private File[] indexFiles; +private DataOutputStream[] indexStreams; +// Data stream +private File[] dataFiles; +private DataOutputStream[] dataStreams; +// Cache last value +private byte[][] lastValues; +private int[] lastValuesLength; +// Data length +private long[] dataLengths; +// Index length +private long indexesLength; +// Max offset length +private int[] maxOffsetLengths; +// Number of keys +private int keyCount; +private int[] keyCounts; +// Number of values +private int valueCount; +// Number of collisions +private int collisions; + +HashLookupStoreWriter(double loadFactor, File file) throws IOException { +this.loadFactor = loadFactor; +if (loadFactor <= 0.0 || loadFactor >= 1.0) { +throw new IllegalArgumentException( +"Illegal 
load factor = " + loadFactor + ", should be between 0.0 and 1.0."); +} + +tempFolder = new File(file.getParentFile(), UUID.randomUUID().toString()); +if (!tempFolder.mkdir()) { +throw new IOException("Can not create temp folder: " + tempFolder); +} +outputStream = new BufferedOutputStream(new FileOutputStream(file)); +indexStreams = new DataOutputStream[0]; +dataStreams = new DataOutputStream[0]; +indexFiles = new File[0]; +dataFiles = new File[0]; +lastValues = new byte[0][]; +lastValuesLength = new int[0]; +dataLengths = new long[0]; +maxOffsetLengths = new int[0]; +keyCounts = new int[0]; +} + +@Override +public void put(byte[] key, byte[] value) throws IOException { +int keyLength = key.length; + +// Get the Output stream for that keyLength, each key length has its own file +DataOutputStream indexStream = getIndexStream(keyLength); + +// Write key +indexStream.write(key); + +// Check if the value is identical to the last inserted +byte[] lastValue = lastValues[keyLength]; +boolean sameValue = lastValue != null && Arrays.equals(value, lastValue); + +// Get data stream and length +long dataLength = dataLengths[keyLength]; +if (sameValue) { +dataLength -=
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store
JingsongLi commented on code in PR #593: URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134777430 ## flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java: ## @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* This file is based on source code from the PalDB Project (https://github.com/linkedin/PalDB), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. 
*/ + +package org.apache.flink.table.store.lookup.hash; + +import org.apache.flink.table.store.lookup.LookupStoreWriter; +import org.apache.flink.table.store.utils.MurmurHashUtils; +import org.apache.flink.table.store.utils.VarLengthIntUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +/** Internal write implementation for hash kv store. */ +public class HashLookupStoreWriter implements LookupStoreWriter { + +private static final Logger LOG = +LoggerFactory.getLogger(HashLookupStoreWriter.class.getName()); + +// load factor of hash map, default 0.75 +private final double loadFactor; +// Output +private final File tempFolder; +private final OutputStream outputStream; +// Index stream +private File[] indexFiles; +private DataOutputStream[] indexStreams; +// Data stream +private File[] dataFiles; +private DataOutputStream[] dataStreams; +// Cache last value +private byte[][] lastValues; +private int[] lastValuesLength; +// Data length +private long[] dataLengths; +// Index length +private long indexesLength; +// Max offset length +private int[] maxOffsetLengths; +// Number of keys +private int keyCount; +private int[] keyCounts; +// Number of values +private int valueCount; +// Number of collisions +private int collisions; + +HashLookupStoreWriter(double loadFactor, File file) throws IOException { +this.loadFactor = loadFactor; +if (loadFactor <= 0.0 || loadFactor >= 1.0) { +throw new IllegalArgumentException( +"Illegal 
load factor = " + loadFactor + ", should be between 0.0 and 1.0."); +} + +tempFolder = new File(file.getParentFile(), UUID.randomUUID().toString()); +if (!tempFolder.mkdir()) { +throw new IOException("Can not create temp folder: " + tempFolder); +} +outputStream = new BufferedOutputStream(new FileOutputStream(file)); +indexStreams = new DataOutputStream[0]; +dataStreams = new DataOutputStream[0]; +indexFiles = new File[0]; +dataFiles = new File[0]; +lastValues = new byte[0][]; +lastValuesLength = new int[0]; +dataLengths = new long[0]; +maxOffsetLengths = new int[0]; +keyCounts = new int[0]; +} + +@Override +public void put(byte[] key, byte[] value) throws IOException { +int keyLength = key.length; + +// Get the Output stream for that keyLength, each key length has its own file +DataOutputStream indexStream = getIndexStream(keyLength); + +// Write key +indexStream.write(key); + +// Check if the value is identical to the last inserted +byte[] lastValue = lastValues[keyLength]; +boolean sameValue = lastValue != null && Arrays.equals(value, lastValue); + +// Get data stream and length +long dataLength = dataLengths[keyLength]; +if (sameValue) { +dataLength -=
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store
JingsongLi commented on code in PR #593: URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134775901 ## flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java: ## @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* This file is based on source code from the PalDB Project (https://github.com/linkedin/PalDB), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. 
*/ + +package org.apache.flink.table.store.lookup.hash; + +import org.apache.flink.table.store.lookup.LookupStoreWriter; +import org.apache.flink.table.store.utils.MurmurHashUtils; +import org.apache.flink.table.store.utils.VarLengthIntUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +/** Internal write implementation for hash kv store. */ +public class HashLookupStoreWriter implements LookupStoreWriter { + +private static final Logger LOG = +LoggerFactory.getLogger(HashLookupStoreWriter.class.getName()); + +// load factor of hash map, default 0.75 +private final double loadFactor; +// Output +private final File tempFolder; +private final OutputStream outputStream; +// Index stream +private File[] indexFiles; +private DataOutputStream[] indexStreams; +// Data stream +private File[] dataFiles; +private DataOutputStream[] dataStreams; +// Cache last value +private byte[][] lastValues; +private int[] lastValuesLength; +// Data length +private long[] dataLengths; +// Index length +private long indexesLength; +// Max offset length +private int[] maxOffsetLengths; +// Number of keys +private int keyCount; +private int[] keyCounts; +// Number of values +private int valueCount; +// Number of collisions +private int collisions; + +HashLookupStoreWriter(double loadFactor, File file) throws IOException { +this.loadFactor = loadFactor; +if (loadFactor <= 0.0 || loadFactor >= 1.0) { +throw new IllegalArgumentException( +"Illegal 
load factor = " + loadFactor + ", should be between 0.0 and 1.0."); +} + +tempFolder = new File(file.getParentFile(), UUID.randomUUID().toString()); +if (!tempFolder.mkdir()) { +throw new IOException("Can not create temp folder: " + tempFolder); +} +outputStream = new BufferedOutputStream(new FileOutputStream(file)); +indexStreams = new DataOutputStream[0]; +dataStreams = new DataOutputStream[0]; +indexFiles = new File[0]; +dataFiles = new File[0]; +lastValues = new byte[0][]; +lastValuesLength = new int[0]; +dataLengths = new long[0]; +maxOffsetLengths = new int[0]; +keyCounts = new int[0]; +} + +@Override +public void put(byte[] key, byte[] value) throws IOException { +int keyLength = key.length; + +// Get the Output stream for that keyLength, each key length has its own file +DataOutputStream indexStream = getIndexStream(keyLength); + +// Write key +indexStream.write(key); + +// Check if the value is identical to the last inserted +byte[] lastValue = lastValues[keyLength]; +boolean sameValue = lastValue != null && Arrays.equals(value, lastValue); + +// Get data stream and length +long dataLength = dataLengths[keyLength]; +if (sameValue) { +dataLength -=
[GitHub] [flink] liujiawinds commented on pull request #19844: [FLINK-27805][Connectors/ORC] bump orc version to 1.7.5
liujiawinds commented on PR #19844: URL: https://github.com/apache/flink/pull/19844#issuecomment-1467197335 @pgaref Feel free to take over this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] lindong28 commented on pull request #224: univariatefeatureselector.md:101:1": failed to extract shortcode: sho…
lindong28 commented on PR #224: URL: https://github.com/apache/flink-ml/pull/224#issuecomment-1467184153 @alberta0714 Thanks for creating this PR. Could you explain how to reproduce the error "failed to extract shortcode"? We can add a test to catch issues like this in the future if we can reproduce this error. Note that the latest website can correctly show the doc for "Univariate Feature Selector" [1]. Do you know what the impact of this issue is? [1] https://nightlies.apache.org/flink/flink-ml-docs-master/docs/operators/feature/univariatefeatureselector/
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store
SteNicholas commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134714232

## flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/SimpleReadBuffer.java: ##
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2015 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package org.apache.flink.table.store.utils;
+
+/** A simple read buffer provide {@code readUnsignedByte} and position. */

Review Comment:
```suggestion
/** A simple read buffer provides {@code readUnsignedByte} and position. */
```
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store
SteNicholas commented on code in PR #593: URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134713621 ## flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java: ## @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* This file is based on source code from the PalDB Project (https://github.com/linkedin/PalDB), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. 
*/ + +package org.apache.flink.table.store.lookup.hash; + +import org.apache.flink.table.store.lookup.LookupStoreWriter; +import org.apache.flink.table.store.utils.MurmurHashUtils; +import org.apache.flink.table.store.utils.VarLengthIntUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +/** Internal write implementation for hash kv store. */ +public class HashLookupStoreWriter implements LookupStoreWriter { + +private static final Logger LOG = +LoggerFactory.getLogger(HashLookupStoreWriter.class.getName()); + +// load factor of hash map, default 0.75 +private final double loadFactor; +// Output +private final File tempFolder; +private final OutputStream outputStream; +// Index stream +private File[] indexFiles; +private DataOutputStream[] indexStreams; +// Data stream +private File[] dataFiles; +private DataOutputStream[] dataStreams; +// Cache last value +private byte[][] lastValues; +private int[] lastValuesLength; +// Data length +private long[] dataLengths; +// Index length +private long indexesLength; +// Max offset length +private int[] maxOffsetLengths; +// Number of keys +private int keyCount; +private int[] keyCounts; +// Number of values +private int valueCount; +// Number of collisions +private int collisions; + +HashLookupStoreWriter(double loadFactor, File file) throws IOException { +this.loadFactor = loadFactor; +if (loadFactor <= 0.0 || loadFactor >= 1.0) { +throw new IllegalArgumentException( +"Illegal 
load factor = " + loadFactor + ", should be between 0.0 and 1.0."); +} + +tempFolder = new File(file.getParentFile(), UUID.randomUUID().toString()); +if (!tempFolder.mkdir()) { +throw new IOException("Can not create temp folder: " + tempFolder); +} +outputStream = new BufferedOutputStream(new FileOutputStream(file)); +indexStreams = new DataOutputStream[0]; +dataStreams = new DataOutputStream[0]; +indexFiles = new File[0]; +dataFiles = new File[0]; +lastValues = new byte[0][]; +lastValuesLength = new int[0]; +dataLengths = new long[0]; +maxOffsetLengths = new int[0]; +keyCounts = new int[0]; +} + +@Override +public void put(byte[] key, byte[] value) throws IOException { +int keyLength = key.length; + +// Get the Output stream for that keyLength, each key length has its own file +DataOutputStream indexStream = getIndexStream(keyLength); + +// Write key +indexStream.write(key); + +// Check if the value is identical to the last inserted +byte[] lastValue = lastValues[keyLength]; +boolean sameValue = lastValue != null && Arrays.equals(value, lastValue); + +// Get data stream and length +long dataLength = dataLengths[keyLength]; +if (sameValue) { +dataLength -=
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store
SteNicholas commented on code in PR #593: URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134711088 ## flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java: ## @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* This file is based on source code from the PalDB Project (https://github.com/linkedin/PalDB), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. 
*/ + +package org.apache.flink.table.store.lookup.hash; + +import org.apache.flink.table.store.lookup.LookupStoreWriter; +import org.apache.flink.table.store.utils.MurmurHashUtils; +import org.apache.flink.table.store.utils.VarLengthIntUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +/** Internal write implementation for hash kv store. */ +public class HashLookupStoreWriter implements LookupStoreWriter { + +private static final Logger LOG = +LoggerFactory.getLogger(HashLookupStoreWriter.class.getName()); + +// load factor of hash map, default 0.75 +private final double loadFactor; +// Output +private final File tempFolder; +private final OutputStream outputStream; +// Index stream +private File[] indexFiles; +private DataOutputStream[] indexStreams; +// Data stream +private File[] dataFiles; +private DataOutputStream[] dataStreams; +// Cache last value +private byte[][] lastValues; +private int[] lastValuesLength; +// Data length +private long[] dataLengths; +// Index length +private long indexesLength; +// Max offset length +private int[] maxOffsetLengths; +// Number of keys +private int keyCount; +private int[] keyCounts; +// Number of values +private int valueCount; +// Number of collisions +private int collisions; + +HashLookupStoreWriter(double loadFactor, File file) throws IOException { +this.loadFactor = loadFactor; +if (loadFactor <= 0.0 || loadFactor >= 1.0) { +throw new IllegalArgumentException( +"Illegal 
load factor = " + loadFactor + ", should be between 0.0 and 1.0."); +} + +tempFolder = new File(file.getParentFile(), UUID.randomUUID().toString()); +if (!tempFolder.mkdir()) { +throw new IOException("Can not create temp folder: " + tempFolder); +} +outputStream = new BufferedOutputStream(new FileOutputStream(file)); +indexStreams = new DataOutputStream[0]; +dataStreams = new DataOutputStream[0]; +indexFiles = new File[0]; +dataFiles = new File[0]; +lastValues = new byte[0][]; +lastValuesLength = new int[0]; +dataLengths = new long[0]; +maxOffsetLengths = new int[0]; +keyCounts = new int[0]; +} + +@Override +public void put(byte[] key, byte[] value) throws IOException { +int keyLength = key.length; + +// Get the Output stream for that keyLength, each key length has its own file +DataOutputStream indexStream = getIndexStream(keyLength); + +// Write key +indexStream.write(key); + +// Check if the value is identical to the last inserted +byte[] lastValue = lastValues[keyLength]; +boolean sameValue = lastValue != null && Arrays.equals(value, lastValue); + +// Get data stream and length +long dataLength = dataLengths[keyLength]; +if (sameValue) { +dataLength -=
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store
SteNicholas commented on code in PR #593: URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134708812 ## flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java: ## @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* This file is based on source code from the PalDB Project (https://github.com/linkedin/PalDB), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. 
*/ + +package org.apache.flink.table.store.lookup.hash; + +import org.apache.flink.table.store.lookup.LookupStoreWriter; +import org.apache.flink.table.store.utils.MurmurHashUtils; +import org.apache.flink.table.store.utils.VarLengthIntUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +/** Internal write implementation for hash kv store. */ +public class HashLookupStoreWriter implements LookupStoreWriter { + +private static final Logger LOG = +LoggerFactory.getLogger(HashLookupStoreWriter.class.getName()); + +// load factor of hash map, default 0.75 +private final double loadFactor; +// Output +private final File tempFolder; +private final OutputStream outputStream; +// Index stream +private File[] indexFiles; +private DataOutputStream[] indexStreams; +// Data stream +private File[] dataFiles; +private DataOutputStream[] dataStreams; +// Cache last value +private byte[][] lastValues; +private int[] lastValuesLength; +// Data length +private long[] dataLengths; +// Index length +private long indexesLength; +// Max offset length +private int[] maxOffsetLengths; +// Number of keys +private int keyCount; +private int[] keyCounts; +// Number of values +private int valueCount; +// Number of collisions +private int collisions; + +HashLookupStoreWriter(double loadFactor, File file) throws IOException { +this.loadFactor = loadFactor; +if (loadFactor <= 0.0 || loadFactor >= 1.0) { +throw new IllegalArgumentException( +"Illegal 
load factor = " + loadFactor + ", should be between 0.0 and 1.0."); +} + +tempFolder = new File(file.getParentFile(), UUID.randomUUID().toString()); +if (!tempFolder.mkdir()) { +throw new IOException("Can not create temp folder: " + tempFolder); +} +outputStream = new BufferedOutputStream(new FileOutputStream(file)); +indexStreams = new DataOutputStream[0]; +dataStreams = new DataOutputStream[0]; +indexFiles = new File[0]; +dataFiles = new File[0]; +lastValues = new byte[0][]; +lastValuesLength = new int[0]; +dataLengths = new long[0]; +maxOffsetLengths = new int[0]; +keyCounts = new int[0]; +} + +@Override +public void put(byte[] key, byte[] value) throws IOException { +int keyLength = key.length; + +// Get the Output stream for that keyLength, each key length has its own file +DataOutputStream indexStream = getIndexStream(keyLength); + +// Write key +indexStream.write(key); + +// Check if the value is identical to the last inserted +byte[] lastValue = lastValues[keyLength]; +boolean sameValue = lastValue != null && Arrays.equals(value, lastValue); + +// Get data stream and length +long dataLength = dataLengths[keyLength]; +if (sameValue) { +dataLength -=
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store
SteNicholas commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134702880

## flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreReader.java: ##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.lookup;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/** Reader, lookup value by key bytes. */
+public interface LookupStoreReader extends Closeable {
+
+    byte[] lookup(byte[] key) throws IOException;

Review Comment:
Please add Javadoc for the interface methods.
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store
SteNicholas commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134703041

## flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreWriter.java: ##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.lookup;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/** Writer to prepare binary file. */
+public interface LookupStoreWriter extends Closeable {
+
+    void put(byte[] key, byte[] value) throws IOException;

Review Comment:
Ditto.
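
The review comments on `LookupStoreReader.lookup` and `LookupStoreWriter.put` ask for Javadoc on the interface methods. A minimal sketch of what that might look like is given here. The interface names and method signatures come from the quoted diffs; the Javadoc wording, the null-on-miss contract, and the `InMemoryLookupStore` and `LookupStoreSketch` classes are assumptions made for illustration and are not taken from the PR.

```java
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Reader, lookup value by key bytes. */
interface LookupStoreReader extends Closeable {

    /**
     * Looks up the value stored for the given key.
     *
     * @param key the serialized key bytes
     * @return the value bytes, or {@code null} if the key is absent (assumed contract)
     * @throws IOException if the underlying store cannot be read
     */
    byte[] lookup(byte[] key) throws IOException;
}

/** Writer to prepare binary file. */
interface LookupStoreWriter extends Closeable {

    /**
     * Adds a key-value pair to the store being built.
     *
     * @param key the serialized key bytes
     * @param value the serialized value bytes
     * @throws IOException if the pair cannot be written
     */
    void put(byte[] key, byte[] value) throws IOException;
}

/** Hypothetical in-memory stand-in, only here to exercise the two interfaces. */
final class InMemoryLookupStore implements LookupStoreWriter, LookupStoreReader {

    // ByteBuffer compares by content, so it can serve as a map key for byte arrays.
    private final Map<ByteBuffer, byte[]> entries = new HashMap<>();

    @Override
    public void put(byte[] key, byte[] value) {
        // Defensive copies: later changes to the caller's arrays must not affect the store.
        entries.put(ByteBuffer.wrap(key.clone()), value.clone());
    }

    @Override
    public byte[] lookup(byte[] key) {
        byte[] value = entries.get(ByteBuffer.wrap(key));
        return value == null ? null : value.clone();
    }

    @Override
    public void close() {
        entries.clear();
    }
}

final class LookupStoreSketch {
    public static void main(String[] args) throws IOException {
        try (InMemoryLookupStore store = new InMemoryLookupStore()) {
            store.put(
                    "id-1".getBytes(StandardCharsets.UTF_8),
                    "alice".getBytes(StandardCharsets.UTF_8));
            byte[] hit = store.lookup("id-1".getBytes(StandardCharsets.UTF_8));
            byte[] miss = store.lookup("id-2".getBytes(StandardCharsets.UTF_8));
            System.out.println(new String(hit, StandardCharsets.UTF_8));
            System.out.println(miss == null);
        }
    }
}
```

Whether a missing key should return `null` or be signalled some other way is exactly the kind of detail the requested Javadoc would need to pin down; the sketch picks `null` only as a placeholder.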
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction
SteNicholas commented on code in PR #594:
URL: https://github.com/apache/flink-table-store/pull/594#discussion_r1134698287

## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java: ##
@@ -172,10 +133,30 @@ public MergeFunction create(@Nullable int[][] projection) {
                 fieldTypes = project.project(tableTypes);
             }
 
-            return new AggregateMergeFunction(
-                    createFieldGetters(fieldTypes),
-                    new AggregateMergeFunction.RowAggregator(
-                            conf, fieldNames, fieldTypes, primaryKeys));
+            FieldAggregator[] fieldAggregators = new FieldAggregator[fieldNames.size()];
+            for (int i = 0; i < fieldNames.size(); i++) {
+                String fieldName = fieldNames.get(i);
+                DataType fieldType = fieldTypes.get(i);
+                // aggregate by primary keys, so they do not aggregate
+                boolean isPrimaryKey = primaryKeys.contains(fieldName);
+                String strAggFunc =
+                        conf.get(
+                                key(FIELDS + "." + fieldName + "." + AGG_FUNCTION)
+                                        .stringType()
+                                        .noDefaultValue()
+                                        .withDescription(
+                                                "Get " + fieldName + "'s aggregate function"));
+                boolean ignoreRetract =
+                        conf.get(
+                                key(FIELDS + "." + fieldName + "." + IGNORE_RETRACT)
+                                        .booleanType()
+                                        .defaultValue(false));

Review Comment:
Could you add a description here?
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction
SteNicholas commented on code in PR #594:
URL: https://github.com/apache/flink-table-store/pull/594#discussion_r1134693522

## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupMergeFunction.java: ##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.KeyValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * A {@link MergeFunction} for lookup, this wrapper only consider the latest high level record,

Review Comment:
```suggestion
 * A {@link MergeFunction} for lookup, this wrapper only considers the latest high level record,
```
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction
SteNicholas commented on code in PR #594: URL: https://github.com/apache/flink-table-store/pull/594#discussion_r1134691216 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupChangelogMergeFunctionWrapper.java: ## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree.compact; + +import org.apache.flink.table.store.data.InternalRow; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.table.store.types.RowKind; + +import java.util.function.Function; + +import static org.apache.flink.table.store.utils.Preconditions.checkArgument; + +/** + * Wrapper for {@link MergeFunction}s to produce changelog by lookup during the compaction involving + * level 0 files. + * + * Changelog records are generated in the process of the level-0 file participating in the + * compaction, if during the compaction processing: + * + * + * Without level-0 records, no changelog. + * With level-0 record, with level-x (x > 0) record, level-x record should be BEFORE, level-0 + * should be AFTER. + * With level-0 record, without level-x record, need to lookup the history value of the upper + * level as BEFORE. 
+ * + */ +public class LookupChangelogMergeFunctionWrapper implements MergeFunctionWrapper { + +private final LookupMergeFunction mergeFunction; +private final MergeFunction mergeFunction2; +private final Function lookup; + +private final ChangelogResult reusedResult = new ChangelogResult(); +private final KeyValue reusedBefore = new KeyValue(); +private final KeyValue reusedAfter = new KeyValue(); + +public LookupChangelogMergeFunctionWrapper( +MergeFunctionFactory mergeFunctionFactory, +Function lookup) { +MergeFunction mergeFunction = mergeFunctionFactory.create(); +checkArgument( +mergeFunction instanceof LookupMergeFunction, +"Merge function should be a LookupMergeFunction, but is %s, there is a bug.", Review Comment: ```suggestion "Merge function should be a LookupMergeFunction, but is %s, there is a bug.", ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction
SteNicholas commented on code in PR #594: URL: https://github.com/apache/flink-table-store/pull/594#discussion_r1134689986 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0Compaction.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree.compact; + +import org.apache.flink.table.store.file.compact.CompactUnit; +import org.apache.flink.table.store.file.mergetree.LevelSortedRun; + +import java.util.List; +import java.util.Optional; + +/** A {@link CompactStrategy} to force compacting level 0 files. 
*/ +public class ForceUpLevel0Compaction implements CompactStrategy { + +private final UniversalCompaction universalCompaction; + +public ForceUpLevel0Compaction(UniversalCompaction universalCompaction) { +this.universalCompaction = universalCompaction; +} + +@Override +public Optional pick(int numLevels, List runs) { +Optional pick = universalCompaction.pick(numLevels, runs); +if (pick.isPresent()) { +return pick; +} + +if (runs.isEmpty() || runs.get(0).level() > 0) { +return Optional.empty(); +} + +// collect all level 0 files +int candidateCount = 1; Review Comment: Why not `int candidateCount = runs.stream().filter(r -> r.level() > 0).count();`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
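A note on the one-liner suggested in the review comment above: `Stream.count()` returns a `long`, so it cannot be assigned to an `int` without a conversion, and a predicate of `level() > 0` would count the runs above level 0 rather than the level 0 runs. The following is only a minimal sketch of both a loop form and a stream form. It assumes that level 0 runs are sorted first (which the `runs.get(0).level() > 0` check in the diff seems to rely on), and `Run` is a hypothetical stand-in for `LevelSortedRun`, not the real class.

```java
import java.util.Arrays;
import java.util.List;

public class LevelZeroCount {

    /** Hypothetical stand-in for LevelSortedRun; only the level matters here. */
    static final class Run {
        private final int level;

        Run(int level) {
            this.level = level;
        }

        int level() {
            return level;
        }
    }

    /** Loop form: counts the leading level 0 runs, assuming level 0 runs come first. */
    static int countLeadingLevelZero(List<Run> runs) {
        int count = 0;
        for (Run run : runs) {
            if (run.level() > 0) {
                break; // first run above level 0 ends the level 0 prefix
            }
            count++;
        }
        return count;
    }

    /** Stream form: count() returns long, so an explicit narrowing conversion is needed. */
    static int countLevelZero(List<Run> runs) {
        return Math.toIntExact(runs.stream().filter(r -> r.level() == 0).count());
    }

    public static void main(String[] args) {
        List<Run> runs = Arrays.asList(new Run(0), new Run(0), new Run(3), new Run(5));
        System.out.println(countLeadingLevelZero(runs));
        System.out.println(countLevelZero(runs));
    }
}
```

Both forms agree whenever the sorted-input assumption holds; the loop form stops at the first run above level 0, while the stream form always scans the whole list.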
[jira] [Commented] (FLINK-31369) Harden modifiers for sql-gateway module
[ https://issues.apache.org/jira/browse/FLINK-31369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699863#comment-17699863 ] Sergey Nuyanzin commented on FLINK-31369: - Merged at: [20dc237f1b6eb32ef5344b5ece0a1e3a008e8bfd|https://github.com/apache/flink/commit/20dc237f1b6eb32ef5344b5ece0a1e3a008e8bfd] > Harden modifiers for sql-gateway module > --- > > Key: FLINK-31369 > URL: https://issues.apache.org/jira/browse/FLINK-31369 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Gateway, Tests >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > Labels: pull-request-available > Fix For: 1.18.0 > > > This is a follow up jira issue for > https://github.com/apache/flink/pull/22127#discussion_r1129192778 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-31369) Harden modifiers for sql-gateway module
[ https://issues.apache.org/jira/browse/FLINK-31369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin resolved FLINK-31369. - Fix Version/s: 1.18.0 Resolution: Fixed > Harden modifiers for sql-gateway module > --- > > Key: FLINK-31369 > URL: https://issues.apache.org/jira/browse/FLINK-31369 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Gateway, Tests >Reporter: Sergey Nuyanzin >Priority: Minor > Labels: pull-request-available > Fix For: 1.18.0 > > > This is a follow up jira issue for > https://github.com/apache/flink/pull/22127#discussion_r1129192778 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] snuyanzin merged pull request #22135: [FLINK-31369][sql-gateway] Harden modifiers in tests for sql-gateway module
snuyanzin merged PR #22135: URL: https://github.com/apache/flink/pull/22135 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-31369) Harden modifiers for sql-gateway module
[ https://issues.apache.org/jira/browse/FLINK-31369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin reassigned FLINK-31369: --- Assignee: Sergey Nuyanzin > Harden modifiers for sql-gateway module > --- > > Key: FLINK-31369 > URL: https://issues.apache.org/jira/browse/FLINK-31369 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Gateway, Tests >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > Labels: pull-request-available > Fix For: 1.18.0 > > > This is a follow up jira issue for > https://github.com/apache/flink/pull/22127#discussion_r1129192778 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] mbalassi commented on a diff in pull request #548: [FLINK-31407] Bump fabric8 version to 6.5.0
mbalassi commented on code in PR #548: URL: https://github.com/apache/flink-kubernetes-operator/pull/548#discussion_r1134642722 ## flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java: ## @@ -57,14 +58,14 @@ public void stopAndCleanupCluster(String clusterId) { .apps() .deployments() .withName(StandaloneKubernetesUtils.getJobManagerDeploymentName(clusterId)) -.cascading(true) +.withPropagationPolicy(DeletionPropagation.ORPHAN) Review Comment: @usamj could you please confirm that this was the intention here?
[jira] [Updated] (FLINK-31407) Upgrade Fabric8 version to 6.5.0
[ https://issues.apache.org/jira/browse/FLINK-31407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31407: --- Labels: pull-request-available (was: ) > Upgrade Fabric8 version to 6.5.0 > > > Key: FLINK-31407 > URL: https://issues.apache.org/jira/browse/FLINK-31407 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Márton Balassi >Priority: Major > Labels: pull-request-available > > Fabric8 6.5.0 has been released recently with a number of major improvements: > [https://github.com/fabric8io/kubernetes-client/releases/tag/v6.5.0] > This is a very important version for the operator as it also fixes some > outstanding issues with timing out informers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] mbalassi opened a new pull request, #548: [FLINK-31407] Bump fabric8 version to 6.5.0
mbalassi opened a new pull request, #548: URL: https://github.com/apache/flink-kubernetes-operator/pull/548 This new version brings critical stability fixes for the informers. I fixed the newly introduced deprecation warnings. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] pgaref commented on pull request #19844: [FLINK-27805][Connectors/ORC] bump orc version to 1.7.5
pgaref commented on PR #19844: URL: https://github.com/apache/flink/pull/19844#issuecomment-1466980765 @liujiawinds are you still working on this one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.
snuyanzin commented on code in PR #22144: URL: https://github.com/apache/flink/pull/22144#discussion_r1134628496 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java: ## @@ -178,6 +181,137 @@ Stream getTestSetSpecs() { null }, DataTypes.ARRAY( -DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE(); +DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE(, + TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_REMOVE) +.onFieldsWithData( +new Integer[] {1, 2, 2}, +null, +new String[] {"Hello", "World"}, +new Row[] { +Row.of(true, LocalDate.of(2022, 4, 20)), +Row.of(true, LocalDate.of(1990, 10, 14)), +null +}, +new Integer[] {null, null, 1}, +new Integer[][] { +new Integer[] {1, null, 3}, new Integer[] {0}, new Integer[] {1} +}, +new Map[] { +CollectionUtil.map(entry(1, "a"), entry(2, "b")), +CollectionUtil.map(entry(3, "c"), entry(4, "d")), +null +}) +.andDataTypes( +DataTypes.ARRAY(DataTypes.INT()), +DataTypes.ARRAY(DataTypes.INT()), +DataTypes.ARRAY(DataTypes.STRING()).notNull(), +DataTypes.ARRAY( +DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE())), +DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())), +DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING( +// ARRAY +.testResult( +$("f0").arrayRemove(2), +"ARRAY_REMOVE(f0, 2)", +new Integer[] {1}, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +.testResult( +$("f0").arrayRemove(42), +"ARRAY_REMOVE(f0, 42)", +new Integer[] {1, 2, 2}, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +.testResult( +$("f0").arrayRemove( +lit(null, DataTypes.SMALLINT()) + .cast(DataTypes.INT())), +"ARRAY_REMOVE(f0, cast(NULL AS INT))", +new Integer[] {1, 2, 2}, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +// ARRAY of null value +.testResult( +$("f1").arrayRemove(12), +"ARRAY_REMOVE(f1, 12)", +null, +DataTypes.ARRAY(DataTypes.INT()).nullable()) +.testResult( +$("f1").arrayRemove(null), +"ARRAY_REMOVE(f1, NULL)", +null, 
+DataTypes.ARRAY(DataTypes.INT()).nullable()) +// ARRAY NOT NULL +.testResult( +$("f2").arrayRemove("Hello"), +"ARRAY_REMOVE(f2, 'Hello')", +new String[] {"World"}, +DataTypes.ARRAY(DataTypes.STRING()).notNull()) +.testResult( +$("f2").arrayRemove( +lit(null, DataTypes.STRING()) + .cast(DataTypes.STRING())), +"ARRAY_REMOVE(f2, cast(NULL AS VARCHAR))", +new String[] {"Hello", "World"}, +DataTypes.ARRAY(DataTypes.STRING()).notNull()) +// ARRAY> +.testResult( +
[jira] [Updated] (FLINK-31428) Add user callbacks to PulsarSource and PulsarSink
[ https://issues.apache.org/jira/browse/FLINK-31428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alpha Diallo updated FLINK-31428: - Description: We'd like to add support for user callbacks in {{PulsarSource}} and {{PulsarSink}}. This enables specific use cases such as event tracing which requires access to low level message properties such as message IDs after an event is produced, topic partitions, etc... The functionality required is similar to {{ConsumerInterceptor}} and {{ProducerInterceptor}} in the Pulsar Client. However, there is a case to be made for adding new APIs that would help avoid the extra cost of ser/deser when getting the message body through the {{Message}} interface in the interceptors. was: We'd like to add support for user callbacks in PulsarSource and {{PulsarSink}}. This enables specific use cases such as event tracing which requires access to low level message properties such as message IDs after an event is produced, topic partitions, etc... The functionality required is similar to {{ConsumerInterceptor}} and {{ProducerInterceptor}} in the Pulsar Client. However, there is a case to be made for adding new APIs that would help avoid the extra cost of ser/deser when getting the message body through the {{Message}} interface in the interceptors. > Add user callbacks to PulsarSource and PulsarSink > -- > > Key: FLINK-31428 > URL: https://issues.apache.org/jira/browse/FLINK-31428 > Project: Flink > Issue Type: New Feature > Components: Connectors / Pulsar >Reporter: Alpha Diallo >Priority: Major > > We'd like to add support for user callbacks in {{PulsarSource}} and > {{PulsarSink}}. This enables specific use cases such as event tracing > which requires access to low level message properties such as message IDs > after an event is produced, topic partitions, etc... > The functionality required is similar to {{ConsumerInterceptor}} and > {{ProducerInterceptor}} in the Pulsar Client. 
However, there is a case to be > made for adding new APIs that would help avoid the extra cost of ser/deser > when getting the message body through the {{Message}} interface in the > interceptors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31428) Add user callbacks to PulsarSource and PulsarSink
Alpha Diallo created FLINK-31428: Summary: Add user callbacks to PulsarSource and PulsarSink Key: FLINK-31428 URL: https://issues.apache.org/jira/browse/FLINK-31428 Project: Flink Issue Type: New Feature Components: Connectors / Pulsar Reporter: Alpha Diallo We'd like to add support for user callbacks in PulsarSource and {{PulsarSink}}. This enables specific use cases such as event tracing which requires access to low level message properties such as message IDs after an event is produced, topic partitions, etc... The functionality required is similar to {{ConsumerInterceptor}} and {{ProducerInterceptor}} in the Pulsar Client. However, there is a case to be made for adding new APIs that would help avoid the extra cost of ser/deser when getting the message body through the {{Message}} interface in the interceptors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] snuyanzin commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.
snuyanzin commented on code in PR #22144: URL: https://github.com/apache/flink/pull/22144#discussion_r1134622026 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayRemoveFunction.java: ## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.lang.invoke.MethodHandle; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.table.api.Expressions.$; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_REMOVE}. 
*/ +@Internal +public class ArrayRemoveFunction extends BuiltInScalarFunction { +private final ArrayData.ElementGetter elementGetter; +private final SpecializedFunction.ExpressionEvaluator equalityEvaluator; +private transient MethodHandle equalityHandle; + +public ArrayRemoveFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.ARRAY_REMOVE, context); +final DataType elementDataType = +((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) +.getElementDataType(); +final DataType needleDataType = context.getCallContext().getArgumentDataTypes().get(1); +elementGetter = ArrayData.createElementGetter(elementDataType.getLogicalType()); +equalityEvaluator = +context.createEvaluator( +$("element").isEqual($("needle")), +DataTypes.BOOLEAN(), +DataTypes.FIELD("element", elementDataType.notNull().toInternal()), +DataTypes.FIELD("needle", needleDataType.notNull().toInternal())); +} + +@Override +public void open(FunctionContext context) throws Exception { +equalityHandle = equalityEvaluator.open(context); +} + +public @Nullable ArrayData eval(ArrayData haystack, Object needle) { +try { +if (haystack == null) { +return null; +} + +List list = new ArrayList(); +final int size = haystack.size(); +for (int pos = 0; pos < size; pos++) { +final Object element = elementGetter.getElementOrNull(haystack, pos); +if ((element == null && needle != null) +|| (element != null && needle == null) +|| (element != null +&& needle != null +&& !(boolean) equalityHandle.invoke(element, needle))) { Review Comment: This could be simplified e.g. check for the negative case and continue -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
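The simplification hinted at in the review comment above (test for the case to drop and `continue`) could look roughly like the sketch below. It is not the code from the PR: it works on plain `java.util.List` values instead of `ArrayData`, and a `BiPredicate` stands in for the `MethodHandle`-based equality evaluator; both substitutions are assumptions made to keep the example self-contained. The keep/drop behaviour mirrors the three-branch condition in the diff: an element is dropped when it and the needle are both null, or when both are non-null and compare equal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

public class ArrayRemoveSketch {

    /**
     * Returns a copy of haystack without the elements matching needle.
     * The equality predicate is a stand-in for the evaluator in the diff
     * and is only ever called with two non-null arguments.
     */
    static <T> List<T> remove(List<T> haystack, T needle, BiPredicate<T, T> equality) {
        if (haystack == null) {
            return null;
        }
        List<T> result = new ArrayList<>();
        for (T element : haystack) {
            // If either side is null, they match only when both are null.
            boolean matches =
                    (element == null || needle == null)
                            ? element == needle
                            : equality.test(element, needle);
            if (matches) {
                continue; // negative case: skip the element to be removed
            }
            result.add(element);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.<Integer>asList(1, 2, 2);
        System.out.println(ArrayRemoveSketch.<Integer>remove(values, 2, Integer::equals));
        System.out.println(ArrayRemoveSketch.<Integer>remove(values, null, Integer::equals));
    }
}
```

Folding the null handling into a single `matches` flag keeps the equality call on one code path, which is the main readability gain over the original three-way disjunction.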
[GitHub] [flink] pgaref commented on a diff in pull request #22153: [FLINK-31317] Introduce JobResourceRequirements and JobVertexResourceRequirements data structures.
pgaref commented on code in PR #22153: URL: https://github.com/apache/flink/pull/22153#discussion_r1134608280 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java: ## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph; + +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Information about the parallelism of job vertices. */ +public class JobResourceRequirements implements Serializable { + +private static final long serialVersionUID = 1L; + +/** + * A key for an internal config option (intentionally prefixed with $internal to make this + * explicit), that we'll serialize the {@link JobResourceRequirements} into, when writing it to + * {@link JobGraph}. 
+ */ +private static final String JOB_RESOURCE_REQUIREMENTS_KEY = +"$internal.job-resource-requirements"; + +private static final JobResourceRequirements EMPTY = +new JobResourceRequirements(Collections.emptyMap()); + +/** + * Write {@link JobResourceRequirements resource requirements} into the configuration of a given + * {@link JobGraph}. + * + * @param jobGraph job graph to write requirements to + * @param jobResourceRequirements resource requirements to write + * @throws IOException in case we're not able to serialize requirements into the configuration + */ +public static void writeToJobGraph( +JobGraph jobGraph, JobResourceRequirements jobResourceRequirements) throws IOException { +InstantiationUtil.writeObjectToConfig( +jobResourceRequirements, +jobGraph.getJobConfiguration(), +JOB_RESOURCE_REQUIREMENTS_KEY); +} + +/** + * Read {@link JobResourceRequirements resource requirements} from the configuration of a given + * {@link JobGraph}. + * + * @param jobGraph job graph to read requirements from + * @throws IOException in case we're not able to deserialize requirements from the configuration + * @throws ClassNotFoundException in case some deserialized classes are missing on the classpath + */ +public static Optional readFromJobGraph(JobGraph jobGraph) +throws IOException, ClassNotFoundException { +return Optional.ofNullable( +InstantiationUtil.readObjectFromConfig( +jobGraph.getJobConfiguration(), +JOB_RESOURCE_REQUIREMENTS_KEY, +JobResourceRequirements.class.getClassLoader())); +} + +/** + * This method validates that the new job vertex parallelisms are less or equal to the max + * parallelism. Moreover, it validates that there are no unknown job vertex ids and that we're + * not missing any. Review Comment: We could also describe the special `-1` case -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] pgaref commented on a diff in pull request #22153: [FLINK-31317] Introduce JobResourceRequirements and JobVertexResourceRequirements data structures.
pgaref commented on code in PR #22153: URL: https://github.com/apache/flink/pull/22153#discussion_r1134606959 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java: ## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph; + +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Information about the parallelism of job vertices. */ +public class JobResourceRequirements implements Serializable { + +private static final long serialVersionUID = 1L; + +/** + * A key for an internal config option (intentionally prefixed with $internal to make this + * explicit), that we'll serialize the {@link JobResourceRequirements} into, when writing it to + * {@link JobGraph}. 
+ */ +private static final String JOB_RESOURCE_REQUIREMENTS_KEY = +"$internal.job-resource-requirements"; + +private static final JobResourceRequirements EMPTY = +new JobResourceRequirements(Collections.emptyMap()); + +/** + * Write {@link JobResourceRequirements resource requirements} into the configuration of a given + * {@link JobGraph}. + * + * @param jobGraph job graph to write requirements to + * @param jobResourceRequirements resource requirements to write + * @throws IOException in case we're not able to serialize requirements into the configuration + */ +public static void writeToJobGraph( +JobGraph jobGraph, JobResourceRequirements jobResourceRequirements) throws IOException { +InstantiationUtil.writeObjectToConfig( +jobResourceRequirements, +jobGraph.getJobConfiguration(), +JOB_RESOURCE_REQUIREMENTS_KEY); +} + +/** + * Read {@link JobResourceRequirements resource requirements} from the configuration of a given + * {@link JobGraph}. + * + * @param jobGraph job graph to read requirements from + * @throws IOException in case we're not able to deserialize requirements from the configuration + * @throws ClassNotFoundException in case some deserialized classes are missing on the classpath + */ +public static Optional readFromJobGraph(JobGraph jobGraph) +throws IOException, ClassNotFoundException { Review Comment: I guess the only way to throw a ClassNotFoundException here is for JobResourceRequirements (or inner ones) class to be missing ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] pgaref commented on a diff in pull request #22153: [FLINK-31317] Introduce JobResourceRequirements and JobVertexResourceRequirements data structures.
pgaref commented on code in PR #22153: URL: https://github.com/apache/flink/pull/22153#discussion_r1134587834 ## flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java: ## @@ -466,6 +468,7 @@ public static String checkForInstantiationError(Class clazz) { } } +@Nullable Review Comment: Minor but would it make sense to introduce the Runtime annotation in a separate ticket as it is used in a bunch of places?
[jira] [Commented] (FLINK-31371) Stream failure if the topic doesn't exist
[ https://issues.apache.org/jira/browse/FLINK-31371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699824#comment-17699824 ] Enzo Dechaene commented on FLINK-31371: --- Hi [~syhily], yes, it makes sense to do it that way because by default the Pulsar broker parameter *allowAutoTopicCreation* is set to true. Thanks for the answer. > Stream failure if the topic doesn't exist > - > > Key: FLINK-31371 > URL: https://issues.apache.org/jira/browse/FLINK-31371 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.15.3 >Reporter: Enzo Dechaene >Priority: Major > > *Describe the bug* > With a Pulsar 2.8.4 server, a Flink stream containing Pulsar sources or sinks > will fail at startup if the topic doesn't exist. > > *To Reproduce* > Create a stream with: > * Flink 1.15.2 > * Pulsar 2.8.4 > * a Pulsar source or sink linked to a nonexistent topic > * Start the stream > > *Expected behavior* > If the topic doesn't exist, it should be created at the first connection of > the source or sink without error. > > *Additional context* > In the TopicListSubscriber class of the connector, the method > getSubscribedTopicPartitions() tries to get the metadata of a topic as follows: > > {code:java} > TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, topic);{code} > > If the topic doesn't exist, I get a NullPointerException on the metadata. > We previously created a > [ticket|https://github.com/streamnative/pulsar-flink/issues/366] on the > Pulsar connector and it was fixed.
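The failure mode described in the report, a metadata lookup that yields null for an unknown topic, can be guarded against explicitly. The following is a rough sketch only and not the connector's actual fix: `TopicLookupSketch`, `FakeAdmin`, `createTopic` and `resolve` are hypothetical names, and the in-memory admin merely imitates a lookup that returns null when the topic does not exist.

```java
import java.util.HashMap;
import java.util.Map;

final class TopicLookupSketch {

    /** Hypothetical stand-in for the connector's topic metadata. */
    static final class TopicMetadata {
        final String name;
        final int partitions;

        TopicMetadata(String name, int partitions) {
            this.name = name;
            this.partitions = partitions;
        }
    }

    /** Hypothetical in-memory admin; returns null for unknown topics. */
    static final class FakeAdmin {
        final boolean allowAutoTopicCreation;
        private final Map<String, Integer> topics = new HashMap<>();

        FakeAdmin(boolean allowAutoTopicCreation) {
            this.allowAutoTopicCreation = allowAutoTopicCreation;
        }

        TopicMetadata queryTopicMetadata(String topic) {
            Integer partitions = topics.get(topic);
            return partitions == null ? null : new TopicMetadata(topic, partitions);
        }

        void createTopic(String topic) {
            // 0 partitions stands for a non-partitioned topic in this sketch.
            topics.put(topic, 0);
        }
    }

    /** Resolves metadata without dereferencing a null lookup result. */
    static TopicMetadata resolve(FakeAdmin admin, String topic) {
        TopicMetadata metadata = admin.queryTopicMetadata(topic);
        if (metadata != null) {
            return metadata;
        }
        if (!admin.allowAutoTopicCreation) {
            // Fail with a descriptive error instead of a NullPointerException.
            throw new IllegalStateException("Topic does not exist: " + topic);
        }
        admin.createTopic(topic);
        return admin.queryTopicMetadata(topic);
    }
}
```

The design choice here is simply to make the "topic missing" case an explicit branch: either create the topic when auto-creation is allowed, or fail with a message that names the topic.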
[jira] [Updated] (FLINK-31427) Pulsar Catalog support with Schema translation
[ https://issues.apache.org/jira/browse/FLINK-31427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31427: --- Labels: pull-request-available (was: ) > Pulsar Catalog support with Schema translation > -- > > Key: FLINK-31427 > URL: https://issues.apache.org/jira/browse/FLINK-31427 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Pulsar >Affects Versions: pulsar-4.0.0 >Reporter: Yufan Sheng >Priority: Major > Labels: pull-request-available > Fix For: pulsar-4.0.0 > > > This task will make Pulsar serve as a Flink catalog. It will expose each > Pulsar namespace as a Flink database and each topic as a Flink table. > You can easily create tables and databases on Pulsar. The tables can be > consumed by other clients with a valid schema check.
[GitHub] [flink-connector-pulsar] syhily opened a new pull request, #35: [FLINK-31427][Table] Initial Catalog implementation with a new config model and schema conversion.
syhily opened a new pull request, #35: URL: https://github.com/apache/flink-connector-pulsar/pull/35 ## Purpose of the change This PR makes Pulsar serve as a Flink catalog backend. It exposes each Pulsar namespace as a Flink database and each Pulsar topic as a Flink table. You can easily create tables and databases on Pulsar. The tables created by Flink can be consumed by other Pulsar clients with a valid schema check. ## Brief change log - Support both `Duration` and numeric time values in the `PulsarConfiguration` class, so that the existing config options can be configured with a time string in the Table API. - Alter all the existing time-related config options to use the `useDuration` method. - Initial `PulsarCatalog` implementation with a new config model, a schema translator, etc. ## Verifying this change This change doesn't contain any tests; it's an initial implementation that won't be used by any user yet. We will add tests in upcoming PRs. ## Significant changes - [x] Dependencies have been added or upgraded - [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`) - [ ] Serializers have been changed - [x] New feature has been introduced - If yes, how is this documented? (not documented)
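The first change-log item, accepting both a numeric time and a duration string for the same option, can be sketched roughly as follows. This is not the `PulsarConfiguration` code from the PR: the class `TimeOptionSketch`, the accepted unit labels, and the assumption that a bare number means milliseconds are all made up for illustration.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TimeOptionSketch {

    // A number, optional whitespace, then an optional lowercase unit label.
    private static final Pattern VALUE = Pattern.compile("(\\d+)\\s*([a-z]*)");

    /** Parses "30000", "30 s", "5min" and similar values into a Duration. */
    static Duration parse(String raw) {
        Matcher m = VALUE.matcher(raw.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a time value: " + raw);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "": // Assumption: a bare number is interpreted as milliseconds.
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "min":
                return Duration.ofMinutes(amount);
            case "h":
                return Duration.ofHours(amount);
            case "d":
                return Duration.ofDays(amount);
            default:
                throw new IllegalArgumentException("Unknown time unit in: " + raw);
        }
    }
}
```

With a parser of this shape, an option that used to take a plain number keeps working, while SQL or Table API users can write a readable time string for the same key.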
[jira] [Commented] (FLINK-31414) exceptions in the alignment timer are ignored
[ https://issues.apache.org/jira/browse/FLINK-31414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699776#comment-17699776 ] Piotr Nowojski commented on FLINK-31414: [~Feifan Wang], can you elaborate a bit more on what the problem is? I see a couple of discrepancies between your description and the stack trace that you posted: * the stack trace doesn't match the master code, so I'm not sure which Flink version you are using. * doesn't the error message "switched from RUNNING to FAILED" refer to the subtask/task actually switching to the FAILED state, contradicting your statement that the exception is being ignored? * in the PR, I don't see any test coverage - a working unit test/ITCase that used to fail without your fix would be nice to have, both for explaining what the issue is and for actually providing regression test coverage. > exceptions in the alignment timer are ignored > - > > Key: FLINK-31414 > URL: https://issues.apache.org/jira/browse/FLINK-31414 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Feifan Wang >Priority: Major > Labels: pull-request-available > > The alignment timer task in alternating aligned checkpoints runs as a future task > in the mailbox thread, causing the exceptions > ([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327]) > to be ignored. These exceptions should have failed the task, but instead > the same checkpoint fires initInputsCheckpoints twice in my test. 
> > {code:java} > switched from RUNNING to FAILED with failure cause: > java.lang.RuntimeException: unable to send request to worker > at > org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:247) > at > org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:161) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.prepareSnapshot(StreamTaskNetworkInput.java:103) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.prepareSnapshot(StreamOneInputProcessor.java:83) > at > org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.prepareSnapshot(StreamMultipleInputProcessor.java:122) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.prepareInputSnapshot(StreamTask.java:518) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.prepareInflightDataSnapshot(SubtaskCheckpointCoordinatorImpl.java:655) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.initInputsCheckpoint(SubtaskCheckpointCoordinatorImpl.java:515) > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.initInputsCheckpoint(SingleCheckpointBarrierHandler.java:516) > at > org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriers.alignmentTimeout(AlternatingCollectingBarriers.java:46) > at > org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:54) > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234) > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262) > at > 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231) > at > org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181) > at > org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) >