[jira] [Updated] (FLINK-31436) Remove schemaId from constructor of FileStoreCommitImpl and ManifestFile in Table Store

2023-03-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31436:
---
Labels: pull-request-available  (was: )

> Remove schemaId from constructor of FileStoreCommitImpl and ManifestFile in 
> Table Store
> ---
>
> Key: FLINK-31436
> URL: https://issues.apache.org/jira/browse/FLINK-31436
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.4.0
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> As schema may change during a CTAS streaming job, the schema ID of snapshots 
> and manifest files may also change. We should remove \{{schemaId}} from their 
> constructor and calculate the real \{{schemaId}} on the fly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31437) 'lookup.cache.caching-missing-key' change should be configured as 'lookup.partial-cache.caching-missing-key'

2023-03-13 Thread Zhimin Geng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699962#comment-17699962
 ] 

Zhimin Geng commented on FLINK-31437:
-

ok

> 'lookup.cache.caching-missing-key' change should be configured as 
> 'lookup.partial-cache.caching-missing-key'
> 
>
> Key: FLINK-31437
> URL: https://issues.apache.org/jira/browse/FLINK-31437
> Project: Flink
>  Issue Type: Bug
>Reporter: Zhimin Geng
>Priority: Blocker
> Attachments: image-2023-03-14-05-45-06-230.png, 
> image-2023-03-14-05-45-44-616.png
>
>
> 'lookup.cache.caching-missing-key' change should be configured as 
> 'lookup.partial-cache.caching-missing-key'.
> An error occurred when I configured a dimension table.
> The configuration given by the official website is not available.
> !image-2023-03-14-05-45-06-230.png!
> !image-2023-03-14-05-45-44-616.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] tsreaper opened a new pull request, #598: [FLINK-31436] Remove schemaId from constructor of FileStoreCommitImpl and ManifestFile in Table Store

2023-03-13 Thread via GitHub


tsreaper opened a new pull request, #598:
URL: https://github.com/apache/flink-table-store/pull/598

   As schema may change during a CTAS streaming job, the schema ID of snapshots 
and manifest files may also change. We should remove `schemaId` from their 
constructor and calculate the real `schemaId` on the fly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31437) 'lookup.cache.caching-missing-key' change should be configured as 'lookup.partial-cache.caching-missing-key'

2023-03-13 Thread Junrui Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699960#comment-17699960
 ] 

Junrui Li commented on FLINK-31437:
---

[~gaara] Thanks for creating this issue, and 
*[xuzhiwen1255|https://github.com/xuzhiwen1255]* has proposed a hot-fix pr to 
fix this bug: https://github.com/apache/flink/pull/22167

> 'lookup.cache.caching-missing-key' change should be configured as 
> 'lookup.partial-cache.caching-missing-key'
> 
>
> Key: FLINK-31437
> URL: https://issues.apache.org/jira/browse/FLINK-31437
> Project: Flink
>  Issue Type: Bug
>Reporter: Zhimin Geng
>Priority: Blocker
> Attachments: image-2023-03-14-05-45-06-230.png, 
> image-2023-03-14-05-45-44-616.png
>
>
> 'lookup.cache.caching-missing-key' change should be configured as 
> 'lookup.partial-cache.caching-missing-key'.
> An error occurred when I configured a dimension table.
> The configuration given by the official website is not available.
> !image-2023-03-14-05-45-06-230.png!
> !image-2023-03-14-05-45-44-616.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31436) Remove schemaId from constructor of FileStoreCommitImpl and ManifestFile in Table Store

2023-03-13 Thread Caizhi Weng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caizhi Weng updated FLINK-31436:

Summary: Remove schemaId from constructor of FileStoreCommitImpl and 
ManifestFile in Table Store  (was: Remove schemaId from constructor of Snapshot 
and ManifestFile in Table Store)

> Remove schemaId from constructor of FileStoreCommitImpl and ManifestFile in 
> Table Store
> ---
>
> Key: FLINK-31436
> URL: https://issues.apache.org/jira/browse/FLINK-31436
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.4.0
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
> Fix For: table-store-0.4.0
>
>
> As schema may change during a CTAS streaming job, the schema ID of snapshots 
> and manifest files may also change. We should remove \{{schemaId}} from their 
> constructor and calculate the real \{{schemaId}} on the fly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31437) 'lookup.cache.caching-missing-key' change should be configured as 'lookup.partial-cache.caching-missing-key'

2023-03-13 Thread Zhimin Geng (Jira)
Zhimin Geng created FLINK-31437:
---

 Summary: 'lookup.cache.caching-missing-key' change should be 
configured as 'lookup.partial-cache.caching-missing-key'
 Key: FLINK-31437
 URL: https://issues.apache.org/jira/browse/FLINK-31437
 Project: Flink
  Issue Type: Bug
Reporter: Zhimin Geng
 Attachments: image-2023-03-14-05-45-06-230.png, 
image-2023-03-14-05-45-44-616.png

'lookup.cache.caching-missing-key' change should be configured as 
'lookup.partial-cache.caching-missing-key'.
An error occurred when I configured a dimension table.
The configuration given by the official website is not available.
!image-2023-03-14-05-45-06-230.png!
!image-2023-03-14-05-45-44-616.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] liuyongvs commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.

2023-03-13 Thread via GitHub


liuyongvs commented on code in PR #22144:
URL: https://github.com/apache/flink/pull/22144#discussion_r1135000970


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##
@@ -178,6 +181,137 @@ Stream getTestSetSpecs() {
 null
 },
 DataTypes.ARRAY(
-DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE();
+DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE(,
+
TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_REMOVE)
+.onFieldsWithData(
+new Integer[] {1, 2, 2},
+null,
+new String[] {"Hello", "World"},
+new Row[] {
+Row.of(true, LocalDate.of(2022, 4, 20)),
+Row.of(true, LocalDate.of(1990, 10, 14)),
+null
+},
+new Integer[] {null, null, 1},
+new Integer[][] {
+new Integer[] {1, null, 3}, new Integer[] 
{0}, new Integer[] {1}
+},
+new Map[] {
+CollectionUtil.map(entry(1, "a"), entry(2, 
"b")),
+CollectionUtil.map(entry(3, "c"), entry(4, 
"d")),
+null
+})
+.andDataTypes(
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.STRING()).notNull(),
+DataTypes.ARRAY(
+DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE())),
+DataTypes.ARRAY(DataTypes.INT()),
+
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING(
+// ARRAY
+.testResult(
+$("f0").arrayRemove(2),
+"ARRAY_REMOVE(f0, 2)",
+new Integer[] {1},
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+.testResult(
+$("f0").arrayRemove(42),
+"ARRAY_REMOVE(f0, 42)",
+new Integer[] {1, 2, 2},
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+.testResult(
+$("f0").arrayRemove(
+lit(null, DataTypes.SMALLINT())
+
.cast(DataTypes.INT())),
+"ARRAY_REMOVE(f0, cast(NULL AS INT))",
+new Integer[] {1, 2, 2},
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+// ARRAY of null value
+.testResult(
+$("f1").arrayRemove(12),
+"ARRAY_REMOVE(f1, 12)",
+null,
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+.testResult(
+$("f1").arrayRemove(null),
+"ARRAY_REMOVE(f1, NULL)",
+null,
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+// ARRAY NOT NULL
+.testResult(
+$("f2").arrayRemove("Hello"),
+"ARRAY_REMOVE(f2, 'Hello')",
+new String[] {"World"},
+DataTypes.ARRAY(DataTypes.STRING()).notNull())
+.testResult(
+$("f2").arrayRemove(
+lit(null, DataTypes.STRING())
+
.cast(DataTypes.STRING())),
+"ARRAY_REMOVE(f2, cast(NULL AS VARCHAR))",
+new String[] {"Hello", "World"},
+DataTypes.ARRAY(DataTypes.STRING()).notNull())
+// ARRAY>
+.testResult(
+

[GitHub] [flink] liuyongvs commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.

2023-03-13 Thread via GitHub


liuyongvs commented on code in PR #22144:
URL: https://github.com/apache/flink/pull/22144#discussion_r1135001220


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##
@@ -178,6 +181,137 @@ Stream getTestSetSpecs() {
 null
 },
 DataTypes.ARRAY(
-DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE();
+DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE(,
+
TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_REMOVE)
+.onFieldsWithData(
+new Integer[] {1, 2, 2},
+null,
+new String[] {"Hello", "World"},
+new Row[] {
+Row.of(true, LocalDate.of(2022, 4, 20)),
+Row.of(true, LocalDate.of(1990, 10, 14)),
+null
+},
+new Integer[] {null, null, 1},
+new Integer[][] {
+new Integer[] {1, null, 3}, new Integer[] 
{0}, new Integer[] {1}
+},
+new Map[] {
+CollectionUtil.map(entry(1, "a"), entry(2, 
"b")),
+CollectionUtil.map(entry(3, "c"), entry(4, 
"d")),
+null
+})
+.andDataTypes(
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.STRING()).notNull(),
+DataTypes.ARRAY(
+DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE())),
+DataTypes.ARRAY(DataTypes.INT()),
+
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING(
+// ARRAY
+.testResult(
+$("f0").arrayRemove(2),
+"ARRAY_REMOVE(f0, 2)",
+new Integer[] {1},
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+.testResult(
+$("f0").arrayRemove(42),
+"ARRAY_REMOVE(f0, 42)",
+new Integer[] {1, 2, 2},
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+.testResult(
+$("f0").arrayRemove(
+lit(null, DataTypes.SMALLINT())
+
.cast(DataTypes.INT())),
+"ARRAY_REMOVE(f0, cast(NULL AS INT))",
+new Integer[] {1, 2, 2},
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+// ARRAY of null value
+.testResult(
+$("f1").arrayRemove(12),
+"ARRAY_REMOVE(f1, 12)",
+null,
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+.testResult(
+$("f1").arrayRemove(null),
+"ARRAY_REMOVE(f1, NULL)",
+null,
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+// ARRAY NOT NULL
+.testResult(
+$("f2").arrayRemove("Hello"),
+"ARRAY_REMOVE(f2, 'Hello')",
+new String[] {"World"},
+DataTypes.ARRAY(DataTypes.STRING()).notNull())
+.testResult(
+$("f2").arrayRemove(
+lit(null, DataTypes.STRING())
+
.cast(DataTypes.STRING())),
+"ARRAY_REMOVE(f2, cast(NULL AS VARCHAR))",
+new String[] {"Hello", "World"},
+DataTypes.ARRAY(DataTypes.STRING()).notNull())
+// ARRAY>
+.testResult(
+

[jira] [Commented] (FLINK-30863) Register local recovery files of changelog before notifyCheckpointComplete()

2023-03-13 Thread jinghaihang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699956#comment-17699956
 ] 

jinghaihang commented on FLINK-30863:
-

More to the point, I'm also confused about whether it's really necessary to do 
localChangelogRegistry.discardUpToCheckpoint(checkpointId) in the confirm() 
method.
I think that between two checkpoints, there is no need to clean up the 
changelog file, but after the materialization is over.

> Register local recovery files of changelog before notifyCheckpointComplete()
> 
>
> Key: FLINK-30863
> URL: https://issues.apache.org/jira/browse/FLINK-30863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Attachments: tm-log_fail_cl_local_recovery.txt
>
>
> If TM is materialized before receiving confirm(), the previously uploaded 
> queue in `FsStateChangelogWriter` will be cleared, so the local files of the 
> completed checkpoint will not be registered again, while the JM owned files 
> are registered before confirm(), and do not depend on the uploaded queue, so 
> the local files are deleted, and the DFS files are still there. 
>  
> We have encountered the following situation, the job cannot find the local 
> recovery files, but can restore from the DFS files:
> {code:java}
> 2023-01-18 17:21:13,412 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - 
> SlidingProcessingTimeWindows (37/48)#1 #1 (fa12cfa3b811a351e031b036b0e85d91) 
> switched from DEPLOYING to INITIALIZING.
> 2023-01-18 17:21:13,440 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl       [] - Found 
> registered local state for checkpoint 11599 in subtask 
> (2daf1d9bc9ed40ecb191303db813b0de - 0a448493b4782967b150582570326227 - 36) : 
> TaskOperatorSubtaskStates{subtaskStatesByOperatorID={0a448493b4782967b150582570326227=SubtaskState{operatorStateFromBackend=StateObjectCollection{[]},
>  operatorStateFromStream=StateObjectCollection{[]}, 
> keyedStateFromBackend=StateObjectCollection{[org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle@38aa46db]},
>  keyedStateFromStream=StateObjectCollection{[]}, 
> inputChannelState=StateObjectCollection{[]}, 
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=1764644202, 
> checkpointedSize=1997682}}, isTaskDeployedAsFinished=false, 
> isTaskFinished=false}
> 2023-01-18 17:21:13,442 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Getting managed memory shared cache for RocksDB.
> 2023-01-18 17:21:13,446 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Obtained shared RocksDB cache of size 1438814063 bytes
> 2023-01-18 17:21:13,447 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
>  [] - Starting to restore from state handle: 
> IncrementalLocalKeyedStateHandle{metaDataState=File State: 
> file:/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/0d082666-bd31-4ebe-9977-545c0d9b18a5
>  [1187 bytes]} 
> DirectoryKeyedStateHandle{directoryStateHandle=DirectoryStateHandle{directory=/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/b3e1d20f164d4c5baed291f5d1224183},
>  keyGroupRange=KeyGroupRange{startKeyGroup=96, endKeyGroup=98}} without 
> rescaling.
> 2023-01-18 17:21:13,495 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
>  [] - Finished restoring from state handle: 
> IncrementalLocalKeyedStateHandle{metaDataState=File State: 
> file:/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/0d082666-bd31-4ebe-9977-545c0d9b18a5
>  [1187 bytes]} 
> 

[jira] [Created] (FLINK-31436) Remove schemaId from constructor of Snapshot and ManifestFile in Table Store

2023-03-13 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-31436:
---

 Summary: Remove schemaId from constructor of Snapshot and 
ManifestFile in Table Store
 Key: FLINK-31436
 URL: https://issues.apache.org/jira/browse/FLINK-31436
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: Caizhi Weng
Assignee: Caizhi Weng
 Fix For: table-store-0.4.0


As schema may change during a CTAS streaming job, the schema ID of snapshots 
and manifest files may also change. We should remove \{{schemaId}} from their 
constructor and calculate the real \{{schemaId}} on the fly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] liuyongvs commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.

2023-03-13 Thread via GitHub


liuyongvs commented on code in PR #22144:
URL: https://github.com/apache/flink/pull/22144#discussion_r1134999542


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayRemoveFunction.java:
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_REMOVE}. */
+@Internal
+public class ArrayRemoveFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator equalityEvaluator;
+private transient MethodHandle equalityHandle;
+
+public ArrayRemoveFunction(SpecializedFunction.SpecializedContext context) 
{
+super(BuiltInFunctionDefinitions.ARRAY_REMOVE, context);
+final DataType elementDataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+final DataType needleDataType = 
context.getCallContext().getArgumentDataTypes().get(1);
+elementGetter = 
ArrayData.createElementGetter(elementDataType.getLogicalType());
+equalityEvaluator =
+context.createEvaluator(
+$("element").isEqual($("needle")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element", 
elementDataType.notNull().toInternal()),
+DataTypes.FIELD("needle", 
needleDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+equalityHandle = equalityEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData haystack, Object needle) {
+try {
+if (haystack == null) {
+return null;
+}
+
+List list = new ArrayList();
+final int size = haystack.size();
+for (int pos = 0; pos < size; pos++) {
+final Object element = 
elementGetter.getElementOrNull(haystack, pos);
+if ((element == null && needle != null)
+|| (element != null && needle == null)
+|| (element != null
+&& needle != null
+&& !(boolean) equalityHandle.invoke(element, 
needle))) {

Review Comment:
   have done @snuyanzin 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-31144) Slow scheduling on large-scale batch jobs

2023-03-13 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699955#comment-17699955
 ] 

Zhu Zhu edited comment on FLINK-31144 at 3/14/23 5:41 AM:
--

IIRC, the parallelism of that benchmark job (startScheduling.STREAMING) is 
4000, which is relatively large. So the scheduling time can be significantly 
reduced with this improvement.


was (Author: zhuzh):
IIRC, the parallelism of that benchmark job is 4000, which is relatively large. 
So the scheduling time can be significantly reduced with this improvement.

> Slow scheduling on large-scale batch jobs 
> --
>
> Key: FLINK-31144
> URL: https://issues.apache.org/jira/browse/FLINK-31144
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Julien Tournay
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: Screenshot 2023-03-13 at 14.22.27.png, 
> flink-1.17-snapshot-1676473798013.nps, image-2023-02-21-10-29-49-388.png
>
>
> When executing a complex job graph at high parallelism 
> `DefaultPreferredLocationsRetriever.getPreferredLocationsBasedOnInputs` can 
> get slow and cause long pauses where the JobManager becomes unresponsive and 
> all the taskmanagers just wait. I've attached a VisualVM snapshot to 
> illustrate the problem.[^flink-1.17-snapshot-1676473798013.nps]
> At Spotify we have complex jobs where this issue can cause batch "pause" of 
> 40+ minutes and make the overall execution 30% slower or more.
> More importantly this prevent us from running said jobs on larger cluster as 
> adding resources to the cluster worsen the issue.
> We have successfully tested a modified Flink version where 
> `DefaultPreferredLocationsRetriever.getPreferredLocationsBasedOnInputs` was 
> completely commented and simply returns an empty collection and confirmed it 
> solves the issue.
> In the same spirit as a recent change 
> ([https://github.com/apache/flink/blob/43f419d0eccba86ecc8040fa6f521148f1e358ff/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java#L98-L102)]
>  there could be a mechanism in place to detect when Flink run into this 
> specific issue and just skip the call to `getInputLocationFutures`  
> [https://github.com/apache/flink/blob/43f419d0eccba86ecc8040fa6f521148f1e358ff/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java#L105-L108.]
> I'm not familiar enough with the internals of Flink to propose a more 
> advanced fix, however it seems like a configurable threshold on the number of 
> consumer vertices above which the preferred location is not computed would 
> do. If this  solution is good enough, I'd be happy to submit a PR.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31144) Slow scheduling on large-scale batch jobs

2023-03-13 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699955#comment-17699955
 ] 

Zhu Zhu commented on FLINK-31144:
-

IIRC, the parallelism of that benchmark job is 4000, which is relatively large. 
So the scheduling time can be significantly reduced with this improvement.

> Slow scheduling on large-scale batch jobs 
> --
>
> Key: FLINK-31144
> URL: https://issues.apache.org/jira/browse/FLINK-31144
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Julien Tournay
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: Screenshot 2023-03-13 at 14.22.27.png, 
> flink-1.17-snapshot-1676473798013.nps, image-2023-02-21-10-29-49-388.png
>
>
> When executing a complex job graph at high parallelism 
> `DefaultPreferredLocationsRetriever.getPreferredLocationsBasedOnInputs` can 
> get slow and cause long pauses where the JobManager becomes unresponsive and 
> all the taskmanagers just wait. I've attached a VisualVM snapshot to 
> illustrate the problem.[^flink-1.17-snapshot-1676473798013.nps]
> At Spotify we have complex jobs where this issue can cause batch "pause" of 
> 40+ minutes and make the overall execution 30% slower or more.
> More importantly this prevent us from running said jobs on larger cluster as 
> adding resources to the cluster worsen the issue.
> We have successfully tested a modified Flink version where 
> `DefaultPreferredLocationsRetriever.getPreferredLocationsBasedOnInputs` was 
> completely commented and simply returns an empty collection and confirmed it 
> solves the issue.
> In the same spirit as a recent change 
> ([https://github.com/apache/flink/blob/43f419d0eccba86ecc8040fa6f521148f1e358ff/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java#L98-L102)]
>  there could be a mechanism in place to detect when Flink run into this 
> specific issue and just skip the call to `getInputLocationFutures`  
> [https://github.com/apache/flink/blob/43f419d0eccba86ecc8040fa6f521148f1e358ff/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java#L105-L108.]
> I'm not familiar enough with the internals of Flink to propose a more 
> advanced fix, however it seems like a configurable threshold on the number of 
> consumer vertices above which the preferred location is not computed would 
> do. If this  solution is good enough, I'd be happy to submit a PR.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] swuferhong commented on a diff in pull request #22049: [FLINK-31273][table-planner] Fix left join with IS_NULL filter be wrongly pushed down and get wrong join results

2023-03-13 Thread via GitHub


swuferhong commented on code in PR #22049:
URL: https://github.com/apache/flink/pull/22049#discussion_r1134947973


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java:
##
@@ -386,6 +387,21 @@ private void pushFiltersToAnotherSide(
 }
 }
 
+private boolean isSuitableFilterToPush(RexNode filter, JoinRelType 
joinType) {
+if (filter.isAlwaysTrue()) {
+return false;
+}
+// For left/right outer join, we cannot push down IS_NULL filter to 
other side. Take left
+// outer join as an example, If the join right side contains an 
IS_NULL filter, while we try
+// to push it to the join left side and the left side have any other 
filter on this column,
+// which will conflict and generate wrong plan.
+if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.RIGHT)

Review Comment:
   > for left outer join, only the equal conditions from right side can be push 
to the left side. and vice versa for right outer join.
   > 
   > please add some non equal conditions (such as a2 <> 10 ) in the IT case
   
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-30748) Translate "Overview" page of "Querys" into Chinese

2023-03-13 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-30748.
---
Fix Version/s: 1.18.0
 Assignee: chenhaiyang
   Resolution: Fixed

Fixed in master: 
63fd315fdc9bef29e37021855860468bb093...5d15191bbd9d3e9c357c3d3d3d2e410361510022

> Translate "Overview" page of "Querys" into Chinese
> --
>
> Key: FLINK-30748
> URL: https://issues.apache.org/jira/browse/FLINK-30748
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: chenhaiyang
>Assignee: chenhaiyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The page url is 
> [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/overview]
>  
> The markdown file is located in 
> docs/content.zh/docs/dev/table/sql/queries/overview.md



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wuchong merged pull request #21726: [FLINK-30748][docs]Translate "Overview" page of "Querys" into Chinese

2023-03-13 Thread via GitHub


wuchong merged PR #21726:
URL: https://github.com/apache/flink/pull/21726


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-30694) Translate "Windowing TVF" page of "Querys" into Chinese

2023-03-13 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-30694.
---
Fix Version/s: 1.18.0
   (was: 1.16.2)
   Resolution: Fixed

Fixed in master: 
a94b556e6a02dcc9220a9e19acd62c230550bf27...7b03113461c36d06c0643d9bdca7055071d41429

> Translate "Windowing TVF" page of "Querys" into Chinese 
> 
>
> Key: FLINK-30694
> URL: https://issues.apache.org/jira/browse/FLINK-30694
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Affects Versions: 1.16.0
>Reporter: chenhaiyang
>Assignee: chenhaiyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The page url is[ 
> [https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/window-tvf/]
>  
> The markdown file is located in 
> docs/content.zh/docs/dev/table/sql/queries/window-tvf.md



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wuchong merged pull request #21823: [FLINK-30694][docs]Translate "Windowing TVF" page of "Querys" into Ch…

2023-03-13 Thread via GitHub


wuchong merged PR #21823:
URL: https://github.com/apache/flink/pull/21823


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on pull request #21645: [FLINK-30556] Improve the logic for enumerating splits for Hive source to avoid potential OOM

2023-03-13 Thread via GitHub


WencongLiu commented on PR #21645:
URL: https://github.com/apache/flink/pull/21645#issuecomment-1467302863

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-26051) one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode

2023-03-13 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699929#comment-17699929
 ] 

Jane Chan commented on FLINK-26051:
---

[~KristoffSC], Sorry for the late reply. Feel free to go ahead.

Here are some investigations I made before; I hope that will help.

By changing the CommonCalc#computeSelfCost to 
{code:scala}
planner.getCostFactory.makeCost(newRowCnt, newRowCnt * (compCnt + 1), 0) {code}
This issue can be fixed.

Nonetheless, it will cause ~100 test plans to be changed. Over 50% of plans 
removed the Calc node that only contains projection. A few plans failed after 
applying this change. 

I think the leading blocker is evaluating the impact of this change to ensure 
there will be no performance regression for the affected tests.  

 

> one sql has row_number =1 and the subsequent SQL has "case when" and "where" 
> statement result Exception : The window can only be ordered in ASCENDING mode
> --
>
> Key: FLINK-26051
> URL: https://issues.apache.org/jira/browse/FLINK-26051
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.14.4
>Reporter: chuncheng wu
>Assignee: Jane Chan
>Priority: Major
> Attachments: image-2022-02-10-20-13-14-424.png, 
> image-2022-02-11-11-18-20-594.png, image-2022-06-17-21-28-54-886.png
>
>
> hello,
>    i have 2 sqls. One  sql (sql0) is "select xx from ( ROW_NUMBER statment) 
> where rn=1" and  the other one (sql1) is   "s{color:#505f79}elect ${fields} 
> from result where ${filter_conditions}{color}"  . The fields quoted in sql1 
> has one "case when" field .The two sql can work well seperately.but if they 
> combine  it results the exception as follow . It happen in the occasion when 
> logical plan turn into physical plan :
>  
> {code:java}
> org.apache.flink.table.api.TableException: The window can only be ordered in 
> ASCENDING mode.
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
>     at 
> org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
>     at 
> com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at 
> 

[jira] [Commented] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI

2023-03-13 Thread Yuxin Tan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699930#comment-17699930
 ] 

Yuxin Tan commented on FLINK-31418:
---

I have created a PR for master and 1.17, could you help take a look? 
[~kevin.cyj]

> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on 
> CI
> ---
>
> Key: FLINK-31418
> URL: https://issues.apache.org/jira/browse/FLINK-31418
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Assignee: Yuxin Tan
>Priority: Critical
>  Labels: pull-request-available
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756]
> Error message:
> {code:java}
> Mar 13 05:22:10 [ERROR] Failures: 
> Mar 13 05:22:10 [ERROR]   
> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 
> Mar 13 05:22:10 Expecting value to be true but was false{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] godfreyhe commented on a diff in pull request #22049: [FLINK-31273][table-planner] Fix left join with IS_NULL filter be wrongly pushed down and get wrong join results

2023-03-13 Thread via GitHub


godfreyhe commented on code in PR #22049:
URL: https://github.com/apache/flink/pull/22049#discussion_r1134883506


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java:
##
@@ -386,6 +387,21 @@ private void pushFiltersToAnotherSide(
 }
 }
 
+private boolean isSuitableFilterToPush(RexNode filter, JoinRelType 
joinType) {
+if (filter.isAlwaysTrue()) {
+return false;
+}
+// For left/right outer join, we cannot push down IS_NULL filter to 
other side. Take left
+// outer join as an example, If the join right side contains an 
IS_NULL filter, while we try
+// to push it to the join left side and the left side have any other 
filter on this column,
+// which will conflict and generate wrong plan.
+if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.RIGHT)

Review Comment:
   for left outer join, only the equal conditions from right side can be push 
to the left side. and vice versa for right outer join.
   
   please add some non equal conditions (such as a2 <> 10 ) in the IT case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31414) exceptions in the alignment timer are ignored

2023-03-13 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699928#comment-17699928
 ] 

Feifan Wang commented on FLINK-31414:
-

Thanks for reply [~pnowojski] , sorry for the lack of clarity in the previous 
description, let me answer your question first :
{quote}the stack trace doesn't match to the master code, so I'm not sure what 
Flink version you are using?
{quote}
based on *release-1.16.1* , cherry-picked some bug fix.
{quote}doesn't the error message "switched from RUNNING to FAILED" refer to 
actually subtask/task switching to FAILED state, contradicting your statement 
that the exception is being ignored?
{quote}
Yes, it is a subtask switching to FAILED state. I mean the exception thrown in 
the alignment timer task is being ignored, causing the subtask thread to 
continue executing to trigger the exception I posted above.

Here is the more complete log ( I change some log level from debug to info ) :

 
{code:java}
2023-03-10 12:09:42,416 INFO  [MV_J_PV - mv-join-after-operator - 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
  - MV_J_PV - mv-join-after-operator - extract-event-identifier 
(4517/4800)#1 
(cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1): 
Received barrier from channel InputChannelInfo{gateIdx=1, inputChannelIdx=586} 
@ 17.
2023-03-10 12:09:42,673 INFO  [MV_J_PV - mv-join-after-operator - 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl  - MV_J_PV 
- mv-join-after-operator - extract-event-identifier (4517/4800)#1 
starting checkpoint 17 
(CheckpointOptions{checkpointType=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}, targetLocation=(default), 
alignmentType=UNALIGNED, alignedCheckpointTimeout=9223372036854775807})
2023-03-10 12:09:42,673 INFO  [MV_J_PV - mv-join-after-operator - 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl  - MV_J_PV 
- mv-join-after-operator - extract-event-identifier (4517/4800)#1 put 
ChannelStateWriteResult : 17
2023-03-10 12:09:42,675 INFO  [MV_J_PV - mv-join-after-operator - 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
  - MV_J_PV - mv-join-after-operator - extract-event-identifier 
(4517/4800)#1 
(cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1): 
Triggering checkpoint 17 on the barrier announcement at 1678421367671.
2023-03-10 12:09:42,675 INFO  [MV_J_PV - mv-join-after-operator - 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.tasks.StreamTask           - 
triggerCheckpointOnBarrier Starting checkpoint 17 
CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD} on 
task MV_J_PV - mv-join-after-operator - extract-event-identifier 
(4517/4800)#1
2023-03-10 12:09:42,675 INFO  [MV_J_PV - mv-join-after-operator - 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting 
checkpoint 17 CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD} on task MV_J_PV - 
mv-join-after-operator - extract-event-identifier (4517/4800)#1
2023-03-10 12:09:42,675 INFO  [MV_J_PV - mv-join-after-operator - 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl  - MV_J_PV 
- mv-join-after-operator - extract-event-identifier (4517/4800)#1 
requested write result, checkpoint 17
2023-03-10 12:09:42,676 INFO  [MV_J_PV - mv-join-after-operator - 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.state.changelog.ChangelogKeyedStateBackend   - snapshot of 
MV_J_PV - mv-join-after-operator - extract-event-identifier 
(4517/4800)#1 for checkpoint 17, change range: 39..46, materialization ID 4
2023-03-10 12:09:42,677 INFO  [MV_J_PV - mv-join-after-operator - 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain  - Could not 
complete snapshot 17 for operator MV_J_PV - mv-join-after-operator - 
extract-event-identifier (4517/4800)#1. Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
snapshot 17 for operator MV_J_PV - mv-join-after-operator - 
extract-event-identifier (4517/4800)#1. Failure reason: Checkpoint was declined.
    at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
    at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
    at 

[GitHub] [flink-table-store] zhuangchong opened a new pull request, #597: [hotfix] Change the variable name `sekableInputStream` to `seekableInputStream`

2023-03-13 Thread via GitHub


zhuangchong opened a new pull request, #597:
URL: https://github.com/apache/flink-table-store/pull/597

   Change the variable name `sekableInputStream` to `seekableInputStream`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI

2023-03-13 Thread Yingjie Cao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingjie Cao reassigned FLINK-31418:
---

Assignee: Yuxin Tan

> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on 
> CI
> ---
>
> Key: FLINK-31418
> URL: https://issues.apache.org/jira/browse/FLINK-31418
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Assignee: Yuxin Tan
>Priority: Critical
>  Labels: pull-request-available
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756]
> Error message:
> {code:java}
> Mar 13 05:22:10 [ERROR] Failures: 
> Mar 13 05:22:10 [ERROR]   
> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 
> Mar 13 05:22:10 Expecting value to be true but was false{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31425) Support submitting a job with streamgraph

2023-03-13 Thread Junrui Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699925#comment-17699925
 ] 

Junrui Li commented on FLINK-31425:
---

[~tophei] Thanks for creating this issue. IIUC, according to the description, I 
think this issue can be divided into two parts:
1. Submit and run jobs by submitting a streamGraph, which is very meaningful. 
Submitting and running jobs through streamGraph can expand the dynamic 
adjustment capability of flink runtime. This is a relatively large and very 
meaningful change, and we are currently doing related research.
2. Make StreamGraph public so that users can get streamGraph. I'm not sure if 
this part has enough meaning to expose an internal interface to users, which 
may bring additional burden.

> Support submitting a job with streamgraph 
> --
>
> Key: FLINK-31425
> URL: https://issues.apache.org/jira/browse/FLINK-31425
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: Jeff
>Priority: Major
>
> Currently, we have rest api to submit a job via jobgraph, which is aligned to 
> the way of flink cli running the entry class locally and submit the compiled 
> binary to remote cluster for execution.
> This is convenient in its own right. However it also seems to bring in some 
> confusion and 'blackbox' feeling in that the payload of rest api is a binary 
> object and thus not self-descriptive and it's relative a low-level 
> presentation of the job executions whose interface is more likely to change 
> as version evolves. 
> Do you think it make more sense to build an api that accepts streamgraph as 
> input which may be presented with a json(just like visualizer did for an 
> execution plan visualization) plus additional runtime related configs and 
> resources? This may make the rest interface more descriptive.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] JingsongLi merged pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction

2023-03-13 Thread via GitHub


JingsongLi merged PR #594:
URL: https://github.com/apache/flink-table-store/pull/594


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-31397) Introduce write-once hash lookup store

2023-03-13 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-31397.

Resolution: Fixed

master: 0f5744a92bc52f52bc0f6644dc063dcb60fe326c

> Introduce write-once hash lookup store
> --
>
> Key: FLINK-31397
> URL: https://issues.apache.org/jira/browse/FLINK-31397
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> Introduce interface for lookup changelog producer:
> {code:java}
> /**
>  * A key-value store for lookup, key-value store should be single binary file 
> written once and ready
>  * to be used. This factory provide two interfaces:
>  *
>  * 
>  *   Writer: written once to prepare binary file.
>  *   Reader: lookup value by key bytes.
>  * 
>  */
> public interface LookupStoreFactory {
> LookupStoreWriter createWriter(File file) throws IOException;
> LookupStoreReader createReader(File file) throws IOException;
> }
>  {code}
> We can convert remote columnar data to local lookup store, and ready to be 
> used to lookup.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30863) Register local recovery files of changelog before notifyCheckpointComplete()

2023-03-13 Thread jinghaihang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699920#comment-17699920
 ] 

jinghaihang edited comment on FLINK-30863 at 3/14/23 3:16 AM:
--

[~Yanfei Lei]  Thanks for your reply.

I don't think the current pr will fix this, and I verified that it does.
The reason is still that the filter operator confirm() method will delete the 
local changelog file of the window operator, because the window operator file 
is not registered at this time.

For this problem, I think of two possible solutions:
Method 1. Change the logic of LocalChangelogRegistryImpl and add a mapping 
relationship between operator and file. When confirm() cleans up, only the 
files of its own operator are cleaned up;
Method 2. In LocalChangelogRegistryImpl#discardUpToCheckpoint(), change 
entry.f1 < upTo to entry.f1 < upTo -1.

Compare the two solutions: method 1 has relatively large changes; method 2 is a 
trick method with small changes, but it is not easy to understand. At the same 
time, there will be one more checkpoint file storage occupation. WDYT?


was (Author: assassinj):
[~Yanfei Lei]  Thanks for your reply.

I don't think the current pr will fix this, and I verified that it does.
The reason is still that the filter operator confirm() method will delete the 
local changelog file of the window operator, because the window operator file 
is not registered at this time.


For this problem, I think of two possible solutions“
Method 1. Change the logic of LocalChangelogRegistryImpl and add a mapping 
relationship between operator and file. When confirm() cleans up, only the 
files of its own operator are cleaned up;
Method 2. In LocalChangelogRegistryImpl#discardUpToCheckpoint(), change 
entry.f1 < upTo to entry.f1 < upTo -1.

Compare the two solutions: method 1 has relatively large changes; method 2 is a 
trick method with small changes, but it is not easy to understand. At the same 
time, there will be one more checkpoint file storage occupation. WDYT?

> Register local recovery files of changelog before notifyCheckpointComplete()
> 
>
> Key: FLINK-30863
> URL: https://issues.apache.org/jira/browse/FLINK-30863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Attachments: tm-log_fail_cl_local_recovery.txt
>
>
> If TM is materialized before receiving confirm(), the previously uploaded 
> queue in `FsStateChangelogWriter` will be cleared, so the local files of the 
> completed checkpoint will not be registered again, while the JM owned files 
> are registered before confirm(), and do not depend on the uploaded queue, so 
> the local files are deleted, and the DFS files are still there. 
>  
> We have encountered the following situation, the job cannot find the local 
> recovery files, but can restore from the DFS files:
> {code:java}
> 2023-01-18 17:21:13,412 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - 
> SlidingProcessingTimeWindows (37/48)#1 #1 (fa12cfa3b811a351e031b036b0e85d91) 
> switched from DEPLOYING to INITIALIZING.
> 2023-01-18 17:21:13,440 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl       [] - Found 
> registered local state for checkpoint 11599 in subtask 
> (2daf1d9bc9ed40ecb191303db813b0de - 0a448493b4782967b150582570326227 - 36) : 
> TaskOperatorSubtaskStates{subtaskStatesByOperatorID={0a448493b4782967b150582570326227=SubtaskState{operatorStateFromBackend=StateObjectCollection{[]},
>  operatorStateFromStream=StateObjectCollection{[]}, 
> keyedStateFromBackend=StateObjectCollection{[org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle@38aa46db]},
>  keyedStateFromStream=StateObjectCollection{[]}, 
> inputChannelState=StateObjectCollection{[]}, 
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=1764644202, 
> checkpointedSize=1997682}}, isTaskDeployedAsFinished=false, 
> isTaskFinished=false}
> 2023-01-18 17:21:13,442 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Getting managed memory shared cache for RocksDB.
> 2023-01-18 17:21:13,446 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Obtained shared RocksDB cache of size 1438814063 bytes
> 2023-01-18 17:21:13,447 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
>  

[jira] [Commented] (FLINK-30863) Register local recovery files of changelog before notifyCheckpointComplete()

2023-03-13 Thread jinghaihang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699920#comment-17699920
 ] 

jinghaihang commented on FLINK-30863:
-

[~Yanfei Lei]  Thanks for your reply.

I don't think the current pr will fix this, and I verified that it does.
The reason is still that the filter operator confirm() method will delete the 
local changelog file of the window operator, because the window operator file 
is not registered at this time.


For this problem, I think of two possible solutions“
Method 1. Change the logic of LocalChangelogRegistryImpl and add a mapping 
relationship between operator and file. When confirm() cleans up, only the 
files of its own operator are cleaned up;
Method 2. In LocalChangelogRegistryImpl#discardUpToCheckpoint(), change 
entry.f1 < upTo to entry.f1 < upTo -1.

Compare the two solutions: method 1 has relatively large changes; method 2 is a 
trick method with small changes, but it is not easy to understand. At the same 
time, there will be one more checkpoint file storage occupation. WDYT?

> Register local recovery files of changelog before notifyCheckpointComplete()
> 
>
> Key: FLINK-30863
> URL: https://issues.apache.org/jira/browse/FLINK-30863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.17.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Attachments: tm-log_fail_cl_local_recovery.txt
>
>
> If TM is materialized before receiving confirm(), the previously uploaded 
> queue in `FsStateChangelogWriter` will be cleared, so the local files of the 
> completed checkpoint will not be registered again, while the JM owned files 
> are registered before confirm(), and do not depend on the uploaded queue, so 
> the local files are deleted, and the DFS files are still there. 
>  
> We have encountered the following situation, the job cannot find the local 
> recovery files, but can restore from the DFS files:
> {code:java}
> 2023-01-18 17:21:13,412 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - 
> SlidingProcessingTimeWindows (37/48)#1 #1 (fa12cfa3b811a351e031b036b0e85d91) 
> switched from DEPLOYING to INITIALIZING.
> 2023-01-18 17:21:13,440 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.runtime.state.TaskLocalStateStoreImpl       [] - Found 
> registered local state for checkpoint 11599 in subtask 
> (2daf1d9bc9ed40ecb191303db813b0de - 0a448493b4782967b150582570326227 - 36) : 
> TaskOperatorSubtaskStates{subtaskStatesByOperatorID={0a448493b4782967b150582570326227=SubtaskState{operatorStateFromBackend=StateObjectCollection{[]},
>  operatorStateFromStream=StateObjectCollection{[]}, 
> keyedStateFromBackend=StateObjectCollection{[org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle@38aa46db]},
>  keyedStateFromStream=StateObjectCollection{[]}, 
> inputChannelState=StateObjectCollection{[]}, 
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=1764644202, 
> checkpointedSize=1997682}}, isTaskDeployedAsFinished=false, 
> isTaskFinished=false}
> 2023-01-18 17:21:13,442 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Getting managed memory shared cache for RocksDB.
> 2023-01-18 17:21:13,446 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Obtained shared RocksDB cache of size 1438814063 bytes
> 2023-01-18 17:21:13,447 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
>  [] - Starting to restore from state handle: 
> IncrementalLocalKeyedStateHandle{metaDataState=File State: 
> file:/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/0d082666-bd31-4ebe-9977-545c0d9b18a5
>  [1187 bytes]} 
> DirectoryKeyedStateHandle{directoryStateHandle=DirectoryStateHandle{directory=/opt/flink/flink-tmp-dir/tm_job-2daf1d9b-c9ed-40ec-b191-303db813b0de-taskmanager-1-31/localState/aid_45af7e6b612dad10b60554d81323d5f3/jid_2daf1d9bc9ed40ecb191303db813b0de/vtx_0a448493b4782967b150582570326227_sti_36/chk_125/b3e1d20f164d4c5baed291f5d1224183},
>  keyGroupRange=KeyGroupRange{startKeyGroup=96, endKeyGroup=98}} without 
> rescaling.
> 2023-01-18 17:21:13,495 [SlidingProcessingTimeWindows (37/48)#1] INFO  
> 

[GitHub] [flink-table-store] JingsongLi merged pull request #593: [FLINK-31397] Introduce write-once hash lookup store

2023-03-13 Thread via GitHub


JingsongLi merged PR #593:
URL: https://github.com/apache/flink-table-store/pull/593


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31435) Introduce event parser for MySql Debezium JSON format in Table Store

2023-03-13 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-31435:
---

 Summary: Introduce event parser for MySql Debezium JSON format in 
Table Store
 Key: FLINK-31435
 URL: https://issues.apache.org/jira/browse/FLINK-31435
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: Caizhi Weng
Assignee: Caizhi Weng
 Fix For: table-store-0.4.0


MySQL is widely used among Flink CDC connector users. We should first support 
consuming changes from MySQL.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31434) Introduce CDC sink for Table Store

2023-03-13 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-31434:
---

 Summary: Introduce CDC sink for Table Store
 Key: FLINK-31434
 URL: https://issues.apache.org/jira/browse/FLINK-31434
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: Caizhi Weng
Assignee: Caizhi Weng
 Fix For: table-store-0.4.0


To directly consume changes from Flink CDC connectors, we need a special CDC 
sink for Flink Table Store.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22173: [FLINK-31418][network][tests] Fix buffer request timeout for sort merge result partition test

2023-03-13 Thread via GitHub


flinkbot commented on PR #22173:
URL: https://github.com/apache/flink/pull/22173#issuecomment-1467279898

   
   ## CI report:
   
   * 9cff66bb2cf991967633d0b3aa88377224085b9f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31433) Make SchemaChange serializable

2023-03-13 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-31433:
---

 Summary: Make SchemaChange serializable
 Key: FLINK-31433
 URL: https://issues.apache.org/jira/browse/FLINK-31433
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: Caizhi Weng
Assignee: Caizhi Weng
 Fix For: table-store-0.4.0


To avoid concurrent changes to table schema, CDC sinks for Flink Table Store 
should send all \{{SchemaChange}} to a special process function. This process 
function only has 1 parallelism and it is dedicated for schema changes.

 

To pass \{{SchemaChange}} through network, \{{SchemaChange}} must be 
serializable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-pulsar] syhily commented on pull request #35: [FLINK-31427][Table] Initial Catalog implementation with a new config model and schema conversion.

2023-03-13 Thread via GitHub


syhily commented on PR #35:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/35#issuecomment-1467278411

   @reswqa Can you help me review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31432) Introduce a special StoreWriteOperator to deal with schema changes

2023-03-13 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-31432:
---

 Summary: Introduce a special StoreWriteOperator to deal with 
schema changes
 Key: FLINK-31432
 URL: https://issues.apache.org/jira/browse/FLINK-31432
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: Caizhi Weng
Assignee: Caizhi Weng
 Fix For: table-store-0.4.0


Currently \{{StoreWriteOperator}} is not able to deal with schema changes. We 
need to introduce a special \{{StoreWriteOperator}} to deal with schema changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] TanYuxin-tyx opened a new pull request, #22173: [FLINK-31418][network][tests] Fix buffer request timeout for sort merge result partition test

2023-03-13 Thread via GitHub


TanYuxin-tyx opened a new pull request, #22173:
URL: https://github.com/apache/flink/pull/22173

   
   
   ## What is the purpose of the change
   
   *Fix buffer request timeout for sort merge result partition read test*
   
   
   ## Brief change log
   
   *(for example:)*
 - *Fix buffer request timeout for sort merge result partition read test*
   
   ## Verifying this change
   
   This change is a test fixup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-31427) Pulsar Catalog support with Schema translation

2023-03-13 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-31427:
--

Assignee: Yufan Sheng

> Pulsar Catalog support with Schema translation
> --
>
> Key: FLINK-31427
> URL: https://issues.apache.org/jira/browse/FLINK-31427
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Pulsar
>Affects Versions: pulsar-4.0.0
>Reporter: Yufan Sheng
>Assignee: Yufan Sheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.0.0
>
>
> This task will make the Pulsar serve as the Flink catalog. It will expose the 
> Pulsar's namespace as the Flink's database, the topic as the Flink's table. 
> You can easily create a table and database on Pulsar. The table can be 
> consumed by other clients with a valid schema check.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31431) Support copying a FileStoreTable with latest schema

2023-03-13 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-31431:
---

 Summary: Support copying a FileStoreTable with latest schema
 Key: FLINK-31431
 URL: https://issues.apache.org/jira/browse/FLINK-31431
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: Caizhi Weng
Assignee: Caizhi Weng
 Fix For: table-store-0.4.0


To capture schema changes, CDC sinks of Flink Table Store should be able to use 
the latest schema at any time. This requires us to copy a \{{FileStoreTable}} 
with latest schema so that we can create \{{TableWrite}} with latest schema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22172: [FLINK-31418][network][tests] Fix buffer request timeout for sort merge result partition read test

2023-03-13 Thread via GitHub


flinkbot commented on PR #22172:
URL: https://github.com/apache/flink/pull/22172#issuecomment-1467272865

   
   ## CI report:
   
   * 4438179c6b2181fcaec452f16a52a2b9033dc706 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31430) Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite

2023-03-13 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-31430:
---

 Summary: Support migrating states between different instances of 
TableWriteImpl and AbstractFileStoreWrite
 Key: FLINK-31430
 URL: https://issues.apache.org/jira/browse/FLINK-31430
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: Caizhi Weng
Assignee: Caizhi Weng
 Fix For: table-store-0.4.0


Currently {{Table}} and {{TableWrite}} in Flink Table Store have a fixed 
schema. However to consume schema changes, Flink Table Store CDC sinks should 
have the ability to change its schema during a streaming job.

This require us to pause and store the states of a {{TableWrite}}, then create 
a {{TableWrite}} with newer schema and recover the states in the new 
{{TableWrite}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31429) Support CTAS(create table as) streaming job with schema changes in table store

2023-03-13 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-31429:
---

 Summary: Support CTAS(create table as) streaming job with schema 
changes in table store
 Key: FLINK-31429
 URL: https://issues.apache.org/jira/browse/FLINK-31429
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: Caizhi Weng
Assignee: Caizhi Weng
 Fix For: table-store-0.4.0


Currently CDC connectors for Flink has the ability to stream out records 
changes and schema changes of a database table. Flink Table Store should have 
the ability to directly consume these changes, including schema changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI

2023-03-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31418:
---
Labels: pull-request-available  (was: )

> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on 
> CI
> ---
>
> Key: FLINK-31418
> URL: https://issues.apache.org/jira/browse/FLINK-31418
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Priority: Critical
>  Labels: pull-request-available
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756]
> Error message:
> {code:java}
> Mar 13 05:22:10 [ERROR] Failures: 
> Mar 13 05:22:10 [ERROR]   
> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 
> Mar 13 05:22:10 Expecting value to be true but was false{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] TanYuxin-tyx opened a new pull request, #22172: [FLINK-31418][network][tests] Fix buffer request timeout for sort merge result partition read test

2023-03-13 Thread via GitHub


TanYuxin-tyx opened a new pull request, #22172:
URL: https://github.com/apache/flink/pull/22172

   
   
   ## What is the purpose of the change
   
   *Fix buffer request timeout for sort merge result partition read test*
   
   
   ## Brief change log
   
   *(for example:)*
 - *Fix buffer request timeout for sort merge result partition read test*
   
   ## Verifying this change
   
   This change is a test fixup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-31269) Split hive connector to each module of each version

2023-03-13 Thread Caizhi Weng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caizhi Weng closed FLINK-31269.
---
  Assignee: Shammon
Resolution: Fixed

master: dd2d600f6743ca074a023fdbb1a7a2cbcfbf8ff0

> Split hive connector to each module of each version
> ---
>
> Key: FLINK-31269
> URL: https://issues.apache.org/jira/browse/FLINK-31269
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Affects Versions: table-store-0.4.0
>Reporter: Shammon
>Assignee: Shammon
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI

2023-03-13 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699913#comment-17699913
 ] 

Yingjie Cao commented on FLINK-31418:
-

It is a test issue, we will fix it soon.

> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on 
> CI
> ---
>
> Key: FLINK-31418
> URL: https://issues.apache.org/jira/browse/FLINK-31418
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Priority: Critical
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756]
> Error message:
> {code:java}
> Mar 13 05:22:10 [ERROR] Failures: 
> Mar 13 05:22:10 [ERROR]   
> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 
> Mar 13 05:22:10 Expecting value to be true but was false{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] tsreaper merged pull request #569: [FLINK-31269] Split hive connector to each module of each version

2023-03-13 Thread via GitHub


tsreaper merged PR #569:
URL: https://github.com/apache/flink-table-store/pull/569


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI

2023-03-13 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699911#comment-17699911
 ] 

Qingsheng Ren commented on FLINK-31418:
---

Thanks for the feedback [~kevin.cyj]. I'll downgrade this to critical as 
replied.

> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on 
> CI
> ---
>
> Key: FLINK-31418
> URL: https://issues.apache.org/jira/browse/FLINK-31418
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Priority: Blocker
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756]
> Error message:
> {code:java}
> Mar 13 05:22:10 [ERROR] Failures: 
> Mar 13 05:22:10 [ERROR]   
> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 
> Mar 13 05:22:10 Expecting value to be true but was false{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31418) SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on CI

2023-03-13 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren updated FLINK-31418:
--
Priority: Critical  (was: Blocker)

> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout failed on 
> CI
> ---
>
> Key: FLINK-31418
> URL: https://issues.apache.org/jira/browse/FLINK-31418
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Priority: Critical
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47077=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8756]
> Error message:
> {code:java}
> Mar 13 05:22:10 [ERROR] Failures: 
> Mar 13 05:22:10 [ERROR]   
> SortMergeResultPartitionReadSchedulerTest.testRequestBufferTimeout:278 
> Mar 13 05:22:10 Expecting value to be true but was false{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wuchong commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-13 Thread via GitHub


wuchong commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1134831904


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:
##
@@ -763,7 +783,7 @@ CatalogBaseTable instantiateCatalogTable(Table hiveTable) {
 tableSchemaProps.putProperties(properties);
 // try to get table schema with both new and old (1.10) key, in 
order to support tables
 // created in old version
-tableSchema =
+TableSchema tableSchema =

Review Comment:
   `CatalogPropertiesUtil` is the alternative of `DescriptorProperties`. It 
should not be a major work to migrate. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-13 Thread via GitHub


Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1134809157


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##
@@ -157,12 +165,12 @@ public static Operation convertChangeColumn(
 // disallow changing partition columns
 throw new ValidationException("CHANGE COLUMN cannot be applied to 
partition columns");
 }
-TableSchema oldSchema = catalogTable.getSchema();
+ResolvedSchema oldSchema = catalogTable.getResolvedSchema();

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31428) Add user callbacks to PulsarSource and PulsarSink

2023-03-13 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699901#comment-17699901
 ] 

Yufan Sheng commented on FLINK-31428:
-

I see, the ser/deser issue matters. You want to intercept only on the real 
instance instead of the Pulsar `Message` instance, right?

> Add user callbacks to  PulsarSource and PulsarSink
> --
>
> Key: FLINK-31428
> URL: https://issues.apache.org/jira/browse/FLINK-31428
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Pulsar
>Reporter: Alpha Diallo
>Priority: Major
>
> We'd like to add support for user callbacks in {{PulsarSource}} and 
> {{{}PulsarSink{}}}. This enables specific use cases such as event tracing 
> which requires access to low level message properties such as message IDs 
> after an event is produced, topic partitions, etc...
> The functionality required is similar to {{ConsumerInterceptor}} and 
> {{ProducerInterceptor}} in the Pulsar Client. However, there is a case to be 
> made for adding new APIs that would help avoid the extra cost of ser/deser 
> when getting the message body through the {{Message}} interface in the 
> interceptors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-13 Thread via GitHub


Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1134803012


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##
@@ -69,67 +69,75 @@ private OperationConverterUtils() {}
 public static Operation convertAddReplaceColumns(

Review Comment:
   Oh, I get it. `SqlAddReplaceColumns` is Hive dialect, and is not used now. 
Will remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-31413) Remove provided dependency of flink-table-planner from Hive connector

2023-03-13 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia reassigned FLINK-31413:


Assignee: luoyuxia

> Remove provided dependency of flink-table-planner from Hive connector
> -
>
> Key: FLINK-31413
> URL: https://issues.apache.org/jira/browse/FLINK-31413
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>
> Can move on after finish FLINK-26603. We still need some follow-up tasks 
> before removing provided table-planner dependency in hive connector



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30659) drop flink-sql-parser-hive

2023-03-13 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia reassigned FLINK-30659:


Assignee: luoyuxia

> drop flink-sql-parser-hive 
> ---
>
> Key: FLINK-30659
> URL: https://issues.apache.org/jira/browse/FLINK-30659
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Chen Qin
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Hive Parser should stay with hive connector and maintained together. During 
> runtime, those package should load/unload together.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31409) Hive dialect should use public interfaces in Hive connector

2023-03-13 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia reassigned FLINK-31409:


Assignee: luoyuxia

> Hive dialect should use public interfaces in Hive connector
> ---
>
> Key: FLINK-31409
> URL: https://issues.apache.org/jira/browse/FLINK-31409
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
>
> Currently, for the Hive dialect part in Hive connector, it depends much 
> internal interfaces in flink-table-planner or other module. We should avoid 
> it and use public interfaces proposed in  
> [FLIP-216|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-216%3A++Introduce+pluggable+dialect+and+plan+for+migrating+Hive+dialect]]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-17398) Filesystem support flexible path reading

2023-03-13 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia reassigned FLINK-17398:


Assignee: Hang Ruan  (was: luoyuxia)

> Filesystem support flexible path reading
> 
>
> Key: FLINK-17398
> URL: https://issues.apache.org/jira/browse/FLINK-17398
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Ecosystem
>Reporter: Jingsong Lee
>Assignee: Hang Ruan
>Priority: Major
>
> Like:
>  * Single file reading
>  * wildcard path reading (regex)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] swuferhong commented on a diff in pull request #22049: [FLINK-31273][table-planner] Fix left join with IS_NULL filter be wrongly pushed down and get wrong join results

2023-03-13 Thread via GitHub


swuferhong commented on code in PR #22049:
URL: https://github.com/apache/flink/pull/22049#discussion_r1134791918


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java:
##
@@ -386,6 +387,21 @@ private void pushFiltersToAnotherSide(
 }
 }
 
+private boolean isSuitableFilterToPush(RexNode filter, JoinRelType 
joinType) {
+if (filter.isAlwaysTrue()) {
+return false;
+}
+// For left/right outer join, we cannot push down IS_NULL filter to 
other side. Take left
+// outer join as an example, If the join right side contains an 
IS_NULL filter, while we try
+// to push it to the join left side and the left side have any other 
filter on this column,
+// which will conflict and generate wrong plan.
+if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.RIGHT)

Review Comment:
   Hi, @herunkang2018. For method FlinkFilterJoinRule#pushFiltersToAnotherSide, 
we only support Inner/left/Right join. So there is no need to judge the full 
outer join here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-31415) Support target alias and using ddls to create source

2023-03-13 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-31415.

Fix Version/s: table-store-0.4.0
 Assignee: yuzelin
   Resolution: Fixed

master: 3fe06f7384bb8ce5fab5997b5d0e81761874f092

> Support target alias and using ddls to create source
> 
>
> Key: FLINK-31415
> URL: https://issues.apache.org/jira/browse/FLINK-31415
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.4.0
>Reporter: yuzelin
>Assignee: yuzelin
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] JingsongLi merged pull request #596: [FLINK-31415] Support target alias and using ddls to create source

2023-03-13 Thread via GitHub


JingsongLi merged PR #596:
URL: https://github.com/apache/flink-table-store/pull/596


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31371) Stream failure if the topic doesn't exist

2023-03-13 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699896#comment-17699896
 ] 

Yufan Sheng commented on FLINK-31371:
-

Can we change this JIRA into a feature request with a new title: "Support 
subscribing non-existed topics in Pulsar source"?

> Stream failure if the topic doesn't exist
> -
>
> Key: FLINK-31371
> URL: https://issues.apache.org/jira/browse/FLINK-31371
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.3
>Reporter: Enzo Dechaene
>Priority: Major
>
> *Describe the bug*
> With a Pulsar 2.8.4 server, a Flink stream containing Pulsar sources or sinks 
> will fail at startup if the topic doesn't exist.
>  
> *To Reproduce*
> Create a stream with :
>  * Flink 1.15.2
>  * Pulsar 2.8.4
>  * with a Pulsar source or sink linked to a non existant topic
>  * Start the stream
>  
> *Expected behavior*
> If the topic doesn't exist, it should be created at the first connection of 
> the source or sink without error.
>  
> *Additional context*
> In the TopicListSubscriber class of the connector, the method 
> getSubscribedTopicPartitions() try to get the metadata of a topic by doing 
> that :
>  
> {code:java}
> TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, topic);{code}
>  
> If the topic doesn't exist, I get a NullPointerException on the metadata
> We created a previous 
> [ticket|https://github.com/streamnative/pulsar-flink/issues/366] on the 
> Pulsar connector and it was fixed



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] herunkang2018 commented on a diff in pull request #22049: [FLINK-31273][table-planner] Fix left join with IS_NULL filter be wrongly pushed down and get wrong join results

2023-03-13 Thread via GitHub


herunkang2018 commented on code in PR #22049:
URL: https://github.com/apache/flink/pull/22049#discussion_r1134786876


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java:
##
@@ -386,6 +387,21 @@ private void pushFiltersToAnotherSide(
 }
 }
 
+private boolean isSuitableFilterToPush(RexNode filter, JoinRelType 
joinType) {
+if (filter.isAlwaysTrue()) {
+return false;
+}
+// For left/right outer join, we cannot push down IS_NULL filter to 
other side. Take left
+// outer join as an example, If the join right side contains an 
IS_NULL filter, while we try
+// to push it to the join left side and the left side have any other 
filter on this column,
+// which will conflict and generate wrong plan.
+if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.RIGHT)

Review Comment:
   if we need to check full outer join here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction

2023-03-13 Thread via GitHub


JingsongLi commented on code in PR #594:
URL: https://github.com/apache/flink-table-store/pull/594#discussion_r1134786122


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java:
##
@@ -172,10 +133,30 @@ public MergeFunction create(@Nullable int[][] 
projection) {
 fieldTypes = project.project(tableTypes);
 }
 
-return new AggregateMergeFunction(
-createFieldGetters(fieldTypes),
-new AggregateMergeFunction.RowAggregator(
-conf, fieldNames, fieldTypes, primaryKeys));
+FieldAggregator[] fieldAggregators = new 
FieldAggregator[fieldNames.size()];
+for (int i = 0; i < fieldNames.size(); i++) {
+String fieldName = fieldNames.get(i);
+DataType fieldType = fieldTypes.get(i);
+// aggregate by primary keys, so they do not aggregate
+boolean isPrimaryKey = primaryKeys.contains(fieldName);
+String strAggFunc =
+conf.get(
+key(FIELDS + "." + fieldName + "." + 
AGG_FUNCTION)
+.stringType()
+.noDefaultValue()
+.withDescription(
+"Get " + fieldName + "'s 
aggregate function"));
+boolean ignoreRetract =
+conf.get(
+key(FIELDS + "." + fieldName + "." + 
IGNORE_RETRACT)
+.booleanType()
+.defaultValue(false));

Review Comment:
   I delete description of `AGG_FUNCTION`, here the description is useless, it 
can not be see by users.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction

2023-03-13 Thread via GitHub


JingsongLi commented on code in PR #594:
URL: https://github.com/apache/flink-table-store/pull/594#discussion_r1134784531


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0Compaction.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.compact.CompactUnit;
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link CompactStrategy} to force compacting level 0 files. */
+public class ForceUpLevel0Compaction implements CompactStrategy {
+
+private final UniversalCompaction universalCompaction;
+
+public ForceUpLevel0Compaction(UniversalCompaction universalCompaction) {
+this.universalCompaction = universalCompaction;
+}
+
+@Override
+public Optional pick(int numLevels, List 
runs) {
+Optional pick = universalCompaction.pick(numLevels, runs);
+if (pick.isPresent()) {
+return pick;
+}
+
+if (runs.isEmpty() || runs.get(0).level() > 0) {
+return Optional.empty();
+}
+
+// collect all level 0 files
+int candidateCount = 1;

Review Comment:
   I think we can finish this in one loop:
   ```
   // collect all level 0 files
   int candidateCount = 0;
   for (int i = candidateCount; i < runs.size(); i++) {
   if (runs.get(i).level() > 0) {
   break;
   }
   candidateCount++;
   }
   
   return candidateCount == 0 ?
   Optional.empty() :
   Optional.of(universal.pickForSizeRatio(numLevels - 1, runs, 
candidateCount, true));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store

2023-03-13 Thread via GitHub


JingsongLi commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134777880


##
flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java:
##
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the PalDB Project 
(https://github.com/linkedin/PalDB), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.lookup.LookupStoreWriter;
+import org.apache.flink.table.store.utils.MurmurHashUtils;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/** Internal write implementation for hash kv store. */
+public class HashLookupStoreWriter implements LookupStoreWriter {
+
+private static final Logger LOG =
+LoggerFactory.getLogger(HashLookupStoreWriter.class.getName());
+
+// load factor of hash map, default 0.75
+private final double loadFactor;
+// Output
+private final File tempFolder;
+private final OutputStream outputStream;
+// Index stream
+private File[] indexFiles;
+private DataOutputStream[] indexStreams;
+// Data stream
+private File[] dataFiles;
+private DataOutputStream[] dataStreams;
+// Cache last value
+private byte[][] lastValues;
+private int[] lastValuesLength;
+// Data length
+private long[] dataLengths;
+// Index length
+private long indexesLength;
+// Max offset length
+private int[] maxOffsetLengths;
+// Number of keys
+private int keyCount;
+private int[] keyCounts;
+// Number of values
+private int valueCount;
+// Number of collisions
+private int collisions;
+
+HashLookupStoreWriter(double loadFactor, File file) throws IOException {
+this.loadFactor = loadFactor;
+if (loadFactor <= 0.0 || loadFactor >= 1.0) {
+throw new IllegalArgumentException(
+"Illegal load factor = " + loadFactor + ", should be 
between 0.0 and 1.0.");
+}
+
+tempFolder = new File(file.getParentFile(), 
UUID.randomUUID().toString());
+if (!tempFolder.mkdir()) {
+throw new IOException("Can not create temp folder: " + tempFolder);
+}
+outputStream = new BufferedOutputStream(new FileOutputStream(file));
+indexStreams = new DataOutputStream[0];
+dataStreams = new DataOutputStream[0];
+indexFiles = new File[0];
+dataFiles = new File[0];
+lastValues = new byte[0][];
+lastValuesLength = new int[0];
+dataLengths = new long[0];
+maxOffsetLengths = new int[0];
+keyCounts = new int[0];
+}
+
+@Override
+public void put(byte[] key, byte[] value) throws IOException {
+int keyLength = key.length;
+
+// Get the Output stream for that keyLength, each key length has its 
own file
+DataOutputStream indexStream = getIndexStream(keyLength);
+
+// Write key
+indexStream.write(key);
+
+// Check if the value is identical to the last inserted
+byte[] lastValue = lastValues[keyLength];
+boolean sameValue = lastValue != null && Arrays.equals(value, 
lastValue);
+
+// Get data stream and length
+long dataLength = dataLengths[keyLength];
+if (sameValue) {
+dataLength -= 

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store

2023-03-13 Thread via GitHub


JingsongLi commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134777430


##
flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java:
##
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the PalDB Project 
(https://github.com/linkedin/PalDB), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.lookup.LookupStoreWriter;
+import org.apache.flink.table.store.utils.MurmurHashUtils;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/** Internal write implementation for hash kv store. */
+public class HashLookupStoreWriter implements LookupStoreWriter {
+
+private static final Logger LOG =
+LoggerFactory.getLogger(HashLookupStoreWriter.class.getName());
+
+// load factor of hash map, default 0.75
+private final double loadFactor;
+// Output
+private final File tempFolder;
+private final OutputStream outputStream;
+// Index stream
+private File[] indexFiles;
+private DataOutputStream[] indexStreams;
+// Data stream
+private File[] dataFiles;
+private DataOutputStream[] dataStreams;
+// Cache last value
+private byte[][] lastValues;
+private int[] lastValuesLength;
+// Data length
+private long[] dataLengths;
+// Index length
+private long indexesLength;
+// Max offset length
+private int[] maxOffsetLengths;
+// Number of keys
+private int keyCount;
+private int[] keyCounts;
+// Number of values
+private int valueCount;
+// Number of collisions
+private int collisions;
+
+HashLookupStoreWriter(double loadFactor, File file) throws IOException {
+this.loadFactor = loadFactor;
+if (loadFactor <= 0.0 || loadFactor >= 1.0) {
+throw new IllegalArgumentException(
+"Illegal load factor = " + loadFactor + ", should be 
between 0.0 and 1.0.");
+}
+
+tempFolder = new File(file.getParentFile(), 
UUID.randomUUID().toString());
+if (!tempFolder.mkdir()) {
+throw new IOException("Can not create temp folder: " + tempFolder);
+}
+outputStream = new BufferedOutputStream(new FileOutputStream(file));
+indexStreams = new DataOutputStream[0];
+dataStreams = new DataOutputStream[0];
+indexFiles = new File[0];
+dataFiles = new File[0];
+lastValues = new byte[0][];
+lastValuesLength = new int[0];
+dataLengths = new long[0];
+maxOffsetLengths = new int[0];
+keyCounts = new int[0];
+}
+
+@Override
+public void put(byte[] key, byte[] value) throws IOException {
+int keyLength = key.length;
+
+// Get the Output stream for that keyLength, each key length has its 
own file
+DataOutputStream indexStream = getIndexStream(keyLength);
+
+// Write key
+indexStream.write(key);
+
+// Check if the value is identical to the last inserted
+byte[] lastValue = lastValues[keyLength];
+boolean sameValue = lastValue != null && Arrays.equals(value, 
lastValue);
+
+// Get data stream and length
+long dataLength = dataLengths[keyLength];
+if (sameValue) {
+dataLength -= 

[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store

2023-03-13 Thread via GitHub


JingsongLi commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134775901


##
flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java:
##
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the PalDB Project 
(https://github.com/linkedin/PalDB), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.lookup.LookupStoreWriter;
+import org.apache.flink.table.store.utils.MurmurHashUtils;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/** Internal write implementation for hash kv store. */
+public class HashLookupStoreWriter implements LookupStoreWriter {
+
+private static final Logger LOG =
+LoggerFactory.getLogger(HashLookupStoreWriter.class.getName());
+
+// load factor of hash map, default 0.75
+private final double loadFactor;
+// Output
+private final File tempFolder;
+private final OutputStream outputStream;
+// Index stream
+private File[] indexFiles;
+private DataOutputStream[] indexStreams;
+// Data stream
+private File[] dataFiles;
+private DataOutputStream[] dataStreams;
+// Cache last value
+private byte[][] lastValues;
+private int[] lastValuesLength;
+// Data length
+private long[] dataLengths;
+// Index length
+private long indexesLength;
+// Max offset length
+private int[] maxOffsetLengths;
+// Number of keys
+private int keyCount;
+private int[] keyCounts;
+// Number of values
+private int valueCount;
+// Number of collisions
+private int collisions;
+
+HashLookupStoreWriter(double loadFactor, File file) throws IOException {
+this.loadFactor = loadFactor;
+if (loadFactor <= 0.0 || loadFactor >= 1.0) {
+throw new IllegalArgumentException(
+"Illegal load factor = " + loadFactor + ", should be 
between 0.0 and 1.0.");
+}
+
+tempFolder = new File(file.getParentFile(), 
UUID.randomUUID().toString());
+if (!tempFolder.mkdir()) {
+throw new IOException("Can not create temp folder: " + tempFolder);
+}
+outputStream = new BufferedOutputStream(new FileOutputStream(file));
+indexStreams = new DataOutputStream[0];
+dataStreams = new DataOutputStream[0];
+indexFiles = new File[0];
+dataFiles = new File[0];
+lastValues = new byte[0][];
+lastValuesLength = new int[0];
+dataLengths = new long[0];
+maxOffsetLengths = new int[0];
+keyCounts = new int[0];
+}
+
+@Override
+public void put(byte[] key, byte[] value) throws IOException {
+int keyLength = key.length;
+
+// Get the Output stream for that keyLength, each key length has its 
own file
+DataOutputStream indexStream = getIndexStream(keyLength);
+
+// Write key
+indexStream.write(key);
+
+// Check if the value is identical to the last inserted
+byte[] lastValue = lastValues[keyLength];
+boolean sameValue = lastValue != null && Arrays.equals(value, 
lastValue);
+
+// Get data stream and length
+long dataLength = dataLengths[keyLength];
+if (sameValue) {
+dataLength -= 

[GitHub] [flink] liujiawinds commented on pull request #19844: [FLINK-27805][Connectors/ORC] bump orc version to 1.7.5

2023-03-13 Thread via GitHub


liujiawinds commented on PR #19844:
URL: https://github.com/apache/flink/pull/19844#issuecomment-1467197335

   @pgaref Feel free to take over this. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 commented on pull request #224: univariatefeatureselector.md:101:1": failed to extract shortcode: sho…

2023-03-13 Thread via GitHub


lindong28 commented on PR #224:
URL: https://github.com/apache/flink-ml/pull/224#issuecomment-1467184153

   @alberta0714 Thanks for creating this PR. Could you explain how to reproduce 
the error "failed to extract shortcode"? We can add a test to catch issues like 
this in the future if we can reproduce this error.
   
   Note that the latest website can correctly show the doc for "Univariate 
Feature Selector" [1]. Do you know what the impact of this issue is?
   
   [1] 
https://nightlies.apache.org/flink/flink-ml-docs-master/docs/operators/feature/univariatefeatureselector/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store

2023-03-13 Thread via GitHub


SteNicholas commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134714232


##
flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/SimpleReadBuffer.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2015 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package org.apache.flink.table.store.utils;
+
+/** A simple read buffer provide {@code readUnsignedByte} and position. */

Review Comment:
   ```suggestion
   /** A simple read buffer provides {@code readUnsignedByte} and position. */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store

2023-03-13 Thread via GitHub


SteNicholas commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134713621


##
flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java:
##
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the PalDB Project 
(https://github.com/linkedin/PalDB), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.lookup.LookupStoreWriter;
+import org.apache.flink.table.store.utils.MurmurHashUtils;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/** Internal write implementation for hash kv store. */
+public class HashLookupStoreWriter implements LookupStoreWriter {
+
+private static final Logger LOG =
+LoggerFactory.getLogger(HashLookupStoreWriter.class.getName());
+
+// load factor of hash map, default 0.75
+private final double loadFactor;
+// Output
+private final File tempFolder;
+private final OutputStream outputStream;
+// Index stream
+private File[] indexFiles;
+private DataOutputStream[] indexStreams;
+// Data stream
+private File[] dataFiles;
+private DataOutputStream[] dataStreams;
+// Cache last value
+private byte[][] lastValues;
+private int[] lastValuesLength;
+// Data length
+private long[] dataLengths;
+// Index length
+private long indexesLength;
+// Max offset length
+private int[] maxOffsetLengths;
+// Number of keys
+private int keyCount;
+private int[] keyCounts;
+// Number of values
+private int valueCount;
+// Number of collisions
+private int collisions;
+
+HashLookupStoreWriter(double loadFactor, File file) throws IOException {
+this.loadFactor = loadFactor;
+if (loadFactor <= 0.0 || loadFactor >= 1.0) {
+throw new IllegalArgumentException(
+"Illegal load factor = " + loadFactor + ", should be 
between 0.0 and 1.0.");
+}
+
+tempFolder = new File(file.getParentFile(), 
UUID.randomUUID().toString());
+if (!tempFolder.mkdir()) {
+throw new IOException("Can not create temp folder: " + tempFolder);
+}
+outputStream = new BufferedOutputStream(new FileOutputStream(file));
+indexStreams = new DataOutputStream[0];
+dataStreams = new DataOutputStream[0];
+indexFiles = new File[0];
+dataFiles = new File[0];
+lastValues = new byte[0][];
+lastValuesLength = new int[0];
+dataLengths = new long[0];
+maxOffsetLengths = new int[0];
+keyCounts = new int[0];
+}
+
+@Override
+public void put(byte[] key, byte[] value) throws IOException {
+int keyLength = key.length;
+
+// Get the Output stream for that keyLength, each key length has its 
own file
+DataOutputStream indexStream = getIndexStream(keyLength);
+
+// Write key
+indexStream.write(key);
+
+// Check if the value is identical to the last inserted
+byte[] lastValue = lastValues[keyLength];
+boolean sameValue = lastValue != null && Arrays.equals(value, 
lastValue);
+
+// Get data stream and length
+long dataLength = dataLengths[keyLength];
+if (sameValue) {
+dataLength -= 

[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store

2023-03-13 Thread via GitHub


SteNicholas commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134711088


##
flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java:
##
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the PalDB Project 
(https://github.com/linkedin/PalDB), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.lookup.LookupStoreWriter;
+import org.apache.flink.table.store.utils.MurmurHashUtils;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/** Internal write implementation for hash kv store. */
+public class HashLookupStoreWriter implements LookupStoreWriter {
+
+private static final Logger LOG =
+LoggerFactory.getLogger(HashLookupStoreWriter.class.getName());
+
+// load factor of hash map, default 0.75
+private final double loadFactor;
+// Output
+private final File tempFolder;
+private final OutputStream outputStream;
+// Index stream
+private File[] indexFiles;
+private DataOutputStream[] indexStreams;
+// Data stream
+private File[] dataFiles;
+private DataOutputStream[] dataStreams;
+// Cache last value
+private byte[][] lastValues;
+private int[] lastValuesLength;
+// Data length
+private long[] dataLengths;
+// Index length
+private long indexesLength;
+// Max offset length
+private int[] maxOffsetLengths;
+// Number of keys
+private int keyCount;
+private int[] keyCounts;
+// Number of values
+private int valueCount;
+// Number of collisions
+private int collisions;
+
+HashLookupStoreWriter(double loadFactor, File file) throws IOException {
+this.loadFactor = loadFactor;
+if (loadFactor <= 0.0 || loadFactor >= 1.0) {
+throw new IllegalArgumentException(
+"Illegal load factor = " + loadFactor + ", should be 
between 0.0 and 1.0.");
+}
+
+tempFolder = new File(file.getParentFile(), 
UUID.randomUUID().toString());
+if (!tempFolder.mkdir()) {
+throw new IOException("Can not create temp folder: " + tempFolder);
+}
+outputStream = new BufferedOutputStream(new FileOutputStream(file));
+indexStreams = new DataOutputStream[0];
+dataStreams = new DataOutputStream[0];
+indexFiles = new File[0];
+dataFiles = new File[0];
+lastValues = new byte[0][];
+lastValuesLength = new int[0];
+dataLengths = new long[0];
+maxOffsetLengths = new int[0];
+keyCounts = new int[0];
+}
+
+@Override
+public void put(byte[] key, byte[] value) throws IOException {
+int keyLength = key.length;
+
+// Get the Output stream for that keyLength, each key length has its 
own file
+DataOutputStream indexStream = getIndexStream(keyLength);
+
+// Write key
+indexStream.write(key);
+
+// Check if the value is identical to the last inserted
+byte[] lastValue = lastValues[keyLength];
+boolean sameValue = lastValue != null && Arrays.equals(value, 
lastValue);
+
+// Get data stream and length
+long dataLength = dataLengths[keyLength];
+if (sameValue) {
+dataLength -= 

[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store

2023-03-13 Thread via GitHub


SteNicholas commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134708812


##
flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java:
##
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the PalDB Project 
(https://github.com/linkedin/PalDB), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.lookup.LookupStoreWriter;
+import org.apache.flink.table.store.utils.MurmurHashUtils;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/** Internal write implementation for hash kv store. */
+public class HashLookupStoreWriter implements LookupStoreWriter {
+
+private static final Logger LOG =
+LoggerFactory.getLogger(HashLookupStoreWriter.class.getName());
+
+// load factor of hash map, default 0.75
+private final double loadFactor;
+// Output
+private final File tempFolder;
+private final OutputStream outputStream;
+// Index stream
+private File[] indexFiles;
+private DataOutputStream[] indexStreams;
+// Data stream
+private File[] dataFiles;
+private DataOutputStream[] dataStreams;
+// Cache last value
+private byte[][] lastValues;
+private int[] lastValuesLength;
+// Data length
+private long[] dataLengths;
+// Index length
+private long indexesLength;
+// Max offset length
+private int[] maxOffsetLengths;
+// Number of keys
+private int keyCount;
+private int[] keyCounts;
+// Number of values
+private int valueCount;
+// Number of collisions
+private int collisions;
+
+HashLookupStoreWriter(double loadFactor, File file) throws IOException {
+this.loadFactor = loadFactor;
+if (loadFactor <= 0.0 || loadFactor >= 1.0) {
+throw new IllegalArgumentException(
+"Illegal load factor = " + loadFactor + ", should be 
between 0.0 and 1.0.");
+}
+
+tempFolder = new File(file.getParentFile(), 
UUID.randomUUID().toString());
+if (!tempFolder.mkdir()) {
+throw new IOException("Can not create temp folder: " + tempFolder);
+}
+outputStream = new BufferedOutputStream(new FileOutputStream(file));
+indexStreams = new DataOutputStream[0];
+dataStreams = new DataOutputStream[0];
+indexFiles = new File[0];
+dataFiles = new File[0];
+lastValues = new byte[0][];
+lastValuesLength = new int[0];
+dataLengths = new long[0];
+maxOffsetLengths = new int[0];
+keyCounts = new int[0];
+}
+
+@Override
+public void put(byte[] key, byte[] value) throws IOException {
+int keyLength = key.length;
+
+// Get the Output stream for that keyLength, each key length has its 
own file
+DataOutputStream indexStream = getIndexStream(keyLength);
+
+// Write key
+indexStream.write(key);
+
+// Check if the value is identical to the last inserted
+byte[] lastValue = lastValues[keyLength];
+boolean sameValue = lastValue != null && Arrays.equals(value, 
lastValue);
+
+// Get data stream and length
+long dataLength = dataLengths[keyLength];
+if (sameValue) {
+dataLength -= 

[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store

2023-03-13 Thread via GitHub


SteNicholas commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134702880


##
flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreReader.java:
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.lookup;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/** Reader, lookup value by key bytes. */
+public interface LookupStoreReader extends Closeable {
+
+byte[] lookup(byte[] key) throws IOException;

Review Comment:
   Adds the java doc for interface methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #593: [FLINK-31397] Introduce write-once hash lookup store

2023-03-13 Thread via GitHub


SteNicholas commented on code in PR #593:
URL: https://github.com/apache/flink-table-store/pull/593#discussion_r1134703041


##
flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreWriter.java:
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.lookup;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/** Writer to prepare binary file. */
+public interface LookupStoreWriter extends Closeable {
+
+void put(byte[] key, byte[] value) throws IOException;

Review Comment:
   Ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction

2023-03-13 Thread via GitHub


SteNicholas commented on code in PR #594:
URL: https://github.com/apache/flink-table-store/pull/594#discussion_r1134698287


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java:
##
@@ -172,10 +133,30 @@ public MergeFunction create(@Nullable int[][] 
projection) {
 fieldTypes = project.project(tableTypes);
 }
 
-return new AggregateMergeFunction(
-createFieldGetters(fieldTypes),
-new AggregateMergeFunction.RowAggregator(
-conf, fieldNames, fieldTypes, primaryKeys));
+FieldAggregator[] fieldAggregators = new 
FieldAggregator[fieldNames.size()];
+for (int i = 0; i < fieldNames.size(); i++) {
+String fieldName = fieldNames.get(i);
+DataType fieldType = fieldTypes.get(i);
+// aggregate by primary keys, so they do not aggregate
+boolean isPrimaryKey = primaryKeys.contains(fieldName);
+String strAggFunc =
+conf.get(
+key(FIELDS + "." + fieldName + "." + 
AGG_FUNCTION)
+.stringType()
+.noDefaultValue()
+.withDescription(
+"Get " + fieldName + "'s 
aggregate function"));
+boolean ignoreRetract =
+conf.get(
+key(FIELDS + "." + fieldName + "." + 
IGNORE_RETRACT)
+.booleanType()
+.defaultValue(false));

Review Comment:
   Adds the description?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction

2023-03-13 Thread via GitHub


SteNicholas commented on code in PR #594:
URL: https://github.com/apache/flink-table-store/pull/594#discussion_r1134693522


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupMergeFunction.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.KeyValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * A {@link MergeFunction} for lookup, this wrapper only consider the latest 
high level record,

Review Comment:
   ```suggestion
* A {@link MergeFunction} for lookup, this wrapper only considers the 
latest high level record,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction

2023-03-13 Thread via GitHub


SteNicholas commented on code in PR #594:
URL: https://github.com/apache/flink-table-store/pull/594#discussion_r1134691216


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupChangelogMergeFunctionWrapper.java:
##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.types.RowKind;
+
+import java.util.function.Function;
+
+import static org.apache.flink.table.store.utils.Preconditions.checkArgument;
+
+/**
+ * Wrapper for {@link MergeFunction}s to produce changelog by lookup during 
the compaction involving
+ * level 0 files.
+ *
+ * Changelog records are generated in the process of the level-0 file 
participating in the
+ * compaction, if during the compaction processing:
+ *
+ * 
+ *   Without level-0 records, no changelog.
+ *   With level-0 record, with level-x (x > 0) record, level-x record 
should be BEFORE, level-0
+ *   should be AFTER.
+ *   With level-0 record, without level-x record, need to lookup the 
history value of the upper
+ *   level as BEFORE.
+ * 
+ */
+public class LookupChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper {
+
+private final LookupMergeFunction mergeFunction;
+private final MergeFunction mergeFunction2;
+private final Function lookup;
+
+private final ChangelogResult reusedResult = new ChangelogResult();
+private final KeyValue reusedBefore = new KeyValue();
+private final KeyValue reusedAfter = new KeyValue();
+
+public LookupChangelogMergeFunctionWrapper(
+MergeFunctionFactory mergeFunctionFactory,
+Function lookup) {
+MergeFunction mergeFunction = mergeFunctionFactory.create();
+checkArgument(
+mergeFunction instanceof LookupMergeFunction,
+"Merge function should be a  LookupMergeFunction, but is %s, 
there is a bug.",

Review Comment:
   ```suggestion
   "Merge function should be a LookupMergeFunction, but is %s, 
there is a bug.",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction

2023-03-13 Thread via GitHub


SteNicholas commented on code in PR #594:
URL: https://github.com/apache/flink-table-store/pull/594#discussion_r1134689986


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0Compaction.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.compact.CompactUnit;
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link CompactStrategy} to force compacting level 0 files. */
+public class ForceUpLevel0Compaction implements CompactStrategy {
+
+private final UniversalCompaction universalCompaction;
+
+public ForceUpLevel0Compaction(UniversalCompaction universalCompaction) {
+this.universalCompaction = universalCompaction;
+}
+
+@Override
+public Optional pick(int numLevels, List 
runs) {
+Optional pick = universalCompaction.pick(numLevels, runs);
+if (pick.isPresent()) {
+return pick;
+}
+
+if (runs.isEmpty() || runs.get(0).level() > 0) {
+return Optional.empty();
+}
+
+// collect all level 0 files
+int candidateCount = 1;

Review Comment:
   Why not `int candidateCount = runs.stream().filter(r -> r.level() > 
0).count();`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #594: [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction

2023-03-13 Thread via GitHub


SteNicholas commented on code in PR #594:
URL: https://github.com/apache/flink-table-store/pull/594#discussion_r1134689986


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0Compaction.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.compact.CompactUnit;
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link CompactStrategy} to force compacting level 0 files. */
+public class ForceUpLevel0Compaction implements CompactStrategy {
+
+private final UniversalCompaction universalCompaction;
+
+public ForceUpLevel0Compaction(UniversalCompaction universalCompaction) {
+this.universalCompaction = universalCompaction;
+}
+
+@Override
+public Optional pick(int numLevels, List 
runs) {
+Optional pick = universalCompaction.pick(numLevels, runs);
+if (pick.isPresent()) {
+return pick;
+}
+
+if (runs.isEmpty() || runs.get(0).level() > 0) {
+return Optional.empty();
+}
+
+// collect all level 0 files
+int candidateCount = 1;

Review Comment:
   Why not `int candidateCount = runs.stream().filter(r->r.level()>0).count();`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31369) Harden modifiers for sql-gateway module

2023-03-13 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699863#comment-17699863
 ] 

Sergey Nuyanzin commented on FLINK-31369:
-

Merged at: 
[20dc237f1b6eb32ef5344b5ece0a1e3a008e8bfd|https://github.com/apache/flink/commit/20dc237f1b6eb32ef5344b5ece0a1e3a008e8bfd]

> Harden modifiers for sql-gateway module
> ---
>
> Key: FLINK-31369
> URL: https://issues.apache.org/jira/browse/FLINK-31369
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Gateway, Tests
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> This is a follow up jira issue for 
> https://github.com/apache/flink/pull/22127#discussion_r1129192778



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-31369) Harden modifiers for sql-gateway module

2023-03-13 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-31369.
-
Fix Version/s: 1.18.0
   Resolution: Fixed

> Harden modifiers for sql-gateway module
> ---
>
> Key: FLINK-31369
> URL: https://issues.apache.org/jira/browse/FLINK-31369
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Gateway, Tests
>Reporter: Sergey Nuyanzin
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> This is a follow up jira issue for 
> https://github.com/apache/flink/pull/22127#discussion_r1129192778



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin merged pull request #22135: [FLINK-31369][sql-gateway] Harden modifiers in tests for sql-gateway module

2023-03-13 Thread via GitHub


snuyanzin merged PR #22135:
URL: https://github.com/apache/flink/pull/22135


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-31369) Harden modifiers for sql-gateway module

2023-03-13 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-31369:
---

Assignee: Sergey Nuyanzin

> Harden modifiers for sql-gateway module
> ---
>
> Key: FLINK-31369
> URL: https://issues.apache.org/jira/browse/FLINK-31369
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Gateway, Tests
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> This is a follow up jira issue for 
> https://github.com/apache/flink/pull/22127#discussion_r1129192778



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] mbalassi commented on a diff in pull request #548: [FLINK-31407] Bump fabric8 version to 6.5.0

2023-03-13 Thread via GitHub


mbalassi commented on code in PR #548:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/548#discussion_r1134642722


##
flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java:
##
@@ -57,14 +58,14 @@ public void stopAndCleanupCluster(String clusterId) {
 .apps()
 .deployments()
 
.withName(StandaloneKubernetesUtils.getJobManagerDeploymentName(clusterId))
-.cascading(true)
+.withPropagationPolicy(DeletionPropagation.ORPHAN)

Review Comment:
   @usamj could you please confirm that this was the intention here.



##
flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java:
##
@@ -57,14 +58,14 @@ public void stopAndCleanupCluster(String clusterId) {
 .apps()
 .deployments()
 
.withName(StandaloneKubernetesUtils.getJobManagerDeploymentName(clusterId))
-.cascading(true)
+.withPropagationPolicy(DeletionPropagation.ORPHAN)

Review Comment:
   @usamj could you please confirm that this was the intention here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31407) Upgrade Fabric8 version to 6.5.0

2023-03-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31407:
---
Labels: pull-request-available  (was: )

> Upgrade Fabric8 version to 6.5.0
> 
>
> Key: FLINK-31407
> URL: https://issues.apache.org/jira/browse/FLINK-31407
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Márton Balassi
>Priority: Major
>  Labels: pull-request-available
>
> Fabric8 6.5.0 has been released recently with a number of major improvements:
> [https://github.com/fabric8io/kubernetes-client/releases/tag/v6.5.0]
> This is a very important version for the operator as it also fixes some 
> outstanding issues with timing out informers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] mbalassi opened a new pull request, #548: [FLINK-31407] Bump fabric8 version to 6.5.0

2023-03-13 Thread via GitHub


mbalassi opened a new pull request, #548:
URL: https://github.com/apache/flink-kubernetes-operator/pull/548

   This new version brings critical stability fixes for the informers. I fixed 
the newly introduced deprecation warnings.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pgaref commented on pull request #19844: [FLINK-27805][Connectors/ORC] bump orc version to 1.7.5

2023-03-13 Thread via GitHub


pgaref commented on PR #19844:
URL: https://github.com/apache/flink/pull/19844#issuecomment-1466980765

   @liujiawinds are you still working on this one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.

2023-03-13 Thread via GitHub


snuyanzin commented on code in PR #22144:
URL: https://github.com/apache/flink/pull/22144#discussion_r1134628496


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##
@@ -178,6 +181,137 @@ Stream getTestSetSpecs() {
 null
 },
 DataTypes.ARRAY(
-DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE();
+DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE(,
+
TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_REMOVE)
+.onFieldsWithData(
+new Integer[] {1, 2, 2},
+null,
+new String[] {"Hello", "World"},
+new Row[] {
+Row.of(true, LocalDate.of(2022, 4, 20)),
+Row.of(true, LocalDate.of(1990, 10, 14)),
+null
+},
+new Integer[] {null, null, 1},
+new Integer[][] {
+new Integer[] {1, null, 3}, new Integer[] 
{0}, new Integer[] {1}
+},
+new Map[] {
+CollectionUtil.map(entry(1, "a"), entry(2, 
"b")),
+CollectionUtil.map(entry(3, "c"), entry(4, 
"d")),
+null
+})
+.andDataTypes(
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.STRING()).notNull(),
+DataTypes.ARRAY(
+DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE())),
+DataTypes.ARRAY(DataTypes.INT()),
+
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING(
+// ARRAY
+.testResult(
+$("f0").arrayRemove(2),
+"ARRAY_REMOVE(f0, 2)",
+new Integer[] {1},
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+.testResult(
+$("f0").arrayRemove(42),
+"ARRAY_REMOVE(f0, 42)",
+new Integer[] {1, 2, 2},
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+.testResult(
+$("f0").arrayRemove(
+lit(null, DataTypes.SMALLINT())
+
.cast(DataTypes.INT())),
+"ARRAY_REMOVE(f0, cast(NULL AS INT))",
+new Integer[] {1, 2, 2},
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+// ARRAY of null value
+.testResult(
+$("f1").arrayRemove(12),
+"ARRAY_REMOVE(f1, 12)",
+null,
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+.testResult(
+$("f1").arrayRemove(null),
+"ARRAY_REMOVE(f1, NULL)",
+null,
+DataTypes.ARRAY(DataTypes.INT()).nullable())
+// ARRAY NOT NULL
+.testResult(
+$("f2").arrayRemove("Hello"),
+"ARRAY_REMOVE(f2, 'Hello')",
+new String[] {"World"},
+DataTypes.ARRAY(DataTypes.STRING()).notNull())
+.testResult(
+$("f2").arrayRemove(
+lit(null, DataTypes.STRING())
+
.cast(DataTypes.STRING())),
+"ARRAY_REMOVE(f2, cast(NULL AS VARCHAR))",
+new String[] {"Hello", "World"},
+DataTypes.ARRAY(DataTypes.STRING()).notNull())
+// ARRAY>
+.testResult(
+

[jira] [Updated] (FLINK-31428) Add user callbacks to PulsarSource and PulsarSink

2023-03-13 Thread Alpha Diallo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alpha Diallo updated FLINK-31428:
-
Description: 
We'd like to add support for user callbacks in {{PulsarSource}} and 
{{{}PulsarSink{}}}. This enables specific use cases such as event tracing which 
requires access to low level message properties such as message IDs after an 
event is produced, topic partitions, etc...

The functionality required is similar to {{ConsumerInterceptor}} and 
{{ProducerInterceptor}} in the Pulsar Client. However, there is a case to be 
made for adding new APIs that would help avoid the extra cost of ser/deser when 
getting the message body through the {{Message}} interface in the interceptors.

  was:
We'd like to add support for user callbacks in PulsarSource and 
{{{}{{PulsarSink}}{}}}. This enables specific use cases such as event tracing 
which requires access to low level message properties such as message IDs after 
an event is produced, topic partitions, etc...

The functionality required is similar to {{ConsumerInterceptor}} and 
{{ProducerInterceptor}} in the Pulsar Client. However, there is a case to be 
made for adding new APIs that would help avoid the extra cost of ser/deser when 
getting the message body through the {{Message}} interface in the interceptors.


> Add user callbacks to  PulsarSource and PulsarSink
> --
>
> Key: FLINK-31428
> URL: https://issues.apache.org/jira/browse/FLINK-31428
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Pulsar
>Reporter: Alpha Diallo
>Priority: Major
>
> We'd like to add support for user callbacks in {{PulsarSource}} and 
> {{{}PulsarSink{}}}. This enables specific use cases such as event tracing 
> which requires access to low level message properties such as message IDs 
> after an event is produced, topic partitions, etc...
> The functionality required is similar to {{ConsumerInterceptor}} and 
> {{ProducerInterceptor}} in the Pulsar Client. However, there is a case to be 
> made for adding new APIs that would help avoid the extra cost of ser/deser 
> when getting the message body through the {{Message}} interface in the 
> interceptors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31428) Add user callbacks to PulsarSource and PulsarSink

2023-03-13 Thread Alpha Diallo (Jira)
Alpha Diallo created FLINK-31428:


 Summary: Add user callbacks to  PulsarSource and PulsarSink
 Key: FLINK-31428
 URL: https://issues.apache.org/jira/browse/FLINK-31428
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Pulsar
Reporter: Alpha Diallo


We'd like to add support for user callbacks in PulsarSource and 
{{{}{{PulsarSink}}{}}}. This enables specific use cases such as event tracing 
which requires access to low level message properties such as message IDs after 
an event is produced, topic partitions, etc...

The functionality required is similar to {{ConsumerInterceptor}} and 
{{ProducerInterceptor}} in the Pulsar Client. However, there is a case to be 
made for adding new APIs that would help avoid the extra cost of ser/deser when 
getting the message body through the {{Message}} interface in the interceptors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.

2023-03-13 Thread via GitHub


snuyanzin commented on code in PR #22144:
URL: https://github.com/apache/flink/pull/22144#discussion_r1134622026


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayRemoveFunction.java:
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_REMOVE}. */
+@Internal
+public class ArrayRemoveFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator equalityEvaluator;
+private transient MethodHandle equalityHandle;
+
+public ArrayRemoveFunction(SpecializedFunction.SpecializedContext context) 
{
+super(BuiltInFunctionDefinitions.ARRAY_REMOVE, context);
+final DataType elementDataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+final DataType needleDataType = 
context.getCallContext().getArgumentDataTypes().get(1);
+elementGetter = 
ArrayData.createElementGetter(elementDataType.getLogicalType());
+equalityEvaluator =
+context.createEvaluator(
+$("element").isEqual($("needle")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element", 
elementDataType.notNull().toInternal()),
+DataTypes.FIELD("needle", 
needleDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+equalityHandle = equalityEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData haystack, Object needle) {
+try {
+if (haystack == null) {
+return null;
+}
+
+List list = new ArrayList();
+final int size = haystack.size();
+for (int pos = 0; pos < size; pos++) {
+final Object element = 
elementGetter.getElementOrNull(haystack, pos);
+if ((element == null && needle != null)
+|| (element != null && needle == null)
+|| (element != null
+&& needle != null
+&& !(boolean) equalityHandle.invoke(element, 
needle))) {

Review Comment:
   This could be simplified e.g. check for the negative case and continue



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pgaref commented on a diff in pull request #22153: [FLINK-31317] Introduce JobResourceRequirements and JobVertexResourceRequirements data structures.

2023-03-13 Thread via GitHub


pgaref commented on code in PR #22153:
URL: https://github.com/apache/flink/pull/22153#discussion_r1134608280


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java:
##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Information about the parallelism of job vertices. */
+public class JobResourceRequirements implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/**
+ * A key for an internal config option (intentionally prefixed with 
$internal to make this
+ * explicit), that we'll serialize the {@link JobResourceRequirements} 
into, when writing it to
+ * {@link JobGraph}.
+ */
+private static final String JOB_RESOURCE_REQUIREMENTS_KEY =
+"$internal.job-resource-requirements";
+
+private static final JobResourceRequirements EMPTY =
+new JobResourceRequirements(Collections.emptyMap());
+
+/**
+ * Write {@link JobResourceRequirements resource requirements} into the 
configuration of a given
+ * {@link JobGraph}.
+ *
+ * @param jobGraph job graph to write requirements to
+ * @param jobResourceRequirements resource requirements to write
+ * @throws IOException in case we're not able to serialize requirements 
into the configuration
+ */
+public static void writeToJobGraph(
+JobGraph jobGraph, JobResourceRequirements 
jobResourceRequirements) throws IOException {
+InstantiationUtil.writeObjectToConfig(
+jobResourceRequirements,
+jobGraph.getJobConfiguration(),
+JOB_RESOURCE_REQUIREMENTS_KEY);
+}
+
+/**
+ * Read {@link JobResourceRequirements resource requirements} from the 
configuration of a given
+ * {@link JobGraph}.
+ *
+ * @param jobGraph job graph to read requirements from
+ * @throws IOException in case we're not able to deserialize requirements 
from the configuration
+ * @throws ClassNotFoundException in case some deserialized classes are 
missing on the classpath
+ */
+public static Optional readFromJobGraph(JobGraph 
jobGraph)
+throws IOException, ClassNotFoundException {
+return Optional.ofNullable(
+InstantiationUtil.readObjectFromConfig(
+jobGraph.getJobConfiguration(),
+JOB_RESOURCE_REQUIREMENTS_KEY,
+JobResourceRequirements.class.getClassLoader()));
+}
+
+/**
+ * This method validates that the new job vertex parallelisms are less or 
equal to the max
+ * parallelism. Moreover, it validates that there are no unknown job 
vertex ids and that we're
+ * not missing any.

Review Comment:
   We could also describe the special `-1` case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pgaref commented on a diff in pull request #22153: [FLINK-31317] Introduce JobResourceRequirements and JobVertexResourceRequirements data structures.

2023-03-13 Thread via GitHub


pgaref commented on code in PR #22153:
URL: https://github.com/apache/flink/pull/22153#discussion_r1134606959


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobResourceRequirements.java:
##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Information about the parallelism of job vertices. */
+public class JobResourceRequirements implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/**
+ * A key for an internal config option (intentionally prefixed with 
$internal to make this
+ * explicit), that we'll serialize the {@link JobResourceRequirements} 
into, when writing it to
+ * {@link JobGraph}.
+ */
+private static final String JOB_RESOURCE_REQUIREMENTS_KEY =
+"$internal.job-resource-requirements";
+
+private static final JobResourceRequirements EMPTY =
+new JobResourceRequirements(Collections.emptyMap());
+
+/**
+ * Write {@link JobResourceRequirements resource requirements} into the 
configuration of a given
+ * {@link JobGraph}.
+ *
+ * @param jobGraph job graph to write requirements to
+ * @param jobResourceRequirements resource requirements to write
+ * @throws IOException in case we're not able to serialize requirements 
into the configuration
+ */
+public static void writeToJobGraph(
+JobGraph jobGraph, JobResourceRequirements 
jobResourceRequirements) throws IOException {
+InstantiationUtil.writeObjectToConfig(
+jobResourceRequirements,
+jobGraph.getJobConfiguration(),
+JOB_RESOURCE_REQUIREMENTS_KEY);
+}
+
+/**
+ * Read {@link JobResourceRequirements resource requirements} from the 
configuration of a given
+ * {@link JobGraph}.
+ *
+ * @param jobGraph job graph to read requirements from
+ * @throws IOException in case we're not able to deserialize requirements 
from the configuration
+ * @throws ClassNotFoundException in case some deserialized classes are 
missing on the classpath
+ */
+public static Optional readFromJobGraph(JobGraph 
jobGraph)
+throws IOException, ClassNotFoundException {

Review Comment:
   I guess the only way to throw a ClassNotFoundException here is for 
JobResourceRequirements (or inner ones) class to be missing ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pgaref commented on a diff in pull request #22153: [FLINK-31317] Introduce JobResourceRequirements and JobVertexResourceRequirements data structures.

2023-03-13 Thread via GitHub


pgaref commented on code in PR #22153:
URL: https://github.com/apache/flink/pull/22153#discussion_r1134587834


##
flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java:
##
@@ -466,6 +468,7 @@ public static String checkForInstantiationError(Class 
clazz) {
 }
 }
 
+@Nullable

Review Comment:
   Minor but would it make sense to introduce the Runtime annotation in a 
separate ticket as it is used in a bunch of places? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31371) Stream failure if the topic doesn't exist

2023-03-13 Thread Enzo Dechaene (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699824#comment-17699824
 ] 

Enzo Dechaene commented on FLINK-31371:
---

Hi [~syhily], yes, it makes sense to do it that way because by default the 
pulsar broker parameter *allowAutoTopicCreation* is set to true.

Thanks for the answer

> Stream failure if the topic doesn't exist
> -
>
> Key: FLINK-31371
> URL: https://issues.apache.org/jira/browse/FLINK-31371
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.3
>Reporter: Enzo Dechaene
>Priority: Major
>
> *Describe the bug*
> With a Pulsar 2.8.4 server, a Flink stream containing Pulsar sources or sinks 
> will fail at startup if the topic doesn't exist.
>  
> *To Reproduce*
> Create a stream with :
>  * Flink 1.15.2
>  * Pulsar 2.8.4
>  * with a Pulsar source or sink linked to a non existant topic
>  * Start the stream
>  
> *Expected behavior*
> If the topic doesn't exist, it should be created at the first connection of 
> the source or sink without error.
>  
> *Additional context*
> In the TopicListSubscriber class of the connector, the method 
> getSubscribedTopicPartitions() try to get the metadata of a topic by doing 
> that :
>  
> {code:java}
> TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, topic);{code}
>  
> If the topic doesn't exist, I get a NullPointerException on the metadata
> We created a previous 
> [ticket|https://github.com/streamnative/pulsar-flink/issues/366] on the 
> Pulsar connector and it was fixed



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31427) Pulsar Catalog support with Schema translation

2023-03-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31427:
---
Labels: pull-request-available  (was: )

> Pulsar Catalog support with Schema translation
> --
>
> Key: FLINK-31427
> URL: https://issues.apache.org/jira/browse/FLINK-31427
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Pulsar
>Affects Versions: pulsar-4.0.0
>Reporter: Yufan Sheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.0.0
>
>
> This task will make the Pulsar serve as the Flink catalog. It will expose the 
> Pulsar's namespace as the Flink's database, the topic as the Flink's table. 
> You can easily create a table and database on Pulsar. The table can be 
> consumed by other clients with a valid schema check.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-pulsar] syhily opened a new pull request, #35: [FLINK-31427][Table] Initial Catalog implementation with a new config model and schema conversion.

2023-03-13 Thread via GitHub


syhily opened a new pull request, #35:
URL: https://github.com/apache/flink-connector-pulsar/pull/35

   ## Purpose of the change
   
   This PR will make the Pulsar serve as the Flink catalog backend. It will 
expose the Pulsar's namespace as the Flink's database, the Pulsar's topic as 
the Flink's table. You can easily create any tables and databases on Pulsar. 
The tables created by Flink can be consumed by other Pulsar clients with a 
valid schema check.
   
   ## Brief change log
   
   - Support both `Duration` and numeric time in `PulsarConfiguration` class, 
which make the existing config options can be configured with a time str in 
Table API.
   - Alter all the existing time related config options to use `useDuration` 
method.
   - Initial `PulsarCatalog` implementation with new config model and schema 
translator and etc.
   
   ## Verifying this change
   
   This change doesn't contains any tests, it's a initial implementation which 
won't be used by any user. We will add tests in upcoming PRs.
   
   ## Significant changes
   
   - [x] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with 
`@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [x] New feature has been introduced
   - If yes, how is this documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31414) exceptions in the alignment timer are ignored

2023-03-13 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699776#comment-17699776
 ] 

Piotr Nowojski commented on FLINK-31414:


[~Feifan Wang], can you elaborate a bit more what's the problem? I see a couple 
of discrepancies in your description and the stack trace that you posted:
*  the stack trace doesn't match to the master code, so I'm not sure what Flink 
version you are using?
* doesn't the error message "switched from RUNNING to FAILED" refer to actually 
subtask/task switching to FAILED state, contradicting your statement that the 
exception is being ignored?
* in the PR, I don't see a test coverage - a working unit test/ITCase that used 
to be failing without your fix would be nice to have. Both for explaining what 
is the issue and for actually providing regression test coverage

> exceptions in the alignment timer are ignored
> -
>
> Key: FLINK-31414
> URL: https://issues.apache.org/jira/browse/FLINK-31414
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
>
> Alignment timer task in alternating aligned checkpoint run as a future task 
> in mailbox thread, causing the exceptions 
> ([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
>  to be ignored. These exceptions should have failed the task, but now this 
> will cause the same checkpoint to fire twice initInputsCheckpoints in my test.
>  
> {code:java}
>  switched from RUNNING to FAILED with failure cause: 
> java.lang.RuntimeException: unable to send request to worker
>         at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:247)
>         at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:161)
>         at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.prepareSnapshot(StreamTaskNetworkInput.java:103)
>         at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.prepareSnapshot(StreamOneInputProcessor.java:83)
>         at 
> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.prepareSnapshot(StreamMultipleInputProcessor.java:122)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.prepareInputSnapshot(StreamTask.java:518)
>         at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.prepareInflightDataSnapshot(SubtaskCheckpointCoordinatorImpl.java:655)
>         at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.initInputsCheckpoint(SubtaskCheckpointCoordinatorImpl.java:515)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.initInputsCheckpoint(SingleCheckpointBarrierHandler.java:516)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriers.alignmentTimeout(AlternatingCollectingBarriers.java:46)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:54)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>         at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>         at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>         at 
> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>      

  1   2   3   >