[jira] [Commented] (FLINK-32239) Unify TestJvmProcess and TestProcessBuilder

2023-06-08 Thread Samrat Deb (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730817#comment-17730817
 ] 

Samrat Deb commented on FLINK-32239:


Hi [~chesnay] , 

Can i pick this minor improvement ? 

Thank you 

> Unify TestJvmProcess and TestProcessBuilder
> ---
>
> Key: FLINK-32239
> URL: https://issues.apache.org/jira/browse/FLINK-32239
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Test Infrastructure
>Reporter: Chesnay Schepler
>Priority: Minor
>  Labels: starter
> Fix For: 1.18.0
>
>
> Both of these utility classes are used to spawn additional JVM processes 
> during tests, and contain a fair bit of duplicated logic. We can unify them 
> to ease maintenance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] pvary commented on a diff in pull request #22694: [FLINK-32223][runtime][security] Add Hive delegation token support

2023-06-08 Thread via GitHub


pvary commented on code in PR #22694:
URL: https://github.com/apache/flink/pull/22694#discussion_r1223870641


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveDelegationTokenProvider.java:
##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
+import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Optional;
+
+/** Delegation token provider for Hive. */
+@Internal
+public class HiveDelegationTokenProvider implements DelegationTokenProvider {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(HiveDelegationTokenProvider.class);
+
+org.apache.hadoop.conf.Configuration hiveConf;
+
+private KerberosLoginProvider kerberosLoginProvider;
+
+private static final Text TOKEN_ALIAS = new 
Text("hive.server2.delegation.token");
+
+@Override
+public String serviceName() {
+return "HiveServer2";
+}
+
+@Override
+public void init(Configuration configuration) throws Exception {
+hiveConf = getHiveConfiguration(configuration);
+kerberosLoginProvider = new KerberosLoginProvider(configuration);
+}
+
+private org.apache.hadoop.conf.Configuration 
getHiveConfiguration(Configuration conf) {
+try {
+org.apache.hadoop.conf.Configuration hadoopConf =
+HadoopUtils.getHadoopConfiguration(conf);
+hiveConf = new HiveConf(hadoopConf, HiveConf.class);
+} catch (Exception | NoClassDefFoundError e) {

Review Comment:
   Ohh... I see. The answer is yes based on the comment below



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pvary commented on a diff in pull request #22694: [FLINK-32223][runtime][security] Add Hive delegation token support

2023-06-08 Thread via GitHub


pvary commented on code in PR #22694:
URL: https://github.com/apache/flink/pull/22694#discussion_r1223870641


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveDelegationTokenProvider.java:
##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
+import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Optional;
+
+/** Delegation token provider for Hive. */
+@Internal
+public class HiveDelegationTokenProvider implements DelegationTokenProvider {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(HiveDelegationTokenProvider.class);
+
+org.apache.hadoop.conf.Configuration hiveConf;
+
+private KerberosLoginProvider kerberosLoginProvider;
+
+private static final Text TOKEN_ALIAS = new 
Text("hive.server2.delegation.token");
+
+@Override
+public String serviceName() {
+return "HiveServer2";
+}
+
+@Override
+public void init(Configuration configuration) throws Exception {
+hiveConf = getHiveConfiguration(configuration);
+kerberosLoginProvider = new KerberosLoginProvider(configuration);
+}
+
+private org.apache.hadoop.conf.Configuration 
getHiveConfiguration(Configuration conf) {
+try {
+org.apache.hadoop.conf.Configuration hadoopConf =
+HadoopUtils.getHadoopConfiguration(conf);
+hiveConf = new HiveConf(hadoopConf, HiveConf.class);
+} catch (Exception | NoClassDefFoundError e) {

Review Comment:
   Ohh... I see



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pvary commented on a diff in pull request #22694: [FLINK-32223][runtime][security] Add Hive delegation token support

2023-06-08 Thread via GitHub


pvary commented on code in PR #22694:
URL: https://github.com/apache/flink/pull/22694#discussion_r1223870388


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveDelegationTokenProvider.java:
##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
+import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Optional;
+
+/** Delegation token provider for Hive. */
+@Internal
+public class HiveDelegationTokenProvider implements DelegationTokenProvider {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(HiveDelegationTokenProvider.class);
+
+org.apache.hadoop.conf.Configuration hiveConf;
+
+private KerberosLoginProvider kerberosLoginProvider;
+
+private static final Text TOKEN_ALIAS = new 
Text("hive.server2.delegation.token");
+
+@Override
+public String serviceName() {
+return "HiveServer2";
+}
+
+@Override
+public void init(Configuration configuration) throws Exception {
+hiveConf = getHiveConfiguration(configuration);
+kerberosLoginProvider = new KerberosLoginProvider(configuration);
+}
+
+private org.apache.hadoop.conf.Configuration 
getHiveConfiguration(Configuration conf) {
+try {
+org.apache.hadoop.conf.Configuration hadoopConf =
+HadoopUtils.getHadoopConfiguration(conf);
+hiveConf = new HiveConf(hadoopConf, HiveConf.class);
+} catch (Exception | NoClassDefFoundError e) {

Review Comment:
   Question: If we put the provider to the Hive module, is it still possible to 
miss Hive from the classpath?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pvary commented on a diff in pull request #22694: [FLINK-32223][runtime][security] Add Hive delegation token support

2023-06-08 Thread via GitHub


pvary commented on code in PR #22694:
URL: https://github.com/apache/flink/pull/22694#discussion_r1223869035


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveDelegationTokenProvider.java:
##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
+import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Optional;
+
+/** Delegation token provider for Hive. */

Review Comment:
   Nit: maybe we could add hiveserver2 to the comment as well 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

2023-06-08 Thread via GitHub


WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1223868769


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+private static final int INPUT_CHANNEL_INDEX = 0;
+
+@Test
+void testReadBufferOfNonPriorityDataType() {
+int bufferNumber = 1;
+CompletableFuture> 
availableAndPriorityConsumer =
+new CompletableFuture<>();
+CompletableFuture> 
prioritySequenceNumberConsumer =
+new CompletableFuture<>();
+CompletableFuture requiredSegmentIdFuture = new 
CompletableFuture<>();
+Supplier inputChannelSupplier =
+createInputChannelSupplier(bufferNumber, false, 
requiredSegmentIdFuture);
+NettyConnectionReader reader =
+createNettyConnectionReader(
+inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+Optional buffer = reader.readBuffer(0);
+assertThat(buffer.isPresent()).isTrue();
+assertThat(buffer.get().isBuffer()).isTrue();
+assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+}
+
+@Test
+void testReadBufferOfPriorityDataType() throws ExecutionException, 
InterruptedException {
+int bufferNumber = 2;
+CompletableFuture> 
availableAndPriorityConsumer =
+new CompletableFuture<>();
+CompletableFuture> 
prioritySequenceNumberConsumer =
+new CompletableFuture<>();
+CompletableFuture requiredSegmentIdFuture = new 
CompletableFuture<>();
+Supplier inputChannelSupplier =
+createInputChannelSupplier(bufferNumber, true, 
requiredSegmentIdFuture);
+NettyConnectionReader reader =
+createNettyConnectionReader(
+inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+Optional buffer = reader.readBuffer(0);
+assertThat(buffer.isPresent()).isTrue();
+assertThat(buffer.get().isBuffer()).isFalse();
+assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+Tuple2 result1 = availableAndPriorityConsumer.get();
+assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+

[GitHub] [flink] flinkbot commented on pull request #22744: [FLINK-29802][state] Changelog supports native savepoint

2023-06-08 Thread via GitHub


flinkbot commented on PR #22744:
URL: https://github.com/apache/flink/pull/22744#issuecomment-1583990457

   
   ## CI report:
   
   * e9987683f0c11781cb1d446132d79c526e7cc0c1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22743: [FLINK-26515][test] Add exception handling for RetryingExecutorTest#testDiscardOnTimeout

2023-06-08 Thread via GitHub


flinkbot commented on PR #22743:
URL: https://github.com/apache/flink/pull/22743#issuecomment-1583986101

   
   ## CI report:
   
   * 5a516e3f557f00b4ae30e04aed939942cdd6c599 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29802) ChangelogStateBackend supports native savepoint

2023-06-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29802:
---
Labels: pull-request-available  (was: )

> ChangelogStateBackend supports native savepoint
> ---
>
> Key: FLINK-29802
> URL: https://issues.apache.org/jira/browse/FLINK-29802
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] masteryhx opened a new pull request, #22744: [FLINK-29802][state] Changelog supports native savepoint

2023-06-08 Thread via GitHub


masteryhx opened a new pull request, #22744:
URL: https://github.com/apache/flink/pull/22744

   
   
   ## What is the purpose of the change
   
   - Supports native savepoint for ChangelogStateBackend
   
   
   ## Brief change log
   
   - Trigger native savepoint of internal state backend manually.
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*SavepointFormatITCase*, *ChangelogCompatibilityITCase*.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
 - The serializers:  no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? no


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-26515) RetryingExecutorTest. testDiscardOnTimeout failed on azure

2023-06-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-26515:
---
Labels: auto-deprioritized-major pull-request-available test-stability  
(was: auto-deprioritized-major test-stability)

> RetryingExecutorTest. testDiscardOnTimeout failed on azure
> --
>
> Key: FLINK-26515
> URL: https://issues.apache.org/jira/browse/FLINK-26515
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.3, 1.17.0, 1.16.1, 1.18.0
>Reporter: Yun Gao
>Priority: Major
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
>
> {code:java}
> Mar 06 01:20:29 [ERROR] Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 1.941 s <<< FAILURE! - in 
> org.apache.flink.changelog.fs.RetryingExecutorTest
> Mar 06 01:20:29 [ERROR] testTimeout  Time elapsed: 1.934 s  <<< FAILURE!
> Mar 06 01:20:29 java.lang.AssertionError: expected:<500.0> but 
> was:<1922.869766>
> Mar 06 01:20:29   at org.junit.Assert.fail(Assert.java:89)
> Mar 06 01:20:29   at org.junit.Assert.failNotEquals(Assert.java:835)
> Mar 06 01:20:29   at org.junit.Assert.assertEquals(Assert.java:555)
> Mar 06 01:20:29   at org.junit.Assert.assertEquals(Assert.java:685)
> Mar 06 01:20:29   at 
> org.apache.flink.changelog.fs.RetryingExecutorTest.testTimeout(RetryingExecutorTest.java:145)
> Mar 06 01:20:29   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 06 01:20:29   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 06 01:20:29   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 06 01:20:29   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 06 01:20:29   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Mar 06 01:20:29   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Mar 06 01:20:29   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Mar 06 01:20:29   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> Mar 06 01:20:29   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> Mar 06 01:20:29   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
> Mar 06 01:20:29   at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
> Mar 06 01:20:29   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> Mar 06 01:20:29   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> Mar 06 01:20:29   at 
> java.util.Iterator.forEachRemaining(Iterator.java:116)
> Mar 06 01:20:29   at 
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> Mar 06 01:20:29   at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> Mar 06 01:20:29   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>  {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32569=logs=f450c1a5-64b1-5955-e215-49cb1ad5ec88=cc452273-9efa-565d-9db8-ef62a38a0c10=22554



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] masteryhx opened a new pull request, #22743: [FLINK-26515][test] Add exception handling for RetryingExecutorTest#testDiscardOnTimeout

2023-06-08 Thread via GitHub


masteryhx opened a new pull request, #22743:
URL: https://github.com/apache/flink/pull/22743

   
   
   
   ## What is the purpose of the change
   
   - Add exception handling machnism to make the exception visiable when 
RetryingExecutorTest#testDiscardOnTimeout timeouts
   
   
   ## Brief change log
   
   - Catch the exception and assert
   
   
   ## Verifying this change
   
   This change is a trivial rework without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Zakelly commented on a diff in pull request #22590: [FLINK-32071] Implement the snapshot manager for merged checkpoint files in TM

2023-06-08 Thread via GitHub


Zakelly commented on code in PR #22590:
URL: https://github.com/apache/flink/pull/22590#discussion_r1223835048


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.OutputStreamAndPath;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/** Base implementation of {@link FileMergingSnapshotManager}. */
+public abstract class FileMergingSnapshotManagerBase implements 
FileMergingSnapshotManager {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FileMergingSnapshotManager.class);
+
+private final String id;
+
+protected final Executor ioExecutor;
+
+// file system and directories
+protected FileSystem fs;
+protected Path checkpointDir;
+protected Path sharedStateDir;
+protected Path taskOwnedStateDir;
+
+protected int writeBufferSize;
+private boolean fileSystemInitiated = false;
+
+protected boolean syncAfterClosingLogicalFile;
+
+protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = 
this::deletePhysicalFile;
+
+private final Map managedSharedStateDir = new 
ConcurrentHashMap<>();
+
+protected Path managedExclusiveStateDir;
+
+public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) {
+this.id = id;
+this.ioExecutor = ioExecutor;
+}
+
+@Override
+public void initFileSystem(
+FileSystem fileSystem,
+Path checkpointBaseDir,
+Path sharedStateDir,
+Path taskOwnedStateDir) {
+if (fileSystemInitiated) {
+Preconditions.checkArgument(
+checkpointBaseDir.equals(this.checkpointDir),
+"The checkpoint base dir is not deterministic across 
subtasks.");
+Preconditions.checkArgument(
+sharedStateDir.equals(this.sharedStateDir),
+"The shared checkpoint dir is not deterministic across 
subtasks.");
+Preconditions.checkArgument(
+taskOwnedStateDir.equals(this.taskOwnedStateDir),
+"The task-owned checkpoint dir is not deterministic across 
subtasks.");
+return;
+}
+this.fs = fileSystem;
+this.checkpointDir = Preconditions.checkNotNull(checkpointBaseDir);
+this.sharedStateDir = Preconditions.checkNotNull(sharedStateDir);
+this.taskOwnedStateDir = Preconditions.checkNotNull(taskOwnedStateDir);
+this.fileSystemInitiated = true;
+this.syncAfterClosingLogicalFile = 
shouldSyncAfterClosingLogicalFile(checkpointBaseDir);
+// Initialize the managed exclusive path using id as the child path 
name.
+// Currently, we use the task-owned directory to place the merged 
private state. According
+// to the FLIP-306, we later consider move these files to the new 
introduced
+// task-manager-owned directory.
+Path managedExclusivePath = new Path(taskOwnedStateDir, id);
+createManagedDirectory(managedExclusivePath);
+this.managedExclusiveStateDir = managedExclusivePath;
+}
+
+@Override
+public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) {
+String managedDirName = subtaskKey.getManagedDirName();
+Path managedPath = new Path(sharedStateDir, 

[GitHub] [flink] Zakelly commented on a diff in pull request #22590: [FLINK-32071] Implement the snapshot manager for merged checkpoint files in TM

2023-06-08 Thread via GitHub


Zakelly commented on code in PR #22590:
URL: https://github.com/apache/flink/pull/22590#discussion_r1223830277


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringBasedID;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
+
+/** An abstraction of logical files in file-merging checkpoints. */
+public class LogicalFile {

Review Comment:
   Yes, I added some JavaDocs here. It actually does NOT need to record the 
offset and length (they are record in the corresponding State Handle), and the 
logical files are only used to track the usage of the physical by reference 
counting. However, for semantic integrity, I added the offset and length within 
`LogicalFile`.



##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.OutputStreamAndPath;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/** Base implementation of {@link FileMergingSnapshotManager}. */
+public abstract class FileMergingSnapshotManagerBase implements 
FileMergingSnapshotManager {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FileMergingSnapshotManager.class);
+
+private final String id;
+
+protected final Executor ioExecutor;
+
+// file system and directories
+protected FileSystem fs;
+protected Path checkpointDir;
+protected Path sharedStateDir;
+protected Path taskOwnedStateDir;
+
+protected int writeBufferSize;
+private boolean fileSystemInitiated = false;
+
+protected boolean syncAfterClosingLogicalFile;
+
+protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = 
this::deletePhysicalFile;
+
+private final Map managedSharedStateDir = new 
ConcurrentHashMap<>();
+
+protected Path managedExclusiveStateDir;
+
+public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) {
+this.id = id;
+this.ioExecutor = ioExecutor;
+}
+
+@Override
+public void initFileSystem(
+FileSystem fileSystem,
+Path checkpointBaseDir,
+Path 

[GitHub] [flink] luoyuxia commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

2023-06-08 Thread via GitHub


luoyuxia commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1223819746


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumerator.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths 
recursively. Each file
+ * matched the given regex pattern becomes one split; this enumerator does not 
split files into
+ * smaller "block" units.
+ *
+ * The default instantiation of this enumerator filters files with the 
common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ */
+public class NonSplittingRegexEnumerator extends 
NonSplittingRecursiveEnumerator {
+
+/** The custom filter predicate to filter out unwanted files. */
+private final Predicate fileFilter;
+
+private final RegexFileFilter regexFileFilter;

Review Comment:
   Do we really need this in here? I think in here `fileFilter` is enough. We 
can always construnct the `fileFilter` using regex expression.



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java:
##
@@ -269,10 +271,35 @@ private SourceProvider 
createSourceProvider(BulkFormat
 tableOptions
 
.getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL)
 .ifPresent(fileSourceBuilder::monitorContinuously);
+tableOptions
+.getOptional(FileSystemConnectorOptions.SOURCE_REGEX_PATTERN)
+.ifPresent(
+s -> {
+String regexPath = 
connectBasePathAndRegex(path.getPath(), s);
+fileSourceBuilder.setFileEnumerator(
+bulkFormat.isSplittable()
+? () -> new 
BlockSplittingRegexEnumerator(regexPath)
+: () -> new 
NonSplittingRegexEnumerator(regexPath));
+});
 
 return SourceProvider.of(fileSourceBuilder.build());
 }
 
+private String connectBasePathAndRegex(String basePath, String regex) {
+StringBuilder result = new StringBuilder();
+result.append(basePath);
+if (!basePath.endsWith(Path.SEPARATOR)) {
+result.append(Path.SEPARATOR);
+}
+int startIndex = 0;
+while (startIndex < regex.length()

Review Comment:
   Still, I'm try to understand why need `connectBasePathAndRegex`. It turns 
out that what the option `source.regex-pattern` is for?
From the source code, seems it'll look like if the table path is `/dir`,  
the `source.regex-pattern` is `t/*`, it will try to match `/dir/t/*`?
   
   TBH, I'm not sure it's reasonable or not for it look wired or unclear to me.
   Another choice is user need to specifc `/dir/t/*` in `source.regex-pattern`.
   
   I don't have a strong perference about these two choice, and   don't know 
which is a better choice. Let's hear other's voice.
   
   



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumerator.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * 

[GitHub] [flink] liuyongvs commented on pull request #22143: [FLINK-31377][table] Fix array_contains ArrayData.ElementGetter shoul…

2023-06-08 Thread via GitHub


liuyongvs commented on PR #22143:
URL: https://github.com/apache/flink/pull/22143#issuecomment-1583953198

   hi @luoyuxia will you have time to review it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

2023-06-08 Thread via GitHub


TanYuxin-tyx commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1223820584


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyServiceProducer.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/** Test implementation for {@link NettyServiceProducer}. */
+public class TestingNettyServiceProducer implements NettyServiceProducer {
+
+private final BiConsumer
+connectionEstablishedConsumer;
+
+private final Consumer connectionBrokenConsumer;
+
+private TestingNettyServiceProducer(
+BiConsumer
+connectionEstablishedConsumer,
+Consumer connectionBrokenConsumer) {
+this.connectionEstablishedConsumer = connectionEstablishedConsumer;
+this.connectionBrokenConsumer = connectionBrokenConsumer;
+}
+
+@Override
+public void connectionEstablished(
+TieredStorageSubpartitionId subpartitionId,
+NettyConnectionWriter nettyConnectionWriter) {
+connectionEstablishedConsumer.accept(subpartitionId, 
nettyConnectionWriter);
+}
+
+@Override
+public void connectionBroken(NettyConnectionId connectionId) {
+connectionBrokenConsumer.accept(connectionId);
+}
+
+/** Builder for {@link TestingNettyServiceProducer}. */
+public static class Builder {
+
+private BiConsumer

Review Comment:
   Please give a default implement, then we can simplify the code when using it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Zakelly commented on a diff in pull request #22590: [FLINK-32071] Implement the snapshot manager for merged checkpoint files in TM

2023-06-08 Thread via GitHub


Zakelly commented on code in PR #22590:
URL: https://github.com/apache/flink/pull/22590#discussion_r1223810138


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/** A {@link FileMergingSnapshotManager} that merging files within a 
checkpoint. */
+public class WithinCheckpointFileMergingSnapshotManager extends 
FileMergingSnapshotManagerBase {
+
+/** A dummy subtask key to reuse files among subtasks for private states. 
*/
+private static final SubtaskKey dummySubtaskKey = new SubtaskKey("dummy", 
-1, -1);
+
+/**
+ * OutputStreams to be reused when writing checkpoint files. For 
WITHIN_BOUNDARY mode, physical
+ * files are NOT shared among multiple checkpoints. This map of file 
contains all files that are
+ * not being used and ready to be used.
+ */
+private final Map, 
PhysicalFile>
+availableFiles;
+
+public WithinCheckpointFileMergingSnapshotManager(String id, Executor 
ioExecutor) {
+// currently there is no file size limit For WITHIN_BOUNDARY mode
+super(id, ioExecutor);
+availableFiles = new HashMap<>();
+}
+
+@Override
+@Nonnull
+protected PhysicalFile getOrCreatePhysicalFileForCheckpoint(
+SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope 
scope)
+throws IOException {
+// TODO: FLINK-32076 will add a file pool for each subtask key.
+Tuple3 fileKey =
+Tuple3.of(
+checkpointId,
+scope == CheckpointedStateScope.SHARED ? subtaskKey : 
dummySubtaskKey,
+scope);
+PhysicalFile file;
+synchronized (availableFiles) {
+file = availableFiles.remove(fileKey);

Review Comment:
   > Is it possible that
   > 
   > the same key can map to different physical files (it might not be possible 
at the task level).
   > 
   > Let's discuss this offline.
   
   The same key CAN map to different files. e.g. A file is full under one key, 
a new file is created for further writing for this key.
   
   I think this is not a problem, since it is unnecessary to keep all written 
data in one file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

2023-06-08 Thread via GitHub


reswqa commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1223798313


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+private static final int INPUT_CHANNEL_INDEX = 0;
+
+@Test
+void testReadBufferOfNonPriorityDataType() {
+int bufferNumber = 1;
+CompletableFuture> 
availableAndPriorityConsumer =
+new CompletableFuture<>();
+CompletableFuture> 
prioritySequenceNumberConsumer =
+new CompletableFuture<>();
+CompletableFuture requiredSegmentIdFuture = new 
CompletableFuture<>();
+Supplier inputChannelSupplier =
+createInputChannelSupplier(bufferNumber, false, 
requiredSegmentIdFuture);
+NettyConnectionReader reader =
+createNettyConnectionReader(
+inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+Optional buffer = reader.readBuffer(0);
+assertThat(buffer.isPresent()).isTrue();
+assertThat(buffer.get().isBuffer()).isTrue();
+assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+}
+
+@Test
+void testReadBufferOfPriorityDataType() throws ExecutionException, 
InterruptedException {
+int bufferNumber = 2;
+CompletableFuture> 
availableAndPriorityConsumer =
+new CompletableFuture<>();
+CompletableFuture> 
prioritySequenceNumberConsumer =
+new CompletableFuture<>();
+CompletableFuture requiredSegmentIdFuture = new 
CompletableFuture<>();
+Supplier inputChannelSupplier =
+createInputChannelSupplier(bufferNumber, true, 
requiredSegmentIdFuture);
+NettyConnectionReader reader =
+createNettyConnectionReader(
+inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+Optional buffer = reader.readBuffer(0);
+assertThat(buffer.isPresent()).isTrue();
+assertThat(buffer.get().isBuffer()).isFalse();
+assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+Tuple2 result1 = availableAndPriorityConsumer.get();
+assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+

[GitHub] [flink] WencongLiu closed pull request #22307: [FLINK-31643][core] Introduce a temporary configuration to enable the tiered store architecture for hybrid shuffle

2023-06-08 Thread via GitHub


WencongLiu closed pull request #22307: [FLINK-31643][core] Introduce a 
temporary configuration to enable the tiered store architecture for hybrid 
shuffle
URL: https://github.com/apache/flink/pull/22307


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu closed pull request #22121: [FLINK-27051] fix CompletedCheckpoint.DiscardObject.discard is not idempotent

2023-06-08 Thread via GitHub


WencongLiu closed pull request #22121: [FLINK-27051] fix 
CompletedCheckpoint.DiscardObject.discard is not idempotent
URL: https://github.com/apache/flink/pull/22121


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu closed pull request #22318: [FLINK-27637][flink-streaming-java] Optimize the log information when the asynchronous part of checkpoint is canceled

2023-06-08 Thread via GitHub


WencongLiu closed pull request #22318: [FLINK-27637][flink-streaming-java] 
Optimize the log information when the asynchronous part of checkpoint is 
canceled
URL: https://github.com/apache/flink/pull/22318


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32291) Hive E2E test fails consistently

2023-06-08 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan closed FLINK-32291.
---
Resolution: Duplicate

> Hive E2E test fails consistently
> 
>
> Key: FLINK-32291
> URL: https://issues.apache.org/jira/browse/FLINK-32291
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Hive, Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49754=results



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32291) Hive E2E test fails consistently

2023-06-08 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730794#comment-17730794
 ] 

Rui Fan commented on FLINK-32291:
-

It's duplicated with FLINK-32294, and it has been fixed. So close this Jira.

> Hive E2E test fails consistently
> 
>
> Key: FLINK-32291
> URL: https://issues.apache.org/jira/browse/FLINK-32291
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Hive, Tests
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49754=results



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] WencongLiu commented on pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

2023-06-08 Thread via GitHub


WencongLiu commented on PR #22342:
URL: https://github.com/apache/flink/pull/22342#issuecomment-1583894707

   Thanks for the careful review of @reswqa and @TanYuxin-tyx !  I've made a 
round of changes, please take a lock when you  have time!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-32294) The CI fails due to HiveITCase

2023-06-08 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-32294:
---

Assignee: Yuxia Luo

> The CI fails due to HiveITCase
> --
>
> Key: FLINK-32294
> URL: https://issues.apache.org/jira/browse/FLINK-32294
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.18.0
>Reporter: Rui Fan
>Assignee: Yuxia Luo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> 2 ITCases fail:
>  * HiveITCase.testHiveDialect
>  * HiveITCase.testReadWriteHive
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=ef799394-2d67-5ff4-b2e5-410b80c9c0af=9e5768bc-daae-5f5f-1861-e58617922c7a=14346]
>  
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=ae4f8708-9994-57d3-c2d7-b892156e7812=0f3adb59-eefa-51c6-2858-3654d9e0749d=14652]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32294) The CI fails due to HiveITCase

2023-06-08 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730793#comment-17730793
 ] 

Rui Fan commented on FLINK-32294:
-

Thanks a lot for your quick feedback, let me try it now.:)

> The CI fails due to HiveITCase
> --
>
> Key: FLINK-32294
> URL: https://issues.apache.org/jira/browse/FLINK-32294
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.18.0
>Reporter: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> 2 ITCases fail:
>  * HiveITCase.testHiveDialect
>  * HiveITCase.testReadWriteHive
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=ef799394-2d67-5ff4-b2e5-410b80c9c0af=9e5768bc-daae-5f5f-1861-e58617922c7a=14346]
>  
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=ae4f8708-9994-57d3-c2d7-b892156e7812=0f3adb59-eefa-51c6-2858-3654d9e0749d=14652]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Zakelly commented on a diff in pull request #22590: [FLINK-32071] Implement the snapshot manager for merged checkpoint files in TM

2023-06-08 Thread via GitHub


Zakelly commented on code in PR #22590:
URL: https://github.com/apache/flink/pull/22590#discussion_r1223784984


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringBasedID;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
+
+/** An abstraction of logical files in file-merging checkpoints. */
+public class LogicalFile {
+
+/** ID for {@link LogicalFile}. It should be unique for each file. */
+public static class LogicalFileId extends StringBasedID {
+
+public LogicalFileId(String keyString) {
+super(keyString);
+}
+
+public Path getFilePath() {
+return new Path(getKeyString());
+}
+
+public static LogicalFileId generateRandomId() {
+return new LogicalFileId(UUID.randomUUID().toString());
+}
+}
+
+/** ID for this file. */
+LogicalFileId fileId;
+
+private long lastCheckpointId = -1L;
+
+boolean removed = false;
+
+/** The physical file where this logical file is stored. This should be 
null. */
+@Nonnull private final PhysicalFile physicalFile;
+
+@Nonnull private final SubtaskKey subtaskKey;
+
+public LogicalFile(
+LogicalFileId fileId,
+@Nonnull PhysicalFile physicalFile,
+@Nonnull SubtaskKey subtaskKey) {
+this.fileId = fileId;
+this.physicalFile = physicalFile;
+this.subtaskKey = subtaskKey;
+physicalFile.incRefCount();
+}
+
+public LogicalFileId getFileId() {
+return fileId;
+}
+
+public void discardWithCheckpointId(long checkpointId) throws IOException {
+if (!removed && checkpointId >= lastCheckpointId) {
+physicalFile.decRefCount();
+removed = true;
+}
+}
+
+public void advanceLastCheckpointId(long checkpointId) {

Review Comment:
   I added JavaDocs to explain this method. In short, the value 
`lastUsedCheckpointId` acts pretty much like the `lastUsedCheckpointID` in 
`SharedStateEntry`. It is a watermark mechanism to record the last checkpoint 
that uses this logical file, to determine whether to delete a logical file when 
a cp is subsumed or aborted. For those two questions:
   1. No, a logical file is a shared state file before merging, it may shared 
across checkpoints.
   2. When a file is used/reused by a checkpoint, this method is called.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pan3793 commented on pull request #22694: [FLINK-32223][runtime][security] Add Hive delegation token support

2023-06-08 Thread via GitHub


pan3793 commented on PR #22694:
URL: https://github.com/apache/flink/pull/22694#issuecomment-1583885964

   Is it designed to support multi-HMS in a single application?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-30660) move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive e2e

2023-06-08 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730791#comment-17730791
 ] 

luoyuxia edited comment on FLINK-30660 at 6/9/23 2:55 AM:
--

Sorry for that. It turns out that I miss the last one commit while suashing my 
commits in the

[pr|[https://github.com/apache/flink/pull/22679].]  Fixed it via 
[https://github.com/apache/flink/pull/22740]


was (Author: luoyuxia):
Sorry for that. It turns out that I miss the last one commit while suashing my 
commits in the [pr|[https://github.com/apache/flink/pull/22679].] Fix it via 
https://github.com/apache/flink/pull/22740

> move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive 
> e2e
> ---
>
> Key: FLINK-30660
> URL: https://issues.apache.org/jira/browse/FLINK-30660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Tests
>Affects Versions: 1.17.0
>Reporter: Chen Qin
>Assignee: luoyuxia
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive 
> e2e
> [https://github.com/apache/flink/pull/16532/files#]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30660) move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive e2e

2023-06-08 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730791#comment-17730791
 ] 

luoyuxia commented on FLINK-30660:
--

Sorry for that. It turns out that I miss the last one commit while suashing my 
commits in the [pr|[https://github.com/apache/flink/pull/22679].] Fix it via 
https://github.com/apache/flink/pull/22740

> move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive 
> e2e
> ---
>
> Key: FLINK-30660
> URL: https://issues.apache.org/jira/browse/FLINK-30660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Tests
>Affects Versions: 1.17.0
>Reporter: Chen Qin
>Assignee: luoyuxia
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive 
> e2e
> [https://github.com/apache/flink/pull/16532/files#]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30660) move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive e2e

2023-06-08 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730791#comment-17730791
 ] 

luoyuxia edited comment on FLINK-30660 at 6/9/23 2:55 AM:
--

Sorry for that. It turns out that I miss the last one commit while suashing my 
commits in the

[pr|[https://github.com/apache/flink/pull/22679]]  Fixed it via 
[https://github.com/apache/flink/pull/22740]


was (Author: luoyuxia):
Sorry for that. It turns out that I miss the last one commit while suashing my 
commits in the

[pr|[https://github.com/apache/flink/pull/22679].]  Fixed it via 
[https://github.com/apache/flink/pull/22740]

> move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive 
> e2e
> ---
>
> Key: FLINK-30660
> URL: https://issues.apache.org/jira/browse/FLINK-30660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Tests
>Affects Versions: 1.17.0
>Reporter: Chen Qin
>Assignee: luoyuxia
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive 
> e2e
> [https://github.com/apache/flink/pull/16532/files#]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32294) The CI fails due to HiveITCase

2023-06-08 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730790#comment-17730790
 ] 

luoyuxia edited comment on FLINK-32294 at 6/9/23 2:52 AM:
--

master: eacd5c1d90c98349a35c25dc420eb59eec7bf698

[~fanrui] Should be fixed. You can try to rebase master again. Sorry for that.


was (Author: luoyuxia):
master: eacd5c1d90c98349a35c25dc420eb59eec7bf698

[~fanrui] Should fix. You can try to rebase master again. Sorry for that.

> The CI fails due to HiveITCase
> --
>
> Key: FLINK-32294
> URL: https://issues.apache.org/jira/browse/FLINK-32294
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.18.0
>Reporter: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> 2 ITCases fail:
>  * HiveITCase.testHiveDialect
>  * HiveITCase.testReadWriteHive
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=ef799394-2d67-5ff4-b2e5-410b80c9c0af=9e5768bc-daae-5f5f-1861-e58617922c7a=14346]
>  
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=ae4f8708-9994-57d3-c2d7-b892156e7812=0f3adb59-eefa-51c6-2858-3654d9e0749d=14652]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32294) The CI fails due to HiveITCase

2023-06-08 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia updated FLINK-32294:
-
Fix Version/s: 1.18.0

> The CI fails due to HiveITCase
> --
>
> Key: FLINK-32294
> URL: https://issues.apache.org/jira/browse/FLINK-32294
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.18.0
>Reporter: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> 2 ITCases fail:
>  * HiveITCase.testHiveDialect
>  * HiveITCase.testReadWriteHive
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=ef799394-2d67-5ff4-b2e5-410b80c9c0af=9e5768bc-daae-5f5f-1861-e58617922c7a=14346]
>  
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=ae4f8708-9994-57d3-c2d7-b892156e7812=0f3adb59-eefa-51c6-2858-3654d9e0749d=14652]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32294) The CI fails due to HiveITCase

2023-06-08 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia closed FLINK-32294.

Resolution: Fixed

master: eacd5c1d90c98349a35c25dc420eb59eec7bf698

[~fanrui] Should fix. You can try to rebase master again. Sorry for that.

> The CI fails due to HiveITCase
> --
>
> Key: FLINK-32294
> URL: https://issues.apache.org/jira/browse/FLINK-32294
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.18.0
>Reporter: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> 2 ITCases fail:
>  * HiveITCase.testHiveDialect
>  * HiveITCase.testReadWriteHive
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=ef799394-2d67-5ff4-b2e5-410b80c9c0af=9e5768bc-daae-5f5f-1861-e58617922c7a=14346]
>  
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=ae4f8708-9994-57d3-c2d7-b892156e7812=0f3adb59-eefa-51c6-2858-3654d9e0749d=14652]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32294) The CI fails due to HiveITCase

2023-06-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32294:
---
Labels: pull-request-available  (was: )

> The CI fails due to HiveITCase
> --
>
> Key: FLINK-32294
> URL: https://issues.apache.org/jira/browse/FLINK-32294
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.18.0
>Reporter: Rui Fan
>Priority: Major
>  Labels: pull-request-available
>
> 2 ITCases fail:
>  * HiveITCase.testHiveDialect
>  * HiveITCase.testReadWriteHive
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=ef799394-2d67-5ff4-b2e5-410b80c9c0af=9e5768bc-daae-5f5f-1861-e58617922c7a=14346]
>  
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=ae4f8708-9994-57d3-c2d7-b892156e7812=0f3adb59-eefa-51c6-2858-3654d9e0749d=14652]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-pulsar] Samrat002 commented on pull request #51: [FLINK-32176] Exclude snakeyaml from pulsar-client-all to mitigate CVE-2022-1471

2023-06-08 Thread via GitHub


Samrat002 commented on PR #51:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/51#issuecomment-1583879236

   Thank you @tisonkun, @syhily taking time in reviewing the small change. 
   
   my intension was to exclude snakeyaml and mitigate vulnerablity, I missed to 
see the added unnecessary dependencies creating technical debt in future 
upgrades. 
   
   > If it should not be in the fat jar at all, you can submit a PR to Pulsar 
upstream and we reduce this dependency by upgrade pulsar version.
   
   Yes, i can submit a pr in pulsar upstream and reduce this dependency by 
upgrading the pulsar version. This would be cleaner way. 
   
   
   > BTW, Pulsar client didn't use snakeyaml internally. So the CVE you report 
on snakeyaml won't occur on 
[flink-connector-pulsar](https://issues.apache.org/jira/browse/FLINK-connector-pulsar).
   
   I have one query regarding this, Do we really need pulsar client all here in 
flink connector pulsar ?
   Cant we just use the only pulsar client that is used , using pulsar client 
all will bring other client that is not required here . 
   Looking forward to hear your opinion on it  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia merged pull request #22740: [FLINK-32294][e2e] Fix HiveITCase failure

2023-06-08 Thread via GitHub


luoyuxia merged PR #22740:
URL: https://github.com/apache/flink/pull/22740


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32294) The CI fails due to HiveITCase

2023-06-08 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730789#comment-17730789
 ] 

luoyuxia commented on FLINK-32294:
--

[~fanrui] Thanks for reporting. 

> The CI fails due to HiveITCase
> --
>
> Key: FLINK-32294
> URL: https://issues.apache.org/jira/browse/FLINK-32294
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.18.0
>Reporter: Rui Fan
>Priority: Major
>
> 2 ITCases fail:
>  * HiveITCase.testHiveDialect
>  * HiveITCase.testReadWriteHive
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=ef799394-2d67-5ff4-b2e5-410b80c9c0af=9e5768bc-daae-5f5f-1861-e58617922c7a=14346]
>  
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=ae4f8708-9994-57d3-c2d7-b892156e7812=0f3adb59-eefa-51c6-2858-3654d9e0749d=14652]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Zakelly commented on a diff in pull request #22590: [FLINK-32071] Implement the snapshot manager for merged checkpoint files in TM

2023-06-08 Thread via GitHub


Zakelly commented on code in PR #22590:
URL: https://github.com/apache/flink/pull/22590#discussion_r1223779099


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.OutputStreamAndPath;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/** Base implementation of {@link FileMergingSnapshotManager}. */
+public abstract class FileMergingSnapshotManagerBase implements 
FileMergingSnapshotManager {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FileMergingSnapshotManager.class);
+
+private final String id;
+
+protected final Executor ioExecutor;
+
+// file system and directories
+protected FileSystem fs;
+protected Path checkpointDir;
+protected Path sharedStateDir;
+protected Path taskOwnedStateDir;
+
+protected int writeBufferSize;
+private boolean fileSystemInitiated = false;
+
+protected boolean syncAfterClosingLogicalFile;
+
+protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = 
this::deletePhysicalFile;
+
+private final Map managedSharedStateDir = new 
ConcurrentHashMap<>();
+
+protected Path managedExclusiveStateDir;
+
+public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) {
+this.id = id;
+this.ioExecutor = ioExecutor;
+}
+
+@Override
+public void initFileSystem(
+FileSystem fileSystem,
+Path checkpointBaseDir,
+Path sharedStateDir,
+Path taskOwnedStateDir) {
+if (fileSystemInitiated) {
+Preconditions.checkArgument(
+checkpointBaseDir.equals(this.checkpointDir),
+"The checkpoint base dir is not deterministic across 
subtasks.");
+Preconditions.checkArgument(
+sharedStateDir.equals(this.sharedStateDir),
+"The shared checkpoint dir is not deterministic across 
subtasks.");
+Preconditions.checkArgument(
+taskOwnedStateDir.equals(this.taskOwnedStateDir),
+"The task-owned checkpoint dir is not deterministic across 
subtasks.");
+return;
+}
+this.fs = fileSystem;
+this.checkpointDir = Preconditions.checkNotNull(checkpointBaseDir);
+this.sharedStateDir = Preconditions.checkNotNull(sharedStateDir);
+this.taskOwnedStateDir = Preconditions.checkNotNull(taskOwnedStateDir);
+this.fileSystemInitiated = true;
+this.syncAfterClosingLogicalFile = 
shouldSyncAfterClosingLogicalFile(checkpointBaseDir);
+// Initialize the managed exclusive path using id as the child path 
name.
+// Currently, we use the task-owned directory to place the merged 
private state. According
+// to the FLIP-306, we later consider move these files to the new 
introduced
+// task-manager-owned directory.
+Path managedExclusivePath = new Path(taskOwnedStateDir, id);
+createManagedDirectory(managedExclusivePath);
+this.managedExclusiveStateDir = managedExclusivePath;
+}
+
+@Override
+public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) {
+String managedDirName = subtaskKey.getManagedDirName();
+Path managedPath = new Path(sharedStateDir, 

[GitHub] [flink] Zakelly commented on a diff in pull request #22590: [FLINK-32071] Implement the snapshot manager for merged checkpoint files in TM

2023-06-08 Thread via GitHub


Zakelly commented on code in PR #22590:
URL: https://github.com/apache/flink/pull/22590#discussion_r1223776437


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.OutputStreamAndPath;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/** Base implementation of {@link FileMergingSnapshotManager}. */
+public abstract class FileMergingSnapshotManagerBase implements 
FileMergingSnapshotManager {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FileMergingSnapshotManager.class);
+
+private final String id;
+
+protected final Executor ioExecutor;
+
+// file system and directories
+protected FileSystem fs;
+protected Path checkpointDir;
+protected Path sharedStateDir;
+protected Path taskOwnedStateDir;
+
+protected int writeBufferSize;
+private boolean fileSystemInitiated = false;
+
+protected boolean syncAfterClosingLogicalFile;
+
+protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = 
this::deletePhysicalFile;
+
+private final Map managedSharedStateDir = new 
ConcurrentHashMap<>();
+
+protected Path managedExclusiveStateDir;
+
+public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) {
+this.id = id;
+this.ioExecutor = ioExecutor;
+}
+
+@Override
+public void initFileSystem(
+FileSystem fileSystem,
+Path checkpointBaseDir,
+Path sharedStateDir,
+Path taskOwnedStateDir) {
+if (fileSystemInitiated) {
+Preconditions.checkArgument(
+checkpointBaseDir.equals(this.checkpointDir),
+"The checkpoint base dir is not deterministic across 
subtasks.");
+Preconditions.checkArgument(
+sharedStateDir.equals(this.sharedStateDir),
+"The shared checkpoint dir is not deterministic across 
subtasks.");
+Preconditions.checkArgument(
+taskOwnedStateDir.equals(this.taskOwnedStateDir),
+"The task-owned checkpoint dir is not deterministic across 
subtasks.");
+return;
+}

Review Comment:
   I explained these two questions in JavaDocs of the base interface 
`FileMergingSnapshotManager` and the `#initFileSystem`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia closed pull request #22739: fix hive e2e test failure

2023-06-08 Thread via GitHub


luoyuxia closed pull request #22739: fix hive e2e test failure
URL: https://github.com/apache/flink/pull/22739


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Zakelly commented on a diff in pull request #22590: [FLINK-32071] Implement the snapshot manager for merged checkpoint files in TM

2023-06-08 Thread via GitHub


Zakelly commented on code in PR #22590:
URL: https://github.com/apache/flink/pull/22590#discussion_r1223775173


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.OutputStreamAndPath;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/** Base implementation of {@link FileMergingSnapshotManager}. */
+public abstract class FileMergingSnapshotManagerBase implements 
FileMergingSnapshotManager {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FileMergingSnapshotManager.class);
+
+private final String id;
+
+protected final Executor ioExecutor;
+
+// file system and directories
+protected FileSystem fs;
+protected Path checkpointDir;
+protected Path sharedStateDir;
+protected Path taskOwnedStateDir;
+
+protected int writeBufferSize;
+private boolean fileSystemInitiated = false;
+
+protected boolean syncAfterClosingLogicalFile;

Review Comment:
   This value is decided by the FS. I add a TODO in function 
`shouldSyncAfterClosingLogicalFile()` to check by FS later. The meaning of this 
value is "does this FS require file sync to ensure visibility", it is a demand 
instead of an ability. I change the name of this value.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Zakelly commented on a diff in pull request #22590: [FLINK-32071] Implement the snapshot manager for merged checkpoint files in TM

2023-06-08 Thread via GitHub


Zakelly commented on code in PR #22590:
URL: https://github.com/apache/flink/pull/22590#discussion_r1223773671


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;
+
+import java.io.Closeable;
+
+/**
+ * FileMergingSnapshotManager provides an interface to manage files and meta 
information for
+ * checkpoint files with merging checkpoint files enabled. 
FileMergingSnapshotManager resides on the
+ * TM side.
+ *
+ * TODO (FLINK-32073): create output stream.
+ *
+ * TODO (FLINK-32075): leverage checkpoint notification to delete logical 
files.
+ */
+public interface FileMergingSnapshotManager extends Closeable {
+
+/**
+ * Initialize the file system, recording the checkpoint path the manager 
should work with.
+ *
+ * 
+ * The layout of checkpoint directory:
+ * /user-defined-checkpoint-dir
+ * /{job-id} (checkpointBaseDir)
+ * |
+ * + --shared/
+ * |
+ * + --subtask-1/
+ * + -- merged shared state files
+ * + --subtask-2/
+ * + -- merged shared state files
+ * + --taskowned/
+ * + -- merged private state files
+ * + --chk-1/
+ * + --chk-2/
+ * + --chk-3/
+ * 
+ *
+ * The reason why initializing directories in this method instead of 
the constructor is that
+ * the FileMergingSnapshotManager itself belongs to the {@link 
TaskStateManager}, which is
+ * initialized when receiving a task, while the base directories for 
checkpoint are created by
+ * {@link FsCheckpointStorageAccess} when the state backend initializing. 
After the checkpoint
+ * directories are initialized, the managed subdirectories are initialized 
here.
+ *
+ * Note: This method may be called several times, the implementation 
should ensure
+ * idempotency, and throw {@link IllegalArgumentException} when any of the 
path in params change
+ * across function calls.
+ *
+ * @param fileSystem The filesystem to write to.
+ * @param checkpointBaseDir The base directory for checkpoints.
+ * @param sharedStateDir The directory for shared checkpoint data.
+ * @param taskOwnedStateDir The name of the directory for state not 
owned/released by the
+ * master, but by the TaskManagers.
+ * @throws IllegalArgumentException thrown if these three paths are not 
deterministic across
+ * calls.
+ */
+void initFileSystem(
+FileSystem fileSystem,
+Path checkpointBaseDir,
+Path sharedStateDir,
+Path taskOwnedStateDir);
+
+/**
+ * Register a subtask and create the managed directory for shared states.
+ *
+ * @param subtaskKey the subtask key identifying a subtask.
+ * @see #initFileSystem for layout information.
+ */
+void registerSubtaskForSharedStates(SubtaskKey subtaskKey);
+
+/**
+ * Get the managed directory of the file-merging snapshot manager, created 
in {@link
+ * #initFileSystem} or {@link #registerSubtaskForSharedStates}.
+ *
+ * @param subtaskKey the subtask key identifying the subtask.
+ * @param scope the checkpoint scope.
+ * @return the managed directory for one subtask in specified checkpoint 
scope.
+ */
+Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope);
+
+/** A key identifies a subtask. */
+final class SubtaskKey {
+final String taskName;
+final int subtaskIndex;
+final int parallelism;
+
+final int hashCode;
+
+SubtaskKey(TaskInfo taskInfo) {
+this.taskName = 

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

2023-06-08 Thread via GitHub


TanYuxin-tyx commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1223761576


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java:
##
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link TieredStorageNettyService}. */
+public class TieredStorageNettyServiceImpl implements 
TieredStorageNettyService {
+
+// 
+//  For producer side
+// 
+
+private final Map>
+registeredServiceProducers = new ConcurrentHashMap<>();
+
+private final Map
+registeredAvailabilityListeners = new ConcurrentHashMap<>();
+
+// 
+//  For consumer side
+// 
+
+private final Map>
+registeredChannelIndexes = new ConcurrentHashMap<>();
+
+private final Map<
+TieredStoragePartitionId,
+Map>>
+registeredInputChannelProviders = new ConcurrentHashMap<>();
+
+private final Map<
+TieredStoragePartitionId,
+Map<
+TieredStorageSubpartitionId,
+
NettyConnectionReaderAvailabilityAndPriorityHelper>>
+registeredNettyConnectionReaderAvailabilityAndPriorityHelpers =
+new ConcurrentHashMap<>();
+
+@Override
+public void registerProducer(
+TieredStoragePartitionId partitionId, NettyServiceProducer 
serviceProducer) {
+List serviceProducers =
+registeredServiceProducers.getOrDefault(partitionId, new 
ArrayList<>());
+serviceProducers.add(serviceProducer);
+registeredServiceProducers.put(partitionId, serviceProducers);
+}
+
+@Override
+public NettyConnectionReader registerConsumer(
+TieredStoragePartitionId partitionId, TieredStorageSubpartitionId 
subpartitionId) {
+Integer channelIndex = 
registeredChannelIndexes.get(partitionId).remove(subpartitionId);
+if (registeredChannelIndexes.get(partitionId).isEmpty()) {
+registeredChannelIndexes.remove(partitionId);
+}
+
+Supplier inputChannelProvider =
+
registeredInputChannelProviders.get(partitionId).remove(subpartitionId);
+if (registeredInputChannelProviders.get(partitionId).isEmpty()) {
+registeredInputChannelProviders.remove(partitionId);
+}
+
+NettyConnectionReaderAvailabilityAndPriorityHelper helper =
+registeredNettyConnectionReaderAvailabilityAndPriorityHelpers
+.get(partitionId)
+.remove(subpartitionId);
+if (registeredNettyConnectionReaderAvailabilityAndPriorityHelpers
+.get(partitionId)
+.isEmpty()) {
+

[jira] [Commented] (FLINK-32294) The CI fails due to HiveITCase

2023-06-08 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730785#comment-17730785
 ] 

Rui Fan commented on FLINK-32294:
-

Hi [~yuxia] , I'm working on FLINK-30585[1], it's related to flame graph, and 
shouldn't affect the HiveITCase. However, these 2 ITCases fail twice.
 * HiveITCase.testHiveDialect
 * HiveITCase.testReadWriteHive

Would you mind help take a look in your free time? thanks~

  [1] https://github.com/apache/flink/pull/22552

> The CI fails due to HiveITCase
> --
>
> Key: FLINK-32294
> URL: https://issues.apache.org/jira/browse/FLINK-32294
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.18.0
>Reporter: Rui Fan
>Priority: Major
>
> 2 ITCases fail:
>  * HiveITCase.testHiveDialect
>  * HiveITCase.testReadWriteHive
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=ef799394-2d67-5ff4-b2e5-410b80c9c0af=9e5768bc-daae-5f5f-1861-e58617922c7a=14346]
>  
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=ae4f8708-9994-57d3-c2d7-b892156e7812=0f3adb59-eefa-51c6-2858-3654d9e0749d=14652]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-shared-utils] leonardBang merged pull request #14: [hotfix] Parameter '--message' is required for SVN delete command

2023-06-08 Thread via GitHub


leonardBang merged PR #14:
URL: https://github.com/apache/flink-connector-shared-utils/pull/14


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

2023-06-08 Thread via GitHub


WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1223768558


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.java:
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.NONE;
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.PRIORITIZED_EVENT_BUFFER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link NettyConnectionReader}. */
+class NettyConnectionReaderTest {
+
+private static final int INPUT_CHANNEL_INDEX = 0;
+
+@Test
+void testReadBufferOfNonPriorityDataType() {
+int bufferNumber = 1;
+CompletableFuture> 
availableAndPriorityConsumer =
+new CompletableFuture<>();
+CompletableFuture> 
prioritySequenceNumberConsumer =
+new CompletableFuture<>();
+CompletableFuture requiredSegmentIdFuture = new 
CompletableFuture<>();
+Supplier inputChannelSupplier =
+createInputChannelSupplier(bufferNumber, false, 
requiredSegmentIdFuture);
+NettyConnectionReader reader =
+createNettyConnectionReader(
+inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+Optional buffer = reader.readBuffer(0);
+assertThat(buffer.isPresent()).isTrue();
+assertThat(buffer.get().isBuffer()).isTrue();
+assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+assertThat(availableAndPriorityConsumer.isDone()).isFalse();
+assertThat(prioritySequenceNumberConsumer.isDone()).isFalse();
+}
+
+@Test
+void testReadBufferOfPriorityDataType() throws ExecutionException, 
InterruptedException {
+int bufferNumber = 2;
+CompletableFuture> 
availableAndPriorityConsumer =
+new CompletableFuture<>();
+CompletableFuture> 
prioritySequenceNumberConsumer =
+new CompletableFuture<>();
+CompletableFuture requiredSegmentIdFuture = new 
CompletableFuture<>();
+Supplier inputChannelSupplier =
+createInputChannelSupplier(bufferNumber, true, 
requiredSegmentIdFuture);
+NettyConnectionReader reader =
+createNettyConnectionReader(
+inputChannelSupplier,
+
createAvailableAndPriorityConsumer(availableAndPriorityConsumer),
+
createPrioritySequenceNumberConsumer(prioritySequenceNumberConsumer));
+Optional buffer = reader.readBuffer(0);
+assertThat(buffer.isPresent()).isTrue();
+assertThat(buffer.get().isBuffer()).isFalse();
+assertThat(requiredSegmentIdFuture.isDone()).isFalse();
+Tuple2 result1 = availableAndPriorityConsumer.get();
+assertThat(result1.f0).isEqualTo(INPUT_CHANNEL_INDEX);
+

[jira] [Created] (FLINK-32294) The CI fails due to HiveITCase

2023-06-08 Thread Rui Fan (Jira)
Rui Fan created FLINK-32294:
---

 Summary: The CI fails due to HiveITCase
 Key: FLINK-32294
 URL: https://issues.apache.org/jira/browse/FLINK-32294
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.18.0
Reporter: Rui Fan


2 ITCases fail:
 * HiveITCase.testHiveDialect
 * HiveITCase.testReadWriteHive

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=ef799394-2d67-5ff4-b2e5-410b80c9c0af=9e5768bc-daae-5f5f-1861-e58617922c7a=14346]

 

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=ae4f8708-9994-57d3-c2d7-b892156e7812=0f3adb59-eefa-51c6-2858-3654d9e0749d=14652]

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32188) Support to "where" query with a fixed-value array and simplify condition for an array-type filed.

2023-06-08 Thread Xin Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xin Chen updated FLINK-32188:
-
Summary: Support to "where" query with a fixed-value array and simplify 
condition for an array-type filed.  (was: Support to "where" query and simplify 
condition for an array-type filed of temporary table with a fixed-value array.)

> Support to "where" query with a fixed-value array and simplify condition for 
> an array-type filed.
> -
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> image-2023-06-06-16-50-10-805.png, image-2023-06-06-16-50-54-467.png, 
> screenshot-1.png, screenshot-10.png, screenshot-11.png, screenshot-12.png, 
> screenshot-2.png, screenshot-3.png, screenshot-4.png, screenshot-5.png, 
> screenshot-6.png, screenshot-7.png, screenshot-8.png, screenshot-9.png
>
>
> When I customized a data source connector which assumed as image-connector, I 
> met issues while creating a table with ddl to specify a field "URL" as an 
> array type. When submitting an SQL task with Flink, I specified query of this 
> field with a fixed array. For example, "select * from image source where 
> URL=ARRAY ['/flink. jpg', '/flink_1. jpg']", but it couldn't obtain the 
> corresponding predicate filters at all.
> Does the custom connector not support  to query fields of array type with 
> "where"?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32292) TableUtils.getRowTypeInfo fails to get type information of Tuple

2023-06-08 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin closed FLINK-32292.

Resolution: Fixed

> TableUtils.getRowTypeInfo fails to get type information of Tuple
> 
>
> Key: FLINK-32292
> URL: https://issues.apache.org/jira/browse/FLINK-32292
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32188) Support to "where" query and simplify condition for an array-type filed of temporary table with a fixed-value array.

2023-06-08 Thread Xin Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xin Chen updated FLINK-32188:
-
Summary: Support to "where" query and simplify condition for an array-type 
filed of temporary table with a fixed-value array.  (was: Support to "where" 
query and simplify condition for an array-type filed of temporary table.)

> Support to "where" query and simplify condition for an array-type filed of 
> temporary table with a fixed-value array.
> 
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> image-2023-06-06-16-50-10-805.png, image-2023-06-06-16-50-54-467.png, 
> screenshot-1.png, screenshot-10.png, screenshot-11.png, screenshot-12.png, 
> screenshot-2.png, screenshot-3.png, screenshot-4.png, screenshot-5.png, 
> screenshot-6.png, screenshot-7.png, screenshot-8.png, screenshot-9.png
>
>
> When I customized a data source connector which assumed as image-connector, I 
> met issues while creating a table with ddl to specify a field "URL" as an 
> array type. When submitting an SQL task with Flink, I specified query of this 
> field with a fixed array. For example, "select * from image source where 
> URL=ARRAY ['/flink. jpg', '/flink_1. jpg']", but it couldn't obtain the 
> corresponding predicate filters at all.
> Does the custom connector not support  to query fields of array type with 
> "where"?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zhougit86 commented on pull request #22624: [FLINK-32132][table-planner] Cast function CODEGEN does not work as e…

2023-06-08 Thread via GitHub


zhougit86 commented on PR #22624:
URL: https://github.com/apache/flink/pull/22624#issuecomment-1583841476

   @luoyuxia please find the test case in the commit, when the "cast( a as 
bigint)" function's input type is "STRING().notNull()", we suppose to get a 
null value. but now it returns a default value "-1". this is not correct.
   
   In the test case, I made it throw a not null enforcer exception. without the 
change, I will return a -1, which I think is not correct.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32292) TableUtils.getRowTypeInfo fails to get type information of Tuple

2023-06-08 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730784#comment-17730784
 ] 

Dong Lin commented on FLINK-32292:
--

Merged to apache/flink-ml master branch 0b6b7f70e45bebfa5f66e9405f152031607bc45a

> TableUtils.getRowTypeInfo fails to get type information of Tuple
> 
>
> Key: FLINK-32292
> URL: https://issues.apache.org/jira/browse/FLINK-32292
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] zhipeng93 opened a new pull request, #242: [FLINK-32293] Support sparse vector with long index

2023-06-08 Thread via GitHub


zhipeng93 opened a new pull request, #242:
URL: https://github.com/apache/flink-ml/pull/242

   ## What is the purpose of the change
   
   Add support for sparse vector with long as index and double as value.
   
   ## Brief change log
   - Renamed `SparseVector` as `SparseIntDoubleVector`, `DenseVector` as 
`DenseIntDoubleVector` and added `SparseLongDoubleVector`
   - Added test for SparseLongDouble vector
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32293) Support vector with long index

2023-06-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32293:
---
Labels: pull-request-available  (was: )

> Support vector with long index
> --
>
> Key: FLINK-32293
> URL: https://issues.apache.org/jira/browse/FLINK-32293
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
>
> Currently in Flink ML, we only support sparse and dense vector with `int` as 
> index and `double` as value.
>  
> However, there are real-world cases that the index of a vector could exceed 
> the range of `INT.MAX`. Thus we need to support vector with `long` index.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32292) TableUtils.getRowTypeInfo fails to get type information of Tuple

2023-06-08 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned FLINK-32292:


Assignee: Zhipeng Zhang

> TableUtils.getRowTypeInfo fails to get type information of Tuple
> 
>
> Key: FLINK-32292
> URL: https://issues.apache.org/jira/browse/FLINK-32292
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Assignee: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32188) Support to "where" query and simplify condition for an array-type filed of temporary table.

2023-06-08 Thread Xin Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xin Chen updated FLINK-32188:
-
Summary: Support to "where" query and simplify condition for an array-type 
filed of temporary table.  (was: Does the custom connector not support pushing 
down "where" query predicates to query fields of array type?)

> Support to "where" query and simplify condition for an array-type filed of 
> temporary table.
> ---
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> image-2023-06-06-16-50-10-805.png, image-2023-06-06-16-50-54-467.png, 
> screenshot-1.png, screenshot-10.png, screenshot-11.png, screenshot-12.png, 
> screenshot-2.png, screenshot-3.png, screenshot-4.png, screenshot-5.png, 
> screenshot-6.png, screenshot-7.png, screenshot-8.png, screenshot-9.png
>
>
> When I customized a data source connector which assumed as image-connector, I 
> met issues while creating a table with ddl to specify a field "URL" as an 
> array type. When submitting an SQL task with Flink, I specified query of this 
> field with a fixed array. For example, "select * from image source where 
> URL=ARRAY ['/flink. jpg', '/flink_1. jpg']", but it couldn't obtain the 
> corresponding predicate filters at all.
> Does the custom connector not support  to query fields of array type with 
> "where"?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] lindong28 merged pull request #241: [FLINK-32292] Fix TableUtils.getRowTypeInfo when the input contains Tuple

2023-06-08 Thread via GitHub


lindong28 merged PR #241:
URL: https://github.com/apache/flink-ml/pull/241


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 commented on pull request #241: [FLINK-32292] Fix TableUtils.getRowTypeInfo when the input contains Tuple

2023-06-08 Thread via GitHub


lindong28 commented on PR #241:
URL: https://github.com/apache/flink-ml/pull/241#issuecomment-1583838532

   Thanks for the PR. LGTM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22742: [FLINK-30629][Client/Job Submission] Increase clientHeartbeatTimeout to 1 second

2023-06-08 Thread via GitHub


flinkbot commented on PR #22742:
URL: https://github.com/apache/flink/pull/22742#issuecomment-1583837770

   
   ## CI report:
   
   * 96d5d46ed9d0a4ff84f1d6dee053b4647716131f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30629) ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable

2023-06-08 Thread Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730783#comment-17730783
 ] 

Liu commented on FLINK-30629:
-

[~Sergey Nuyanzin] Fixed. Please review the code. Thanks.

> ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable
> -
>
> Key: FLINK-30629
> URL: https://issues.apache.org/jira/browse/FLINK-30629
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Xintong Song
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0
>
> Attachments: ClientHeartbeatTestLog.txt, 
> logs-cron_azure-test_cron_azure_core-1685497478.zip
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44690=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=10819
> {code:java}
> Jan 11 04:32:39 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 21.02 s <<< FAILURE! - in 
> org.apache.flink.client.ClientHeartbeatTest
> Jan 11 04:32:39 [ERROR] 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat
>   Time elapsed: 9.157 s  <<< ERROR!
> Jan 11 04:32:39 java.lang.IllegalStateException: MiniCluster is not yet 
> running or has already been shut down.
> Jan 11 04:32:39   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1044)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:917)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:841)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)
> Jan 11 04:32:39   at 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat(ClientHeartbeatTest.java:79)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Myracle opened a new pull request, #22742: [FLINK-30629][Client/Job Submission] Increase clientHeartbeatTimeout to 1 second

2023-06-08 Thread via GitHub


Myracle opened a new pull request, #22742:
URL: https://github.com/apache/flink/pull/22742

   ## What is the purpose of the change
   
   *Increase clientHeartbeatTimeout to 1 second to avoid shutting down the job 
in ClientHeartbeatTest.*
   
   
   ## Brief change log
 - *Increase clientHeartbeatTimeout to 1 second*
   
   
   ## Verifying this change
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22342: [FLINK-31636][network] Upstream supports reading buffers from tiered store

2023-06-08 Thread via GitHub


WencongLiu commented on code in PR #22342:
URL: https://github.com/apache/flink/pull/22342#discussion_r1223745155


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.NettyPayload;
+
+import java.util.Queue;
+
+/** The default implementation of {@link NettyConnectionWriter}. */
+public class NettyConnectionWriterImpl implements NettyConnectionWriter {
+
+private final Queue bufferQueue;
+
+private final NettyConnectionId connectionId;
+
+public NettyConnectionWriterImpl(Queue bufferQueue) {
+this.bufferQueue = bufferQueue;
+this.connectionId = NettyConnectionId.newId();
+}
+
+@Override
+public NettyConnectionId getNettyConnectionId() {
+return connectionId;
+}
+
+@Override
+public int numQueuedBuffers() {
+return bufferQueue.size();
+}
+
+@Override
+public void writeBuffer(NettyPayload nettyPayload) {
+bufferQueue.add(nettyPayload);
+}
+
+@Override
+public void close() {
+NettyPayload nettyPayload;
+while ((nettyPayload = bufferQueue.poll()) != null) {

Review Comment:
   If the NettyConnectionWriterImpl is trying to close, it will ignore all 
payloads in the writer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] SinBex commented on a diff in pull request #21634: [FLINK-30480][runtime] Add benchmarks for adaptive batch scheduler.

2023-06-08 Thread via GitHub


SinBex commented on code in PR #21634:
URL: https://github.com/apache/flink/pull/21634#discussion_r1223745121


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java:
##
@@ -103,17 +113,55 @@ public static ExecutionGraph createAndInitExecutionGraph(
 final ComponentMainThreadExecutor mainThreadExecutor =
 ComponentMainThreadExecutorServiceAdapter.forMainThread();
 
-final DefaultScheduler scheduler =
+DefaultSchedulerBuilder schedulerBuilder =
 new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, 
scheduledExecutorService)
 .setIoExecutor(scheduledExecutorService)
 .setFutureExecutor(scheduledExecutorService)
 .setDelayExecutor(
-new 
ScheduledExecutorServiceAdapter(scheduledExecutorService))
-.build();
+new 
ScheduledExecutorServiceAdapter(scheduledExecutorService));
+if (jobConfiguration.getJobType() == JobType.BATCH) {
+AdaptiveBatchScheduler adaptiveBatchScheduler =
+schedulerBuilder
+.setVertexParallelismAndInputInfosDecider(
+createCustomParallelismDecider(
+jobConfiguration.getParallelism()))
+.setInputConsumableDeciderFactory(

Review Comment:
   That's right, I have fixed this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32293) Support vector with long index

2023-06-08 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-32293:
--
Component/s: Library / Machine Learning

> Support vector with long index
> --
>
> Key: FLINK-32293
> URL: https://issues.apache.org/jira/browse/FLINK-32293
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
>
> Currently in Flink ML, we only support sparse and dense vector with `int` as 
> index and `double` as value.
>  
> However, there are real-world cases that the index of a vector could exceed 
> the range of `INT.MAX`. Thus we need to support vector with `long` index.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32293) Support vector with long index

2023-06-08 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-32293:
-

 Summary: Support vector with long index
 Key: FLINK-32293
 URL: https://issues.apache.org/jira/browse/FLINK-32293
 Project: Flink
  Issue Type: New Feature
Reporter: Zhipeng Zhang


Currently in Flink ML, we only support sparse and dense vector with `int` as 
index and `double` as value.

 

However, there are real-world cases that the index of a vector could exceed the 
range of `INT.MAX`. Thus we need to support vector with `long` index.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32292) TableUtils.getRowTypeInfo fails to get type information of Tuple

2023-06-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32292:
---
Labels: pull-request-available  (was: )

> TableUtils.getRowTypeInfo fails to get type information of Tuple
> 
>
> Key: FLINK-32292
> URL: https://issues.apache.org/jira/browse/FLINK-32292
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Zhipeng Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] zhipeng93 opened a new pull request, #241: [FLINK-32292] Fix TableUtils.getRowTypeInfo when the input contains Tuple

2023-06-08 Thread via GitHub


zhipeng93 opened a new pull request, #241:
URL: https://github.com/apache/flink-ml/pull/241

   ## What is the purpose of the change
   
   Fix TableUtils.getRowTypeInfo when the input contains Tuple
   
   ## Brief change log
   
   *(for example:)*
 - Use `ExternalTypeInfo.of(..)` when the input contains Tuples.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32292) TableUtils.getRowTypeInfo fails to get type information of Tuple

2023-06-08 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-32292:
-

 Summary: TableUtils.getRowTypeInfo fails to get type information 
of Tuple
 Key: FLINK-32292
 URL: https://issues.apache.org/jira/browse/FLINK-32292
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Reporter: Zhipeng Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #617: [FLINK-32271] Report RECOMMENDED_PARALLELISM as an autoscaler metric V2

2023-06-08 Thread via GitHub


morhidi commented on PR #617:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/617#issuecomment-1583711883

   @mxm @gyfora this is my second attempt on adding the 
`RECOMMENDED_PARALLELISM` let me know what you think


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi opened a new pull request, #617: [FLINK-32271] Report RECOMMENDED_PARALLELISM as an autoscaler metric

2023-06-08 Thread via GitHub


morhidi opened a new pull request, #617:
URL: https://github.com/apache/flink-kubernetes-operator/pull/617

   ## What is the purpose of the change
   
   It is beneficial to report the recommended parallelism and overlay it with 
the current parallelism on the same chart when auto scaler is running in 
advisor mode.
   
   ## Brief change log
   - Added a new `RECOMMENDED_PARALLELISM` entry to `ScalingMetric`
   - And the latest recommended parallelisms are now being reported as 
evaluated scaling metric
   - Recommended parallelisms follow the logic below:
 - `RECOMMENDED_PARALLELISM` will be set to `PARALLELISM` while the metric 
window is filling up
 - `RECOMMENDED_PARALLELISM` may change according to the evaluated scaling 
metrics
 - `PARALLELISM` will be set to `RECOMMENDED_PARALLELISM` after scaling if 
scaling is enabled
 - `RECOMMENDED_PARALLELISM` will set to `null` during a scaling operation, 
while metric collection is not possible
  - `RECOMMENDED_PARALLELISM` will be set again to the changed 
`PARALLELISM` while the metric window is filling up
   
   ## Verifying this change
   
   - Added unit test for recommended parallelisms
   - Manual by checking the actual metrics being reported
   
   ```
   TODO
   ```
   
   ## Does this pull request potentially affect one of the following parts:
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
( no)
 - Core observer or reconciler logic that is regularly executed: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22741: Flink 31413 final fix failure

2023-06-08 Thread via GitHub


flinkbot commented on PR #22741:
URL: https://github.com/apache/flink/pull/22741#issuecomment-1583656598

   
   ## CI report:
   
   * 0dde2279a8457a2e79b95f2c01e8837725df29bd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia opened a new pull request, #22741: Flink 31413 final fix failure

2023-06-08 Thread via GitHub


luoyuxia opened a new pull request, #22741:
URL: https://github.com/apache/flink/pull/22741

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22740: [hotfix][e2e] Fix HiveITCase failure

2023-06-08 Thread via GitHub


flinkbot commented on PR #22740:
URL: https://github.com/apache/flink/pull/22740#issuecomment-1583644884

   
   ## CI report:
   
   * 7a43c44f6b01237de02bb4a3443be8c3d4238ce0 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia opened a new pull request, #22740: [hotfix][e2e] Fix HiveITCase failure

2023-06-08 Thread via GitHub


luoyuxia opened a new pull request, #22740:
URL: https://github.com/apache/flink/pull/22740

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22739: fix hive e2e test failure

2023-06-08 Thread via GitHub


flinkbot commented on PR #22739:
URL: https://github.com/apache/flink/pull/22739#issuecomment-1583617435

   
   ## CI report:
   
   * c2c0195b268f8e13f0e9778bda9ca3f19831169c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia opened a new pull request, #22739: fix hive e2e test failure

2023-06-08 Thread via GitHub


luoyuxia opened a new pull request, #22739:
URL: https://github.com/apache/flink/pull/22739

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #613: [FLINK-32271] Report RECOMMENDED_PARALLELISM as an autoscaler metric

2023-06-08 Thread via GitHub


morhidi commented on PR #613:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/613#issuecomment-1583359865

   > Thanks @morhidi! That's a very usual addition. I'm just a bit concerned 
about the complexity here. I think this could have been implemented a bit 
simpler. If we were to expose the recommended parallelism as an evaluated 
scaling metric, the changes would be a few lines of code. We could reuse all 
existing logic to expose metrics.
   > 
   > I would also appreciate if we could keep the unrelated changes / 
refactoring to a bare minimum. This makes reviewing much easier and keeps the 
commit history clean.
   > 
   > All in all, this will be super useful.
   
   Thanks @mxm for the comprehensice review. My original thought was to 
implement this as an evaluated metric, but the idea was abandoned and went on 
this rout instead.  I'll give that idea another shot, also trying not to be so 
disruptive with refactoring on a single PR :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #613: [FLINK-32271] Report RECOMMENDED_PARALLELISM as an autoscaler metric

2023-06-08 Thread via GitHub


morhidi commented on code in PR #613:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/613#discussion_r1223557386


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##
@@ -100,7 +100,9 @@ public CollectedMetricHistory updateMetrics(
 cleanup(cr);
 metricHistory.clear();
 metricCollectionStartTs = now;
+cleanupRecommendedParallelisms(recommendedParallelisms, 
resourceID);
 }
+var topology = getJobTopology(flinkService, cr, conf, autoscalerInfo);

Review Comment:
   It was necessary in certain cases it fetched the new job graph and then 
viped it out with the metric history.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-docker] daniel-packard commented on pull request #158: [ADD-SYMLINKS] Add symlinks for FLINK_VERSION to FLINK_RELEASE jars

2023-06-08 Thread via GitHub


daniel-packard commented on PR #158:
URL: https://github.com/apache/flink-docker/pull/158#issuecomment-1583139573

   > Wouldn't globbing patterns or similar allow you to handle this, like 
flink-s3-fs-presto-1.16.*.jar?
   
   I didn't know this was an option in helm/k8s config - but I will look into 
it to see if it's possible!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-docker] daniel-packard commented on pull request #158: [ADD-SYMLINKS] Add symlinks for FLINK_VERSION to FLINK_RELEASE jars

2023-06-08 Thread via GitHub


daniel-packard commented on PR #158:
URL: https://github.com/apache/flink-docker/pull/158#issuecomment-1583138454

   > You'd have to explicitly upgrade to the next Flink patch version, but that 
may be a good idea anyway depending on how you deploy Flink. You do _not_ want 
different processes of Flink using different patch versions, so unless you have 
safeguards to prevent that in the event that you deploy something at the exact 
time we do a release, you should maybe consider it.
   
   I'm not sure I follow your intent here... are you saying that `1.16.1` -> 
`1.16.2` is an unsafe version transition to happen automatically?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-26948) Add SORT_ARRAY supported in SQL & Table API

2023-06-08 Thread Bonnie Varghese (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730660#comment-17730660
 ] 

Bonnie Varghese commented on FLINK-26948:
-

[~twalthr] can we rename this function to `ARRAY_SORT` just to be consistent 
with all the other ARRAY_* functions?

> Add SORT_ARRAY supported in SQL & Table API
> ---
>
> Key: FLINK-26948
> URL: https://issues.apache.org/jira/browse/FLINK-26948
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: dalongliu
>Priority: Major
>
> Returns the array in {{expr}} in sorted order.
> Syntax:
> {code:java}
> sort_array(expr [, ascendingOrder] ) {code}
> Arguments:
>  * {{{}expr{}}}: An ARRAY expression of sortable elements.
>  * {{{}ascendingOrder{}}}: An optional BOOLEAN expression defaulting to 
> {{{}true{}}}.
> Returns:
> The result type matches {{{}expr{}}}.
> Sorts the input array in ascending or descending order according to the 
> natural ordering of the array elements. {{NULL}} elements are placed at the 
> beginning of the returned array in ascending order or at the end of the 
> returned array in descending order.
> Examples:
> {code:java}
> > SELECT sort_array(array('b', 'd', NULL, 'c', 'a'), true);
>  [NULL,a,b,c,d] {code}
> See more:
>  * 
> [Spark|https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html#date-and-timestamp-functions]
>  * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26948) Add SORT_ARRAY supported in SQL & Table API

2023-06-08 Thread Bonnie Varghese (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730658#comment-17730658
 ] 

Bonnie Varghese commented on FLINK-26948:
-

[~joern] are you still interested in implementing this function?

> Add SORT_ARRAY supported in SQL & Table API
> ---
>
> Key: FLINK-26948
> URL: https://issues.apache.org/jira/browse/FLINK-26948
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: dalongliu
>Priority: Major
>
> Returns the array in {{expr}} in sorted order.
> Syntax:
> {code:java}
> sort_array(expr [, ascendingOrder] ) {code}
> Arguments:
>  * {{{}expr{}}}: An ARRAY expression of sortable elements.
>  * {{{}ascendingOrder{}}}: An optional BOOLEAN expression defaulting to 
> {{{}true{}}}.
> Returns:
> The result type matches {{{}expr{}}}.
> Sorts the input array in ascending or descending order according to the 
> natural ordering of the array elements. {{NULL}} elements are placed at the 
> beginning of the returned array in ascending order or at the end of the 
> returned array in descending order.
> Examples:
> {code:java}
> > SELECT sort_array(array('b', 'd', NULL, 'c', 'a'), true);
>  [NULL,a,b,c,d] {code}
> See more:
>  * 
> [Spark|https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html#date-and-timestamp-functions]
>  * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin commented on a diff in pull request #22717: [FLINK-31665] [table] Add ARRAY_CONCAT function

2023-06-08 Thread via GitHub


snuyanzin commented on code in PR #22717:
URL: https://github.com/apache/flink/pull/22717#discussion_r1223351706


##
docs/data/sql_functions.yml:
##
@@ -646,6 +646,9 @@ collection:
   - sql: ARRAY_UNION(array1, array2)
 table: haystack.arrayUnion(array)
 description: Returns an array of the elements in the union of array1 and 
array2, without duplicates. If any of the array is null, the function will 
return null.
+  - sql: ARRAY_CONCAT(array1, array2, tail...)
+table: array1.arrayConcat(array1, array2, tail...)
+description: Returns an array of the elements in the concat of at least 
two arrays, allow duplicates. If all of the arrays are null, the function will 
return null.

Review Comment:
   In jira it was mentioned that we could do it in BigQuery's way
   BigQuery allows having only one array and just returns it back.
   Why should we not allow this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22717: [FLINK-31665] [table] Add ARRAY_CONCAT function

2023-06-08 Thread via GitHub


hanyuzheng7 commented on code in PR #22717:
URL: https://github.com/apache/flink/pull/22717#discussion_r1223349860


##
flink-python/pyflink/table/expression.py:
##
@@ -1519,6 +1519,13 @@ def array_union(self, array) -> 'Expression':
 """
 return _binary_op("arrayUnion")(self, array)
 
+def array_concat(self, array) -> 'Expression':
+"""
+Returns an array of the elements in the concat of array1 and array2, 
allow duplicates.

Review Comment:
   fix it. Thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22717: [FLINK-31665] [table] Add ARRAY_CONCAT function

2023-06-08 Thread via GitHub


hanyuzheng7 commented on code in PR #22717:
URL: https://github.com/apache/flink/pull/22717#discussion_r1223349316


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##
@@ -1407,6 +1409,44 @@ public OutType arrayUnion(InType array) {
 unresolvedCall(ARRAY_UNION, toExpr(), 
objectToExpression(array)));
 }
 
+/**
+ * Returns an array of the elements in the concat of array1 and array2, 
without duplicates.

Review Comment:
   fixed it. Thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22717: [FLINK-31665] [table] Add ARRAY_CONCAT function

2023-06-08 Thread via GitHub


snuyanzin commented on code in PR #22717:
URL: https://github.com/apache/flink/pull/22717#discussion_r1223346783


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##
@@ -1407,6 +1409,44 @@ public OutType arrayUnion(InType array) {
 unresolvedCall(ARRAY_UNION, toExpr(), 
objectToExpression(array)));
 }
 
+/**
+ * Returns an array of the elements in the concat of array1 and array2, 
without duplicates.
+ *
+ * If both of the array are null, the function will return null.
+ */
+public OutType arrayConcat(InType... arrays) {
+arrays = convertToArrays(arrays);
+Expression[] args =
+Stream.concat(
+Stream.of(toExpr()),
+
Arrays.stream(arrays).map(ApiExpressionUtils::objectToExpression))
+.toArray(Expression[]::new);
+return toApiSpecificExpression(unresolvedCall(ARRAY_CONCAT, args));
+}
+
+private InType[] convertToArrays(InType[] arrays) {
+if (arrays == null || arrays.length < 1) {
+throw new ValidationException("need at least two arrays");
+}
+int numberOfNull = 0;
+InType notNullArray = null;
+for (int i = 0; i < arrays.length; ++i) {
+if (arrays[i] == null) {
+numberOfNull++;
+} else {
+notNullArray = arrays[i];

Review Comment:
   looks like it takes into account only last `notNullArray`, is it expected?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22717: [FLINK-31665] [table] Add ARRAY_CONCAT function

2023-06-08 Thread via GitHub


snuyanzin commented on code in PR #22717:
URL: https://github.com/apache/flink/pull/22717#discussion_r1223346027


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##
@@ -1407,6 +1409,44 @@ public OutType arrayUnion(InType array) {
 unresolvedCall(ARRAY_UNION, toExpr(), 
objectToExpression(array)));
 }
 
+/**
+ * Returns an array of the elements in the concat of array1 and array2, 
without duplicates.
+ *
+ * If both of the array are null, the function will return null.
+ */
+public OutType arrayConcat(InType... arrays) {
+arrays = convertToArrays(arrays);
+Expression[] args =
+Stream.concat(
+Stream.of(toExpr()),
+
Arrays.stream(arrays).map(ApiExpressionUtils::objectToExpression))
+.toArray(Expression[]::new);
+return toApiSpecificExpression(unresolvedCall(ARRAY_CONCAT, args));
+}
+
+private InType[] convertToArrays(InType[] arrays) {

Review Comment:
   do we really need this method?
   why can not we do it same way like it is done for 
`org.apache.flink.table.api.internal.BaseExpressions#in(InType...)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22717: [FLINK-31665] [table] Add ARRAY_CONCAT function

2023-06-08 Thread via GitHub


snuyanzin commented on code in PR #22717:
URL: https://github.com/apache/flink/pull/22717#discussion_r1223340026


##
docs/data/sql_functions.yml:
##
@@ -646,6 +646,9 @@ collection:
   - sql: ARRAY_UNION(array1, array2)
 table: haystack.arrayUnion(array)
 description: Returns an array of the elements in the union of array1 and 
array2, without duplicates. If any of the array is null, the function will 
return null.
+  - sql: ARRAY_CONCAT(array1, array2)
+table: array1.arrayConcat(array2)
+description: Returns an array of the elements in the concat of array1 and 
array2, allow duplicates. If both of the array are null, the function will 
return null.

Review Comment:
   looks like it should be changed accordingly
   since it jira it was suggested to allow var number of input arrays 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22717: [FLINK-31665] [table] Add ARRAY_CONCAT function

2023-06-08 Thread via GitHub


snuyanzin commented on code in PR #22717:
URL: https://github.com/apache/flink/pull/22717#discussion_r1223337801


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##
@@ -1407,6 +1409,44 @@ public OutType arrayUnion(InType array) {
 unresolvedCall(ARRAY_UNION, toExpr(), 
objectToExpression(array)));
 }
 
+/**
+ * Returns an array of the elements in the concat of array1 and array2, 
without duplicates.
+ *
+ * If both of the array are null, the function will return null.
+ */
+public OutType arrayConcat(InType... arrays) {
+arrays = convertToArrays(arrays);
+Expression[] args =
+Stream.concat(
+Stream.of(toExpr()),
+
Arrays.stream(arrays).map(ApiExpressionUtils::objectToExpression))
+.toArray(Expression[]::new);
+return toApiSpecificExpression(unresolvedCall(ARRAY_CONCAT, args));
+}
+
+private InType[] convertToArrays(InType[] arrays) {
+if (arrays == null || arrays.length < 1) {
+throw new ValidationException("need at least two arrays");

Review Comment:
   I didn't get it...
   for the case of one array `arrays.length == 1` and it will not enter this 
`if`.
   Also having just one array is ok, in that case we can just return it back.
   
   Why are we talking about `"need at least two arrays"` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] architgyl commented on a diff in pull request #22509: [FLINK-31983] Add yarn Acls capability to Flink containers

2023-06-08 Thread via GitHub


architgyl commented on code in PR #22509:
URL: https://github.com/apache/flink/pull/22509#discussion_r1223335569


##
flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java:
##
@@ -620,4 +626,38 @@ public static YarnConfiguration getYarnConfiguration(
 
 return yarnConfig;
 }
+
+/**
+ * Sets the application ACLs for the given ContainerLaunchContext based on 
the values specified
+ * in the given Flink configuration. Only ApplicationAccessType.VIEW_APP 
and
+ * ApplicationAccessType.MODIFY_APP ACLs are set, and only if they are 
configured in the Flink
+ * configuration. If the viewAcls or modifyAcls string contains the 
WILDCARD_ACL constant, it
+ * will replace the entire string with the WILDCARD_ACL. The resulting map 
is then set as the
+ * application acls for the given container launch context.
+ *
+ * @param amContainer the ContainerLaunchContext to set the ACLs for
+ * @param flinkConfig the Flink configuration to read the ACL values from
+ */
+public static void setAclsFor(
+ContainerLaunchContext amContainer,
+org.apache.flink.configuration.Configuration flinkConfig) {
+Map acls = new HashMap<>();
+String viewAcls = 
flinkConfig.getString(YarnConfigOptions.APPLICATION_VIEW_ACLS, null);
+String modifyAcls = 
flinkConfig.getString(YarnConfigOptions.APPLICATION_MODIFY_ACLS, null);
+if (viewAcls != null) {
+if (viewAcls.contains(WILDCARD_ACL)) {

Review Comment:
   @becketqin have updated the code with handling the wildcard use-case. Can 
you please review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22717: [FLINK-31665] [table] Add ARRAY_CONCAT function

2023-06-08 Thread via GitHub


snuyanzin commented on code in PR #22717:
URL: https://github.com/apache/flink/pull/22717#discussion_r1223334014


##
flink-python/pyflink/table/expression.py:
##
@@ -1519,6 +1519,13 @@ def array_union(self, array) -> 'Expression':
 """
 return _binary_op("arrayUnion")(self, array)
 
+def array_concat(self, array) -> 'Expression':
+"""
+Returns an array of the elements in the concat of array1 and array2, 
allow duplicates.

Review Comment:
   what is `array1` and `array2` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22717: [FLINK-31665] [table] Add ARRAY_CONCAT function

2023-06-08 Thread via GitHub


snuyanzin commented on code in PR #22717:
URL: https://github.com/apache/flink/pull/22717#discussion_r122136


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##
@@ -1407,6 +1409,44 @@ public OutType arrayUnion(InType array) {
 unresolvedCall(ARRAY_UNION, toExpr(), 
objectToExpression(array)));
 }
 
+/**
+ * Returns an array of the elements in the concat of array1 and array2, 
without duplicates.

Review Comment:
   Does this comment still match the method ?
   I'm asking since it is unclear: what is `array1`, `array2` and why we are 
talking about `without duplicates`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #22717: [FLINK-31665] [table] Add ARRAY_CONCAT function

2023-06-08 Thread via GitHub


snuyanzin commented on code in PR #22717:
URL: https://github.com/apache/flink/pull/22717#discussion_r1223330360


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayConcatFunction.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_CONCAT}. */
+@Internal
+public class ArrayConcatFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+
+public ArrayConcatFunction(SpecializedFunction.SpecializedContext context) 
{
+super(BuiltInFunctionDefinitions.ARRAY_CONCAT, context);
+final DataType dataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+elementGetter = 
ArrayData.createElementGetter(dataType.getLogicalType());
+}
+
+public @Nullable ArrayData eval(ArrayData... arrays) {
+if (arrays == null || arrays.length == 0) {
+return null;
+}
+try {

Review Comment:
   I would suggest to add a shortcut for the case of `arrays.length == 1`. It 
could be easily returned back. Then no need to create a bunch of objects for 
this case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on pull request #22620: [FLINK-31413][hive] Change scope of flink-table-planner dependency from provided to test in Hive connector

2023-06-08 Thread via GitHub


luoyuxia commented on PR #22620:
URL: https://github.com/apache/flink/pull/22620#issuecomment-1583029682

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on pull request #22737: [FLINK-24998] Update CI image

2023-06-08 Thread via GitHub


zentol commented on PR #22737:
URL: https://github.com/apache/flink/pull/22737#issuecomment-1583029547

   Test failure is unrelated; see FLINK-32291.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-08 Thread via GitHub


hanyuzheng7 commented on code in PR #22730:
URL: https://github.com/apache/flink/pull/22730#discussion_r1223301793


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -272,6 +281,61 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayUnionFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_MAX =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_MAX")
+.kind(SCALAR)
+.inputTypeStrategy(
+new InputTypeStrategy() {
+@Override
+public ArgumentCount getArgumentCount() {
+return ConstantArgumentCount.of(1);
+}
+
+@Override
+public Optional> 
inferInputTypes(
+CallContext callContext, boolean 
throwOnFailure) {
+DataType inputDataType =
+
callContext.getArgumentDataTypes().get(0);
+if (!(inputDataType.getLogicalType() 
instanceof ArrayType)) {
+return Optional.empty();
+}
+DataType elementDataType =
+((CollectionDataType) 
inputDataType)
+.getElementDataType();
+if (!Comparable.class.isAssignableFrom(
+
elementDataType.getConversionClass())) {
+return Optional.empty();
+}
+return 
Optional.of(callContext.getArgumentDataTypes());
+}
+
+@Override
+public List getExpectedSignatures(
+FunctionDefinition definition) {
+return Collections.singletonList(
+Signature.of(
+
Signature.Argument.of(">")));
+}
+})
+.outputTypeStrategy(
+new TypeStrategy() {

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-29108) Kubernetes operator: Support queryable state

2023-06-08 Thread Salva (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17713009#comment-17713009
 ] 

Salva edited comment on FLINK-29108 at 6/8/23 4:29 PM:
---

[~rcroc...@newrelic.com] There is no recording available for your presentation 
right? Queryable State looks like a good idea, I would be interested in keeping 
it alive too for observability purposes. On a related note, some people are 
opting for externalizing the local state instead, see, e.g.:
 - 
[https://engineering.contentsquare.com/2022/flink-external-rocksdb-state-to-aerospike/]
 - [https://www.youtube.com/watch?v=ZWq_TzsXssM 
(FlinkNDB)|https://www.youtube.com/watch?v=ZWq_TzsXssM]

How likely is to add an external state backend to Flink in the short term? In 
the meantime, it would be nice to have some guidelines / recommendations for 
when to rely on Queryable State vs an external/global DB. Also, the current 
situation of Queryable State is a bit confusing probably...


was (Author: JIRAUSER287051):
[~rcroc...@newrelic.com] There is no recording available for your presentation 
right? Queryable State looks like a good idea, I would be interested in keeping 
it alive too for observability purposes. On a related note, some people are 
opting for externalizing the local state instead, see, e.g.:
 - 
[https://engineering.contentsquare.com/2022/flink-external-rocksdb-state-to-aerospike/]
 - [https://www.youtube.com/watch?v=ZWq_TzsXssM 
(FlinkNDB)|https://www.youtube.com/watch?v=ZWq_TzsXssM]

How likely is to add an external state backend to Flink in the short term? In 
the meantime, it would be nice to have some guidelines / recommendations for 
when to rely on Queryable State vs an external/global DB. Also, the current 
situation of Queryable State is a bit confusing for the user and should 
hopefully be clarified soon...

> Kubernetes operator: Support queryable state
> 
>
> Key: FLINK-29108
> URL: https://issues.apache.org/jira/browse/FLINK-29108
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Ron Crocker
>Priority: Minor
>
> Enable the kubernetes operator to deploy jobs where queryable state is 
> desired.
> When queryable state is desired, the operator should configure the deployed 
> job with
>  # The deployed job has {{queryable-state.enabled:}} {{true}} applied to it.
>  # Configure the Queryable State proxy and Queryable State server (via the 
> {{queryable-state.proxy}} and {{queryable-state.server}} configuration 
> sections respectively). If these sections aren't provided, then the default 
> configuration is used.
> The operator will need to create a Kubernetes service fronting the Task 
> Managers {{QueryableStateClientProxy}} port (as configured by the above).
> Tearing down the job also tears down the service.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] bvarghese1 commented on a diff in pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-08 Thread via GitHub


bvarghese1 commented on code in PR #22730:
URL: https://github.com/apache/flink/pull/22730#discussion_r1223272467


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -272,6 +281,61 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayUnionFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_MAX =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_MAX")
+.kind(SCALAR)
+.inputTypeStrategy(
+new InputTypeStrategy() {
+@Override
+public ArgumentCount getArgumentCount() {
+return ConstantArgumentCount.of(1);
+}
+
+@Override
+public Optional> 
inferInputTypes(
+CallContext callContext, boolean 
throwOnFailure) {
+DataType inputDataType =
+
callContext.getArgumentDataTypes().get(0);
+if (!(inputDataType.getLogicalType() 
instanceof ArrayType)) {
+return Optional.empty();
+}
+DataType elementDataType =
+((CollectionDataType) 
inputDataType)
+.getElementDataType();
+if (!Comparable.class.isAssignableFrom(
+
elementDataType.getConversionClass())) {
+return Optional.empty();
+}
+return 
Optional.of(callContext.getArgumentDataTypes());
+}
+
+@Override
+public List getExpectedSignatures(
+FunctionDefinition definition) {
+return Collections.singletonList(
+Signature.of(
+
Signature.Argument.of(">")));
+}
+})
+.outputTypeStrategy(
+new TypeStrategy() {

Review Comment:
   This should also be a new class `ArrayElementOutputTypeStrategy`. Both of 
these classes would be reused in the ARRAY_MIN implementation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   3   >