[GitHub] [flink] haishui126 commented on pull request #22925: [FLINK-32496][connectors/common] Fix the bug that source cannot resume after enabling the watermark alignment and idleness

2023-07-05 Thread via GitHub


haishui126 commented on PR #22925:
URL: https://github.com/apache/flink/pull/22925#issuecomment-1623037626

   Thank you for fixing this. I packaged the change and tested it in my local 
environment, and it works well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on pull request #22958: [FLINK-32541][network] Fix the buffer leaking when buffer accumulator is released

2023-07-05 Thread via GitHub


TanYuxin-tyx commented on PR #22958:
URL: https://github.com/apache/flink/pull/22958#issuecomment-1623018365

   @flinkbot run azure





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22851: [FLINK-31646][network] Implement the remote tier producer for the tiered storage

2023-07-05 Thread via GitHub


TanYuxin-tyx commented on code in PR #22851:
URL: https://github.com/apache/flink/pull/22851#discussion_r1253951614


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/SubpartitionRemoteCacheManager.java:
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import net.jcip.annotations.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This {@link SubpartitionRemoteCacheManager} is responsible for managing the 
buffers in a single
+ * subpartition.
+ */
+class SubpartitionRemoteCacheManager {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SubpartitionRemoteCacheManager.class);
+
+private final TieredStoragePartitionId partitionId;
+
+private final int subpartitionId;
+
+private final PartitionFileWriter partitionFileWriter;
+
+/**
+ * All the buffers. The first field of the tuple is the buffer, while the 
second field of the
+ * buffer is the buffer index.
+ *
+ * Note that this field can be accessed by the task thread or the write 
IO thread, so the
+ * thread safety should be ensured.
+ */
+private final Deque<Tuple2<Buffer, Integer>> allBuffers = new LinkedList<>();
+
+private CompletableFuture<Void> flushCompletableFuture = FutureUtils.completedVoidFuture();
+
+/**
+ * Record the segment id that is writing to.
+ *
+ * Note that when flushing buffers, this can be touched by task thread 
or the flushing
+ * thread, so the thread safety should be ensured.
+ */
+@GuardedBy("allBuffers")
+private int segmentId = -1;
+
+/**
+ * Record the buffer index in the {@link SubpartitionRemoteCacheManager}. 
Each time a new buffer
+ * is added to the {@code allBuffers}, this field is increased by one.
+ *
+ * Note that the field can only be touched by the task thread, so this 
field need not be
+ * guarded by any lock or synchronizations.
+ */
+private int bufferIndex;
+
+public SubpartitionRemoteCacheManager(
+TieredStoragePartitionId partitionId,
+int subpartitionId,
+TieredStorageMemoryManager storageMemoryManager,
+PartitionFileWriter partitionFileWriter) {
+this.partitionId = partitionId;
+this.subpartitionId = subpartitionId;
+this.partitionFileWriter = partitionFileWriter;
+storageMemoryManager.listenBufferReclaimRequest(this::flushBuffers);
+}
+
+// 
+//  Called by RemoteCacheManager
+// 
+
+void startSegment(int segmentId) {
+synchronized (allBuffers) {
+checkState(allBuffers.isEmpty(), "There are un-flushed buffers.");
+this.segmentId = segmentId;
+}
+}
+
+void addBuffer(Buffer buffer) {
+Tuple2<Buffer, Integer> toAddBuffer = new Tuple2<>(buffer, bufferIndex++);
+synchronized (allBuffers) {
+allBuffers.add(toAddBuffer);
+}
+}
+
+void finishSegment(int segmentId) {
+synchronized (allBuffers) {
+checkState(this.segmentId == segmentId, "Wrong segment id.");
+}
+// Flush the buffers belonging to the current segment
+flushBuffers();
+
+ 

[GitHub] [flink] TanYuxin-tyx commented on pull request #22851: [FLINK-31646][network] Implement the remote tier producer for the tiered storage

2023-07-05 Thread via GitHub


TanYuxin-tyx commented on PR #22851:
URL: https://github.com/apache/flink/pull/22851#issuecomment-1623011886

   @reswqa Thanks for the review. I have addressed the comments, PTAL.





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22851: [FLINK-31646][network] Implement the remote tier producer for the tiered storage

2023-07-05 Thread via GitHub


TanYuxin-tyx commented on code in PR #22851:
URL: https://github.com/apache/flink/pull/22851#discussion_r1253947705


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileWriter.java:
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateBufferWithHeaders;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentPath;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.writeSegmentFinishFile;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation of {@link PartitionFileWriter} with segment file mode. 
In this mode, each
+ * segment of one subpartition is written to an independent file.
+ *
+ * After finishing writing a segment, a segment-finish file is written to 
ensure the downstream
+ * reads only when the entire segment file is written, avoiding partial data 
reads. The downstream
+ * can determine if the current segment is complete by checking for the 
existence of the
+ * segment-finish file.
+ *
+ * To minimize the number of files, each subpartition uses only a single 
segment-finish file. For
+ * instance, if segment-finish file 5 exists, it indicates that segments 1 to 
5 have all been
+ * finished.
+ */
+public class SegmentPartitionFileWriter implements PartitionFileWriter {
+
+private final ExecutorService ioExecutor =
+Executors.newSingleThreadExecutor(
+new ThreadFactoryBuilder()
+.setNameFormat("Hash partition file flush thread")
+
.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
+.build());
+private final String basePath;
+
+private final WritableByteChannel[] subpartitionChannels;
+
+private volatile boolean isReleased;
+
+SegmentPartitionFileWriter(String basePath, int numSubpartitions) {
+this.basePath = basePath;
+this.subpartitionChannels = new WritableByteChannel[numSubpartitions];
+Arrays.fill(subpartitionChannels, null);
+}
+
+@Override
+public CompletableFuture write(
+TieredStoragePartitionId partitionId, 
List buffersToWrite) {
+List> completableFutures = new ArrayList<>();
+buffersToWrite.forEach(
+subpartitionBuffers -> {
+int subpartitionId = 
subpartitionBuffers.getSubpartitionId();
+List multiSegmentBuffers =
+subpartitionBuffers.getSegmentBufferContexts();
+multiSegmentBuffers.forEach(
+segmentBuffers -> {
+CompletableFuture flushSuccessNotifier =
+new CompletableFuture<>();
+ioExecutor.execute(
+
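
A note on the segment-finish convention described in the Javadoc above: it can be illustrated with a small in-memory sketch. This is not the PR's code. The class `SegmentFinishSketch` and its methods are hypothetical, and the single marker is modelled as a field rather than as a file on remote storage.

```java
import java.util.OptionalLong;

/** Hypothetical sketch, not Flink code: one marker per subpartition records the latest finished segment. */
final class SegmentFinishSketch {

    /** Id of the latest finished segment, or empty if no segment-finish marker exists yet. */
    private OptionalLong latestFinishedSegmentId = OptionalLong.empty();

    /** Producer side: called after segment {@code segmentId} has been written completely. */
    void markFinished(long segmentId) {
        // Replacing the single marker keeps the number of marker files per subpartition at one.
        latestFinishedSegmentId = OptionalLong.of(segmentId);
    }

    /** Consumer side: a segment may be read once the marker has reached or passed its id. */
    boolean isReadable(long segmentId) {
        return latestFinishedSegmentId.isPresent()
                && segmentId <= latestFinishedSegmentId.getAsLong();
    }

    public static void main(String[] args) {
        SegmentFinishSketch subpartition = new SegmentFinishSketch();
        System.out.println(subpartition.isReadable(1)); // false: nothing finished yet
        subpartition.markFinished(5);
        System.out.println(subpartition.isReadable(3)); // true: marker 5 covers segment 3
        System.out.println(subpartition.isReadable(6)); // false: segment 6 is not finished
    }
}
```

The point of the design is that the reader never has to inspect a segment file to decide whether it is complete; it only compares ids against the marker.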

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22851: [FLINK-31646][network] Implement the remote tier producer for the tiered storage

2023-07-05 Thread via GitHub


TanYuxin-tyx commented on code in PR #22851:
URL: https://github.com/apache/flink/pull/22851#discussion_r1253947408


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/SubpartitionRemoteCacheManager.java:
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import net.jcip.annotations.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This {@link SubpartitionRemoteCacheManager} is responsible for managing the 
buffers in a single
+ * subpartition.
+ */
+class SubpartitionRemoteCacheManager {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SubpartitionRemoteCacheManager.class);
+
+private final TieredStoragePartitionId partitionId;
+
+private final int subpartitionId;
+
+private final PartitionFileWriter partitionFileWriter;
+
+/**
+ * All the buffers. The first field of the tuple is the buffer, while the 
second field of the
+ * buffer is the buffer index.
+ *
+ * Note that this field can be accessed by the task thread or the write 
IO thread, so the
+ * thread safety should be ensured.
+ */
+private final Deque<Tuple2<Buffer, Integer>> allBuffers = new LinkedList<>();
+
+private CompletableFuture<Void> flushCompletableFuture = FutureUtils.completedVoidFuture();
+
+/**
+ * Record the segment id that is writing to.
+ *
+ * Note that when flushing buffers, this can be touched by task thread 
or the flushing
+ * thread, so the thread safety should be ensured.
+ */
+@GuardedBy("allBuffers")
+private int segmentId = -1;
+
+/**
+ * Record the buffer index in the {@link SubpartitionRemoteCacheManager}. 
Each time a new buffer
+ * is added to the {@code allBuffers}, this field is increased by one.
+ *
+ * Note that the field can only be touched by the task thread, so this 
field need not be
+ * guarded by any lock or synchronizations.
+ */
+private int bufferIndex;
+
+public SubpartitionRemoteCacheManager(
+TieredStoragePartitionId partitionId,
+int subpartitionId,
+TieredStorageMemoryManager storageMemoryManager,
+PartitionFileWriter partitionFileWriter) {
+this.partitionId = partitionId;
+this.subpartitionId = subpartitionId;
+this.partitionFileWriter = partitionFileWriter;
+storageMemoryManager.listenBufferReclaimRequest(this::flushBuffers);
+}
+
+// 
+//  Called by RemoteCacheManager
+// 
+
+void startSegment(int segmentId) {
+synchronized (allBuffers) {
+checkState(allBuffers.isEmpty(), "There are un-flushed buffers.");
+this.segmentId = segmentId;
+}
+}
+
+void addBuffer(Buffer buffer) {
+Tuple2<Buffer, Integer> toAddBuffer = new Tuple2<>(buffer, bufferIndex++);
+synchronized (allBuffers) {
+allBuffers.add(toAddBuffer);
+}
+}
+
+void finishSegment(int segmentId) {
+synchronized (allBuffers) {
+checkState(this.segmentId == segmentId, "Wrong segment id.");

Review Comment:
   Removed the synchronization. Only the task thread can modify the segmentId, 
and the 

[jira] [Created] (FLINK-32547) Add missing doc for Timestamp support in ProtoBuf format

2023-07-05 Thread Benchao Li (Jira)
Benchao Li created FLINK-32547:
--

 Summary: Add missing doc for Timestamp support in ProtoBuf format
 Key: FLINK-32547
 URL: https://issues.apache.org/jira/browse/FLINK-32547
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.17.1
Reporter: Benchao Li


In FLINK-30093, we added support for the {{Timestamp}} type and documented it, 
but forgot to update the English version of the documentation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22851: [FLINK-31646][network] Implement the remote tier producer for the tiered storage

2023-07-05 Thread via GitHub


TanYuxin-tyx commented on code in PR #22851:
URL: https://github.com/apache/flink/pull/22851#discussion_r1253922911


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/SubpartitionRemoteCacheManager.java:
##
@@ -0,0 +1,187 @@

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22851: [FLINK-31646][network] Implement the remote tier producer for the tiered storage

2023-07-05 Thread via GitHub


TanYuxin-tyx commented on code in PR #22851:
URL: https://github.com/apache/flink/pull/22851#discussion_r1253922689


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFile.java:
##
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The partition file with segment file mode. In this mode, each segment of 
one subpartition is
+ * written to an independent file.
+ */
+public class SegmentPartitionFile {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SegmentPartitionFile.class);
+
+static final String TIERED_STORAGE_DIR = "tiered-storage";
+
+static final String SEGMENT_FILE_PREFIX = "seg-";
+
+static final String SEGMENT_FINISH_DIR_NAME = "FINISH";
+
+public static SegmentPartitionFileWriter createPartitionFileWriter(
+String dataFilePath, int numSubpartitions) {
+return new SegmentPartitionFileWriter(dataFilePath, numSubpartitions);
+}
+
+public static SegmentPartitionFileReader createPartitionFileReader(String 
dataFilePath) {
+return new SegmentPartitionFileReader(dataFilePath);
+}
+
+// 
+//  File-related utilities
+// 
+
+public static String getTieredStoragePath(String basePath) {
+return String.format("%s/%s", basePath, TIERED_STORAGE_DIR);
+}
+
+public static String getPartitionPath(TieredStoragePartitionId 
partitionId, String basePath) {
+if (basePath == null) {
+return null;
+}
+
+while (basePath.endsWith("/") && basePath.length() > 1) {
+basePath = basePath.substring(0, basePath.length() - 1);
+}
+return String.format("%s/%s", basePath, 
TieredStorageIdMappingUtils.convertId(partitionId));
+}
+
+public static String getSubpartitionPath(
+String basePath, TieredStoragePartitionId partitionId, int 
subpartitionId) {
+while (basePath.endsWith("/") && basePath.length() > 1) {
+basePath = basePath.substring(0, basePath.length() - 1);
+}
+return String.format(
+"%s/%s/%s",
+basePath, TieredStorageIdMappingUtils.convertId(partitionId), 
subpartitionId);
+}
+
+public static Path getSegmentPath(
+String basePath,
+TieredStoragePartitionId partitionId,
+int subpartitionId,
+long segmentId) {
+String subpartitionPath = getSubpartitionPath(basePath, partitionId, 
subpartitionId);
+return new Path(subpartitionPath, SEGMENT_FILE_PREFIX + segmentId);
+}
+
+public static Path getSegmentFinishDirPath(
+String basePath, TieredStoragePartitionId partitionId, int 
subpartitionId) {
+String subpartitionPath = getSubpartitionPath(basePath, partitionId, 
subpartitionId);
+return new Path(subpartitionPath, SEGMENT_FINISH_DIR_NAME);
+}
+
+public static void writeBuffers(
+WritableByteChannel writeChannel, long expectedBytes, ByteBuffer[] 
bufferWithHeaders)
+throws IOException {
+int writeSize = 0;
+for (ByteBuffer bufferWithHeader : bufferWithHeaders) {
+writeSize += writeChannel.write(bufferWithHeader);
+}
+checkState(writeSize == expectedBytes, "Wrong number of written bytes.");
+}
+
+public static void 
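
The path helpers in the diff above suggest a layout of `<basePath>/<partition>/<subpartition>/seg-<segmentId>`. Below is a minimal sketch of that layout, assuming the partition id has already been converted to a string. The class `SegmentPathSketch` and the `partitionName` parameter are hypothetical; the code under review calls `TieredStorageIdMappingUtils.convertId` and returns a Flink `Path` instead of a `String`.

```java
/** Hypothetical sketch of the directory layout implied by the helpers in the diff. */
final class SegmentPathSketch {

    static final String SEGMENT_FILE_PREFIX = "seg-";

    /** Drops trailing slashes but keeps a lone "/" intact, as the loop in the diff does. */
    static String stripTrailingSlashes(String basePath) {
        while (basePath.endsWith("/") && basePath.length() > 1) {
            basePath = basePath.substring(0, basePath.length() - 1);
        }
        return basePath;
    }

    /** Builds basePath/partitionName/subpartitionId/seg-segmentId. */
    static String segmentPath(
            String basePath, String partitionName, int subpartitionId, long segmentId) {
        return stripTrailingSlashes(basePath)
                + "/" + partitionName
                + "/" + subpartitionId
                + "/" + SEGMENT_FILE_PREFIX + segmentId;
    }

    public static void main(String[] args) {
        // prints "/tmp/shuffle/p0/3/seg-7"
        System.out.println(segmentPath("/tmp/shuffle//", "p0", 3, 7L));
    }
}
```

Factoring the slash trimming into one helper avoids repeating the same loop in each path method, which the diff currently does.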

[GitHub] [flink] lincoln-lil commented on a diff in pull request #22894: [FLINK-30598][table-planner] Fix wrong code generated for WatermarkGenerator due to inconsistent source type info when deserial

2023-07-05 Thread via GitHub


lincoln-lil commented on code in PR #22894:
URL: https://github.com/apache/flink/pull/22894#discussion_r1253912150


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java:
##
@@ -483,8 +483,11 @@ private static void validateScanSourceForStreaming(
 ChangelogMode changelogMode,
 ReadableConfig config) {
 // sanity check for produced ChangelogMode
-final boolean hasUpdateBefore = 
changelogMode.contains(RowKind.UPDATE_BEFORE);
-final boolean hasUpdateAfter = 
changelogMode.contains(RowKind.UPDATE_AFTER);
+final boolean hasChangelogMode = changelogMode != null;

Review Comment:
   The null check was added because the mocked table (`TestDynamicTableFactory.DynamicTableSourceMock`) 
used in the test case `DynamicTableSourceSpecSerdeTest` returns a null changelog 
mode, which may also happen in user-implemented connectors.
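
For illustration, a null-tolerant check of this kind could look like the sketch below. It is not the code from the PR. The `RowKind` enum is a local stand-in for Flink's enum of the same name, a plain `Set` replaces `ChangelogMode`, and the helper `contains` is hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch of a null-safe changelog-mode check; not the code from the PR. */
final class ChangelogModeGuardSketch {

    /** Local stand-in for Flink's RowKind. */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Treats a null changelog mode as containing no row kinds instead of throwing an NPE. */
    static boolean contains(Set<RowKind> changelogMode, RowKind kind) {
        return changelogMode != null && changelogMode.contains(kind);
    }

    public static void main(String[] args) {
        Set<RowKind> upsert = EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER, RowKind.DELETE);
        System.out.println(contains(upsert, RowKind.UPDATE_AFTER));  // true
        System.out.println(contains(upsert, RowKind.UPDATE_BEFORE)); // false
        System.out.println(contains(null, RowKind.UPDATE_BEFORE));   // false: source returned null
    }
}
```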






[jira] [Resolved] (FLINK-32464) AssertionError when converting between Table and SQL with selection and type cast

2023-07-05 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia resolved FLINK-32464.
--
Resolution: Duplicate

This should be similar to FLINK-31830.

Feel free to reopen it if not.

> AssertionError when converting between Table and SQL with selection and type 
> cast
> -
>
> Key: FLINK-32464
> URL: https://issues.apache.org/jira/browse/FLINK-32464
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.16.1
>Reporter: Yunfeng Zhou
>Priority: Major
>
> In an attempt to convert table between Table API and SQL API using the 
> following program
> {code:java}
> public static void main(String[] args) {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> Table table = tEnv.fromValues(1, 2, 3);
> tEnv.createTemporaryView("input_table", table);
> table = tEnv.sqlQuery("SELECT MAP[f0, 1] AS f1 from input_table");
> table = table.select($("f1").cast(DataTypes.MAP(DataTypes.INT(), DataTypes.INT())));
> tEnv.createTemporaryView("input_table_2", table);
> tEnv.sqlQuery("SELECT * from input_table_2");
> }
> {code}
> The following exception is thrown.
> {code}
> Exception in thread "main" java.lang.AssertionError: Conversion to relational 
> algebra failed to preserve datatypes:
> validated type:
> RecordType((INTEGER, INTEGER) MAP NOT NULL f1-MAP) NOT NULL
> converted type:
> RecordType((INTEGER, INTEGER) MAP f1-MAP) NOT NULL
> rel:
> LogicalProject(f1-MAP=[CAST(MAP($0, 1)):(INTEGER, INTEGER) MAP])
>   LogicalValues(tuples=[[{ 1 }, { 2 }, { 3 }]])
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:470)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:215)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:191)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1498)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1253)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:374)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:262)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:703)
>   at org.apache.flink.streaming.connectors.redis.RedisSinkITCase.main
> {code}
> It seems that there is a bug with the Table-SQL conversion and selection 
> process when type cast is involved.





[GitHub] [flink] reswqa commented on a diff in pull request #22851: [FLINK-31646][network] Implement the remote tier producer for the tiered storage

2023-07-05 Thread via GitHub


reswqa commented on code in PR #22851:
URL: https://github.com/apache/flink/pull/22851#discussion_r1253009389


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##
@@ -30,18 +32,38 @@ public class TieredStorageConfiguration {
 // TODO, after implementing the tier factory, add appreciate 
implementations to the array.
 private static final TierFactory[] DEFAULT_MEMORY_DISK_TIER_FACTORIES = 
new TierFactory[0];
 
+/** If the remote storage tier is not used, this field may be null. */
+@Nullable private final String remoteStorageBasePath;
+
+public TieredStorageConfiguration(String remoteStorageBasePath) {

Review Comment:
   ```suggestion
   public TieredStorageConfiguration(@Nullable String 
remoteStorageBasePath) {
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##
@@ -25,11 +25,9 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 
-/** Utils for reading or writing from tiered storage. */
+/** Utils for reading or writing to tiered storage. */

Review Comment:
   ```suggestion
   /** Utils for reading from or writing to tiered storage. */
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileWriter.java:
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateBufferWithHeaders;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentPath;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.writeSegmentFinishFile;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation of {@link PartitionFileWriter} with segment file mode. 
In this mode, each
+ * segment of one subpartition is written to an independent file.
+ *
+ * After finishing writing a segment, a segment-finish file is written to 
ensure the downstream
+ * reads only when the entire segment file is written, avoiding partial data 
reads. The downstream
+ * can determine if the current segment is complete by checking for the 
existence of the
+ * segment-finish file.
+ *
+ * To minimize the number of files, each subpartition uses only a single 
segment-finish file. For
+ * instance, if segment-finish file 5 exists, it indicates that segments 1 to 
5 have all been
+ * finished.
+ */
+public class SegmentPartitionFileWriter implements PartitionFileWriter {
+
+private final ExecutorService ioExecutor =
+Executors.newSingleThreadExecutor(
+new ThreadFactoryBuilder()
+.setNameFormat("Hash partition file flush thread")

Review Comment:
   ```suggestion
   .setNameFormat("Segment partition file flush 

[GitHub] [flink] ruanhang1993 commented on a diff in pull request #22937: [FLINK-32428] Introduce base interfaces for CatalogStore

2023-07-05 Thread via GitHub


ruanhang1993 commented on code in PR #22937:
URL: https://github.com/apache/flink/pull/22937#discussion_r1253856457


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStoreFactoryTest.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.factories.CatalogStoreFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.DefaultCatalogStoreContext;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link GenericInMemoryCatalogFactoryTest}. */

Review Comment:
   Test for {@link GenericInMemoryCatalogFactory}



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on pull request #22922: [FLINK-32256][table] Add built-in ARRAY_MIN function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on PR #22922:
URL: https://github.com/apache/flink/pull/22922#issuecomment-1622901153

   @flinkbot run azure





[GitHub] [flink] swuferhong commented on a diff in pull request #22894: [FLINK-30598][table-planner] Fix wrong code generated for WatermarkGenerator due to inconsistent source type info when deseriali

2023-07-05 Thread via GitHub


swuferhong commented on code in PR #22894:
URL: https://github.com/apache/flink/pull/22894#discussion_r1253845465


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java:
##
@@ -483,8 +483,11 @@ private static void validateScanSourceForStreaming(
 ChangelogMode changelogMode,
 ReadableConfig config) {
 // sanity check for produced ChangelogMode
-final boolean hasUpdateBefore = 
changelogMode.contains(RowKind.UPDATE_BEFORE);
-final boolean hasUpdateAfter = 
changelogMode.contains(RowKind.UPDATE_AFTER);
+final boolean hasChangelogMode = changelogMode != null;

Review Comment:
   Could you check why `changelogMode == null` in the failed case?






[jira] [Commented] (FLINK-30238) Unified Sink committer does not clean up state on final savepoint

2023-07-05 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740371#comment-17740371
 ] 

Yun Gao commented on FLINK-30238:
-

Hi [~jingge], could you confirm the details of this issue once more? The 
discussion above seems to diverge on its cause. 

> Unified Sink committer does not clean up state on final savepoint
> -
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Fabian Paul
>Priority: Critical
> Attachments: Screenshot 2023-03-09 at 1.47.11 PM.png, image (8).png
>
>
> During stop-with-savepoint the committer only commits the pending 
> committables on notifyCheckpointComplete.
> This has several downsides.
>  * Last committableSummary has checkpoint id LONG.MAX and is never cleared 
> from the state, so stop-with-savepoint does not work when the 
> pipeline recovers from a savepoint 
>  * While the committables are committed during stop-with-savepoint they are 
> not forwarded to post-commit topology, potentially losing data and preventing 
> open transactions from being closed.





[GitHub] [flink] dianfu commented on pull request #22957: [FLINK-32544][python] Fix test_java_sql_ddl to works in JDK17

2023-07-05 Thread via GitHub


dianfu commented on PR #22957:
URL: https://github.com/apache/flink/pull/22957#issuecomment-1622826150

   @zentol Great. Updated the PR.





[GitHub] [flink] swuferhong commented on a diff in pull request #22918: [FLINK-32501][table-planner] Fix wrong plan of a proctime window aggregation generated due to incorrect cost evaluation

2023-07-05 Thread via GitHub


swuferhong commented on code in PR #22918:
URL: https://github.com/apache/flink/pull/22918#discussion_r1253827159


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivity.scala:
##
@@ -256,6 +257,13 @@ class FlinkRelMdSelectivity private extends 
MetadataHandler[BuiltInMetadata.Sele
 }
   }
 
+  def getSelectivity(
+  rel: CommonPhysicalWindowTableFunction,
+  mq: RelMetadataQuery,
+  predicate: RexNode): JDouble = {
+estimateSelectivity(rel, mq, predicate)
+  }
+

Review Comment:
   Although this method `getSelectivity(rel: RelNode, mq: RelMetadataQuery, 
predicate: RexNode)` is only effective for `BatchPhysicalRel`, I think most of 
the methods are actually effective for stream operators. Can we remove this 
limitation?






[jira] [Commented] (FLINK-32502) Remove AbstractLeaderElectionService

2023-07-05 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740367#comment-17740367
 ] 

Wencong Liu commented on FLINK-32502:
-

Hello [~mapohl], are you suggesting merging the methods of 
AbstractLeaderElectionService into the LeaderElectionService interface? I would 
like to work on this issue. 

> Remove AbstractLeaderElectionService
> 
>
> Key: FLINK-32502
> URL: https://issues.apache.org/jira/browse/FLINK-32502
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Matthias Pohl
>Priority: Major
>
> {{AbstractLeaderElectionService}} doesn't bring much value anymore and can be 
> removed.





[GitHub] [flink] hackergin commented on pull request #22937: [FLINK-32428] Introduce base interfaces for CatalogStore

2023-07-05 Thread via GitHub


hackergin commented on PR #22937:
URL: https://github.com/apache/flink/pull/22937#issuecomment-1622791724

   @ruanhang1993 Thank you very much for your thoughtful and detailed 
comments. I have updated the code and will pay more attention to annotations 
and documentation.





[GitHub] [flink] hanyuzheng7 commented on pull request #22842: [FLINK-32261][table] Add built-in MAP_UNION function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on PR #22842:
URL: https://github.com/apache/flink/pull/22842#issuecomment-1622756809

   @snuyanzin do you know why the "Test - table" job reports an error?





[GitHub] [flink] hanyuzheng7 commented on pull request #22842: [FLINK-32261][table] Add built-in MAP_UNION function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on PR #22842:
URL: https://github.com/apache/flink/pull/22842#issuecomment-1622492472

   @flinkbot run azure





[GitHub] [flink] hanyuzheng7 commented on pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on PR #22834:
URL: https://github.com/apache/flink/pull/22834#issuecomment-1622483121

   @snuyanzin I have addressed all comments now.





[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on code in PR #22834:
URL: https://github.com/apache/flink/pull/22834#discussion_r1253617456


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySliceFunction.java:
##
@@ -74,4 +74,32 @@ public 
ArraySliceFunction(SpecializedFunction.SpecializedContext context) {
 throw new FlinkRuntimeException(t);
 }
 }
+
+public @Nullable ArrayData eval(ArrayData array, Integer start) {
+try {
+if (array == null || start == null) {
+return null;
+}
+if (array.size() == 0) {
+return array;
+}
+int startIndex = start;
+int endIndex = array.size();
+startIndex += startIndex < 0 ? array.size() + 1 : 0;
+startIndex = Math.max(1, startIndex);
+if (endIndex < startIndex) {
+return new GenericArrayData(new Object[0]);
+}
+if (startIndex == 1 && endIndex == array.size()) {
+return array;
+}
+List slicedArray = new ArrayList<>();
+for (int i = startIndex - 1; i <= endIndex - 1; i++) {
+slicedArray.add(elementGetter.getElementOrNull(array, i));
+}
+return new GenericArrayData(slicedArray.toArray());
+} catch (Throwable t) {
+throw new FlinkRuntimeException(t);
+}
+}

Review Comment:
   So you mean like this? I wrote it this way at first, but I thought we could 
make it more concise.
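
The start-index handling in the two-argument `eval` above can be tried out in isolation. The following is only a minimal sketch: it uses a plain `java.util.List` instead of `ArrayData`, and the class name `ArraySliceSketch` and method name `slice` are hypothetical, not part of the PR. It mirrors the quoted logic: positions are 1-based, a negative start counts from the end, and a start before the first element is clamped to 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ArraySliceSketch {

    /** Returns the elements from the 1-based position start to the end of array. */
    public static <T> List<T> slice(List<T> array, Integer start) {
        if (array == null || start == null) {
            return null;
        }
        int size = array.size();
        int startIndex = start;
        // A negative start counts from the end: -1 refers to the last element.
        if (startIndex < 0) {
            startIndex += size + 1;
        }
        // Positions before the first element (including 0) are clamped to 1.
        startIndex = Math.max(1, startIndex);
        if (startIndex > size) {
            return new ArrayList<>();
        }
        // Convert the 1-based start to a 0-based offset and copy the tail.
        return new ArrayList<>(array.subList(startIndex - 1, size));
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(10, 20, 30, 40);
        System.out.println(slice(data, 2));  // prints [20, 30, 40]
        System.out.println(slice(data, -2)); // prints [30, 40]
        System.out.println(slice(data, 9));  // prints []
    }
}
```

Unlike the quoted method, the sketch always returns a copy; the early returns of the unchanged input array in the diff are an optimization that does not affect the result.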






[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on code in PR #22834:
URL: https://github.com/apache/flink/pull/22834#discussion_r1253620252


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySliceFunction.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_SLICE}. */
+@Internal
+public class ArraySliceFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+
+public ArraySliceFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SLICE, context);
+final DataType dataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+elementGetter = 
ArrayData.createElementGetter(dataType.getLogicalType());
+}
+
+public @Nullable ArrayData eval(ArrayData array, Integer start, Integer... 
end) {
+try {
+if (array == null || start == null || end.length > 0 && end[0] == 
null) {

Review Comment:
   OK, I see: you use another function that calls the existing one.






[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on code in PR #22834:
URL: https://github.com/apache/flink/pull/22834#discussion_r1253620252


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySliceFunction.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_SLICE}. */
+@Internal
+public class ArraySliceFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+
+public ArraySliceFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SLICE, context);
+final DataType dataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+elementGetter = 
ArrayData.createElementGetter(dataType.getLogicalType());
+}
+
+public @Nullable ArrayData eval(ArrayData array, Integer start, Integer... 
end) {
+try {
+if (array == null || start == null || end.length > 0 && end[0] == 
null) {

Review Comment:
   I will change it back to the previous version, but I think that with varargs 
a single function is enough. If you don't mind, can you tell me the benefit of 
using two functions here?
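
For comparison, the two-overload shape discussed here could look roughly like the sketch below. It is an illustration under assumptions, not the PR's code: it works on `List<Integer>` instead of `ArrayData`, the class name `SliceOverloadSketch` is hypothetical, and the end-index semantics (1-based, inclusive, negative values counting from the end) are assumed for the example rather than taken from the thread. The two-argument form simply delegates to the three-argument form.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SliceOverloadSketch {

    /** Three-argument form: 1-based, inclusive start and end (assumed semantics). */
    public static List<Integer> slice(List<Integer> array, Integer start, Integer end) {
        if (array == null || start == null || end == null) {
            return null;
        }
        int size = array.size();
        // Negative positions count from the end; out-of-range values are clamped.
        int from = Math.max(1, start < 0 ? start + size + 1 : start);
        int to = Math.min(size, end < 0 ? end + size + 1 : end);
        if (to < from) {
            return new ArrayList<>();
        }
        return new ArrayList<>(array.subList(from - 1, to));
    }

    /** Two-argument form: slices to the end by delegating to the three-argument form. */
    public static List<Integer> slice(List<Integer> array, Integer start) {
        if (array == null) {
            return null;
        }
        return slice(array, start, array.size());
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(10, 20, 30, 40);
        System.out.println(slice(data, 2, 3)); // prints [20, 30]
        System.out.println(slice(data, 3));    // prints [30, 40]
    }
}
```

With explicit overloads each signature has a fixed arity, so no check of the form `end.length > 0 && end[0] == null` is needed, and a call with more than one end argument does not compile. Whether that was the reviewer's motivation is not stated in this thread.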






[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on code in PR #22834:
URL: https://github.com/apache/flink/pull/22834#discussion_r1253617904


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySliceFunction.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_SLICE}. */
+@Internal
+public class ArraySliceFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+
+public ArraySliceFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SLICE, context);
+final DataType dataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+elementGetter = 
ArrayData.createElementGetter(dataType.getLogicalType());
+}
+
+public @Nullable ArrayData eval(ArrayData array, Integer start, Integer... 
end) {
+try {
+if (array == null || start == null || end.length > 0 && end[0] == 
null) {

Review Comment:
   My previous commit implements this as you suggested:
   
https://github.com/apache/flink/pull/22834/commits/3752db452c15fcfbe2c3e4aad775d74d4bcadc8d#r1253617456






[GitHub] [flink] tweise commented on a diff in pull request #22810: [FLINK-32035][sql-client] Support HTTPS endpoints in SQL Client gateway mode

2023-07-05 Thread via GitHub


tweise commented on code in PR #22810:
URL: https://github.com/apache/flink/pull/22810#discussion_r1253612690


##
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java:
##
@@ -84,6 +89,26 @@ public void testConnectionTimeout() throws Exception {
 }
 }
 
+@Test
+public void testHttpsConnectionWithDefaultCerts() throws Exception {
+final Configuration config = new Configuration();
+final URL httpsUrl = new URL("https://raw.githubusercontent.com");

Review Comment:
   Depending on that endpoint is not ideal for a unit test. However, given 
that the CI setup already won't function without that connection, and given the 
difficulty of testing this specific functionality (HTTPS with the default 
keystore), I think this exception is justifiable. I would suggest adding a 
comment, though, for the unlikely case that someone runs the tests locally 
without an internet connection.
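
The comment above only asks for an explanatory code comment. One optional step further would be to skip the test when the endpoint cannot be reached. The following is a minimal sketch of such a reachability probe; `ConnectivityCheck` and `isReachable` are hypothetical names and not part of the PR.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ConnectivityCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers unresolved hosts, refused connections and timeouts.
            return false;
        }
    }
}
```

A test could pass the result to JUnit's assumption API (`Assume.assumeTrue` in JUnit 4, `Assumptions.assumeTrue` in JUnit 5), so that an offline run is reported as skipped rather than failed.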






[GitHub] [flink] snuyanzin commented on a diff in pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


snuyanzin commented on code in PR #22834:
URL: https://github.com/apache/flink/pull/22834#discussion_r1253603314


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySliceFunction.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_SLICE}. */
+@Internal
+public class ArraySliceFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+
+public ArraySliceFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SLICE, context);
+final DataType dataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+elementGetter = 
ArrayData.createElementGetter(dataType.getLogicalType());
+}
+
+public @Nullable ArrayData eval(ArrayData array, Integer start, Integer... 
end) {
+try {
+if (array == null || start == null || end.length > 0 && end[0] == 
null) {

Review Comment:
   I guess it's possible to use another method, and then there is no need for varargs
   ```suggestion
   public @Nullable ArrayData eval(ArrayData array, Integer start) {
   return array == null ? null : eval(array, start, array.size());
   }
   
   public @Nullable ArrayData eval(ArrayData array, Integer start, Integer 
end) {
   try {
   if (array == null || start == null || end == null) {
   ```
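
The delegation pattern in the suggestion can be tried in isolation. The sketch below is an assumption-laden illustration, not the PR's code: `OverloadSketch` is a hypothetical class, `List<Integer>` stands in for `ArrayData`, and the three-argument body is deliberately simplified (positive, 1-based, inclusive bounds only).

```java
import java.util.ArrayList;
import java.util.List;

public class OverloadSketch {
    /** Two-argument form: delegates with the array size as the end position. */
    public static List<Integer> slice(List<Integer> array, Integer start) {
        return array == null ? null : slice(array, start, array.size());
    }

    /** Three-argument form: one plain null check replaces the varargs length test. */
    public static List<Integer> slice(List<Integer> array, Integer start, Integer end) {
        if (array == null || start == null || end == null) {
            return null;
        }
        // Simplified for illustration: bounds are 1-based, inclusive, and clamped to the array.
        int from = Math.max(1, start);
        int to = Math.min(array.size(), end);
        return from > to ? new ArrayList<>() : new ArrayList<>(array.subList(from - 1, to));
    }
}
```

With two fixed-arity overloads, the null handling of `end` becomes an ordinary `end == null` check instead of inspecting `end.length` and `end[0]`.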






[GitHub] [flink] snuyanzin commented on a diff in pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


snuyanzin commented on code in PR #22834:
URL: https://github.com/apache/flink/pull/22834#discussion_r1253603314


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySliceFunction.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_SLICE}. */
+@Internal
+public class ArraySliceFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+
+public ArraySliceFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SLICE, context);
+final DataType dataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+elementGetter = 
ArrayData.createElementGetter(dataType.getLogicalType());
+}
+
+public @Nullable ArrayData eval(ArrayData array, Integer start, Integer... 
end) {
+try {
+if (array == null || start == null || end.length > 0 && end[0] == 
null) {

Review Comment:
   I guess it's possible to use another method, and then there is no need for varargs
   ```suggestion
   public @Nullable ArrayData eval(ArrayData array, Integer start) {
   return eval(array, start, array == null ? null : array.size());
   }
   
   public @Nullable ArrayData eval(ArrayData array, Integer start, Integer 
end) {
   try {
   if (array == null || start == null || end == null) {
   ```






[GitHub] [flink] elkhand commented on a diff in pull request #22816: [FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests

2023-07-05 Thread via GitHub


elkhand commented on code in PR #22816:
URL: https://github.com/apache/flink/pull/22816#discussion_r1253525351


##
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java:
##
@@ -577,4 +593,29 @@ private void closeSession() throws SqlExecutionException {
 // ignore any throwable to keep the cleanup running
 }
 }
+
+private static Collection 
readHeadersFromEnvironmentVariable(String envVarName) {
+List headers = new ArrayList<>();
+String rawHeaders = System.getenv(envVarName);
+
+if (rawHeaders != null) {
+String[] lines = rawHeaders.split("\n");

Review Comment:
   HTTP header-wise it seems OK. I thought it would be challenging to specify 
the env variable with multi-line values (each header separated by a new line). 
This is just a nit.
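
To make the multi-line concern concrete, here is a small sketch of splitting a newline-separated value into headers. It assumes each line has the form `Name: value`; that format, and the names `HeaderEnvSketch` and `parseHeaders`, are assumptions for illustration and are not taken from the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderEnvSketch {
    /** Parses newline-separated "Name: value" lines; blank or malformed lines are skipped. */
    public static Map<String, String> parseHeaders(String rawHeaders) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (rawHeaders == null) {
            return headers;
        }
        for (String line : rawHeaders.split("\n")) {
            int colon = line.indexOf(':');
            if (colon <= 0) {
                continue; // no colon at all, or nothing before it
            }
            headers.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
        }
        return headers;
    }
}
```

In bash, such a value can be set with ANSI-C quoting, for example `export MY_HEADERS=$'Cookie: a=b\nX-Token: 123'`, where `MY_HEADERS` is a placeholder variable name.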






[GitHub] [flink] snuyanzin commented on a diff in pull request #22745: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

2023-07-05 Thread via GitHub


snuyanzin commented on code in PR #22745:
URL: https://github.com/apache/flink/pull/22745#discussion_r1253446621


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapFromEntriesFunction.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_FROM_ENTRIES}. */
+@Internal
+public class MapFromEntriesFunction extends BuiltInScalarFunction {
+private final RowData.FieldGetter[] fieldGetters;
+
+public MapFromEntriesFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.MAP_FROM_ENTRIES, context);
+RowType rowType =
+(RowType)
+((CollectionDataType)
+
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.getLogicalType();
+fieldGetters =
+IntStream.range(0, rowType.getFieldCount())
+.mapToObj(i -> 
RowData.createFieldGetter(rowType.getTypeAt(i), i))
+.toArray(RowData.FieldGetter[]::new);
+}
+
+public @Nullable MapData eval(@Nullable ArrayData input) {
+if (input == null) {
+return null;
+}
+
+int size = input.size();
+Map<Object, Object> map = new HashMap<>();
+for (int pos = 0; pos < size; pos++) {
+if (input.isNullAt(pos)) {
+return null;
+}
+
+RowData rowData = input.getRow(pos, 2);
+final Object key = fieldGetters[0].getFieldOrNull(rowData);
+final Object value = fieldGetters[1].getFieldOrNull(rowData);
+map.put(key, value);

Review Comment:
   Sorry, I probably missed this during previous review iterations. 
However, we cannot rely on Java's equals, since it is not 100% identical to SQL 
equality, and the input might be generic data.
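
One way in which Java's `equals` can diverge from SQL equality, though not necessarily the case meant above, is binary data: two `byte[]` values with identical content are distinct `HashMap` keys because arrays use identity-based `equals` and `hashCode`. The sketch below shows this with plain JDK types; `EqualsSketch` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class EqualsSketch {
    /** Counts distinct keys when raw byte[] objects are used as HashMap keys. */
    public static int distinctRawKeys(byte[]... keys) {
        Map<Object, Object> map = new HashMap<>();
        for (byte[] key : keys) {
            map.put(key, "v"); // arrays inherit identity-based equals/hashCode from Object
        }
        return map.size();
    }

    /** Counts distinct keys when each byte[] is wrapped so that content decides equality. */
    public static int distinctWrappedKeys(byte[]... keys) {
        Map<Object, Object> map = new HashMap<>();
        for (byte[] key : keys) {
            map.put(ByteBuffer.wrap(key), "v"); // ByteBuffer compares its remaining content
        }
        return map.size();
    }
}
```

Passing two separate arrays that both hold `{1, 2}` gives two entries in the first method and one entry in the second, which is why a map keyed on raw Java objects may keep duplicates that SQL would treat as the same key.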






[GitHub] [flink] snuyanzin commented on a diff in pull request #22745: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

2023-07-05 Thread via GitHub


snuyanzin commented on code in PR #22745:
URL: https://github.com/apache/flink/pull/22745#discussion_r1253446621


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapFromEntriesFunction.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_FROM_ENTRIES}. */
+@Internal
+public class MapFromEntriesFunction extends BuiltInScalarFunction {
+private final RowData.FieldGetter[] fieldGetters;
+
+public MapFromEntriesFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.MAP_FROM_ENTRIES, context);
+RowType rowType =
+(RowType)
+((CollectionDataType)
+
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.getLogicalType();
+fieldGetters =
+IntStream.range(0, rowType.getFieldCount())
+.mapToObj(i -> 
RowData.createFieldGetter(rowType.getTypeAt(i), i))
+.toArray(RowData.FieldGetter[]::new);
+}
+
+public @Nullable MapData eval(@Nullable ArrayData input) {
+if (input == null) {
+return null;
+}
+
+int size = input.size();
+Map<Object, Object> map = new HashMap<>();
+for (int pos = 0; pos < size; pos++) {
+if (input.isNullAt(pos)) {
+return null;
+}
+
+RowData rowData = input.getRow(pos, 2);
+final Object key = fieldGetters[0].getFieldOrNull(rowData);
+final Object value = fieldGetters[1].getFieldOrNull(rowData);
+map.put(key, value);

Review Comment:
   Sorry, I probably missed this during previous review iterations. 
However, we cannot rely on Java's equals, since it is not 100% identical to SQL 
equality.



##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapFromEntriesFunction.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+

[jira] [Created] (FLINK-32546) update Code Style Guide with Java properties naming convention

2023-07-05 Thread Jing Ge (Jira)
Jing Ge created FLINK-32546:
---

 Summary: update Code Style Guide with Java properties naming 
convention
 Key: FLINK-32546
 URL: https://issues.apache.org/jira/browse/FLINK-32546
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Jing Ge


The class [properties|https://en.wikipedia.org/wiki/Property_(programming)] 
must be accessible using {_}get{_}, {_}set{_}, _is_ (can be used for boolean 
properties instead of get), _to_ and other methods (so-called [accessor 
methods|https://en.wikipedia.org/wiki/Accessor] and [mutator 
methods|https://en.wikipedia.org/wiki/Mutator_method]) according to a standard 
[naming 
convention|https://en.wikipedia.org/wiki/Naming_conventions_(programming)]. 

 

[https://en.wikipedia.org/wiki/JavaBeans]

[https://flink.apache.org/how-to-contribute/code-style-and-quality-preamble/]
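
As a small illustration of the convention described above, a JavaBeans-style class exposes each property through `get`/`set` methods, with `is` as the read accessor for boolean properties. The class and property names below are hypothetical and are not taken from the Flink code base.

```java
public class JobSettings {
    private String name;
    private boolean enabled;

    /** Accessor for the "name" property. */
    public String getName() {
        return name;
    }

    /** Mutator for the "name" property. */
    public void setName(String name) {
        this.name = name;
    }

    /** Boolean properties may use "is" instead of "get". */
    public boolean isEnabled() {
        return enabled;
    }

    /** Mutator for the "enabled" property. */
    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
}
```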



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] hanyuzheng7 commented on pull request #22842: [FLINK-32261][table] Add built-in MAP_UNION function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on PR #22842:
URL: https://github.com/apache/flink/pull/22842#issuecomment-1622117991

   @flinkbot run azure





[GitHub] [flink] hanyuzheng7 commented on pull request #22922: [FLINK-32256][table] Add built-in ARRAY_ MIN function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on PR #22922:
URL: https://github.com/apache/flink/pull/22922#issuecomment-1622101928

   @flinkbot run azure





[GitHub] [flink] XComp commented on a diff in pull request #22893: [FLINK-31814][runtime] Enables Precondition that checks that the issuedLeaderSessionID is set on revoke processing

2023-07-05 Thread via GitHub


XComp commented on code in PR #22893:
URL: https://github.com/apache/flink/pull/22893#discussion_r1253347147


##
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##
@@ -247,38 +247,33 @@ void 
testDelayedGrantCallAfterContenderBeingDeregisteredAgain() throws Exception
 };
 }
 
-/**
- * Test to cover the issue described in FLINK-31814. This test could be 
removed after
- * FLINK-31814 is resolved.
- */
 @Test
-void testOnRevokeCallWhileClosingService() throws Exception {
-final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory 
driverFactory =
-new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
-LeaderElectionEventHandler::onRevokeLeadership);
-
-try (final DefaultLeaderElectionService testInstance =
-new DefaultLeaderElectionService(
-driverFactory, 
fatalErrorHandlerExtension.getTestingFatalErrorHandler())) {
-testInstance.startLeaderElectionBackend();
-
-final TestingLeaderElectionDriver driver = 
driverFactory.getCurrentLeaderDriver();
-assertThat(driver).isNotNull();
-
-driver.isLeader();
-
-final LeaderElection leaderElection =
-
testInstance.createLeaderElection(createRandomContenderID());
-final TestingContender contender =
-new TestingContender("unused-address", leaderElection);
-contender.startLeaderElection();
+void testDelayedRevokeCallAfterContenderBeingDeregisteredAgain() throws 
Exception {

Review Comment:
   fair enough - I added proper asserts that show that the revocation is only 
called once. The exception would have happened in the final trigger call 
because the `issuedLeaderSessionID` would have been null'd by the close call 
already. I didn't add this as a comment because it doesn't add much value to 
the test and was more of an issue due to the previously immature state of the 
code.






[GitHub] [flink] hanyuzheng7 commented on pull request #22842: [FLINK-32261][table] Add built-in MAP_UNION function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on PR #22842:
URL: https://github.com/apache/flink/pull/22842#issuecomment-1622069560

   @snuyanzin, do you think the code is OK now? If so, I can remove the two 
classes CommonMapInputTypeStrategy and CommonArrayInputTypeStrategy.





[jira] [Closed] (FLINK-31476) AdaptiveScheduler should take lower bound parallelism settings into account

2023-07-05 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-31476.

Fix Version/s: 1.18.0
   Resolution: Fixed

master: 38f4d133a784a58318f6b6cb9617646ab7a70fa1

> AdaptiveScheduler should take lower bound parallelism settings into account
> ---
>
> Key: FLINK-31476
> URL: https://issues.apache.org/jira/browse/FLINK-31476
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: David Morávek
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>






[GitHub] [flink] zentol merged pull request #22883: [FLINK-31476] AdaptiveScheduler respects minimum parallelism

2023-07-05 Thread via GitHub


zentol merged PR #22883:
URL: https://github.com/apache/flink/pull/22883





[jira] [Resolved] (FLINK-32421) EmbeddedLeaderServiceTest.testConcurrentRevokeLeadershipAndShutdown is not properly implemented

2023-07-05 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl resolved FLINK-32421.
---
Fix Version/s: 1.18.0
   1.16.3
   1.17.2
 Assignee: Matthias Pohl
   Resolution: Fixed

master: 5dbbc695ed90241bc22d01d05f19e2bdfb4b1e0b
1.17: 067df7b9ce14067375adffcc05bab5513592d646
1.16: 56cb5442333413e7617b12e5796c806d64de62e9

> EmbeddedLeaderServiceTest.testConcurrentRevokeLeadershipAndShutdown is not 
> properly implemented
> ---
>
> Key: FLINK-32421
> URL: https://issues.apache.org/jira/browse/FLINK-32421
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.16.2, 1.18.0, 1.17.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> The purpose of 
> {{EmbeddedLeaderServiceTest.testConcurrentRevokeLeadershipAndShutdown}} is to 
> check that there is no {{NullPointerException}} happening if the event 
> processing happens after the shutdown of the {{EmbeddedExecutorService}} (see 
> FLINK-11855).
> But the concurrent execution is not handled properly. The test also succeeds 
> if the close call happened before the shutdown (due to the multi-threaded 
> nature of the test) which leaves us without the actual test scenario being 
> tested.
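
The ordering problem described above is a general one in multi-threaded tests: unless the order of the two steps is forced, the test may pass without ever exercising the intended interleaving. The sketch below shows one generic way to force "shutdown first, event processing second" with a `CountDownLatch`. It is not the fix that was merged; `OrderingSketch` and the step names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class OrderingSketch {
    /** Runs a shutdown step and an event step on separate threads, forcing shutdown to go first. */
    public static List<String> runShutdownThenEvent() throws InterruptedException {
        List<String> order = new ArrayList<>();
        CountDownLatch shutdownDone = new CountDownLatch(1);

        Thread eventThread = new Thread(() -> {
            try {
                shutdownDone.await(); // the event must not be processed before shutdown
                synchronized (order) {
                    order.add("event");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread shutdownThread = new Thread(() -> {
            synchronized (order) {
                order.add("shutdown");
            }
            shutdownDone.countDown(); // releases the waiting event thread
        });

        eventThread.start();
        shutdownThread.start();
        eventThread.join();
        shutdownThread.join();
        return order;
    }
}
```

Because the event thread blocks on the latch, the recorded order is the same on every run, regardless of how the scheduler interleaves the two threads.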





[GitHub] [flink] XComp merged pull request #22872: [BP-1.16][FLINK-32421][test] EmbeddedLeaderServiceTest.testConcurrentRevokeLeadershipAndShutdown is not properly implemented

2023-07-05 Thread via GitHub


XComp merged PR #22872:
URL: https://github.com/apache/flink/pull/22872





[GitHub] [flink] XComp commented on pull request #22872: [BP-1.16][FLINK-32421][test] EmbeddedLeaderServiceTest.testConcurrentRevokeLeadershipAndShutdown is not properly implemented

2023-07-05 Thread via GitHub


XComp commented on PR #22872:
URL: https://github.com/apache/flink/pull/22872#issuecomment-1622039161

   The test failure happened in the connect module, which doesn't include this 
change. Merging anyway.





[GitHub] [flink] XComp merged pull request #22871: [BP-1.17][FLINK-32421][test] EmbeddedLeaderServiceTest.testConcurrentRevokeLeadershipAndShutdown is not properly implemented

2023-07-05 Thread via GitHub


XComp merged PR #22871:
URL: https://github.com/apache/flink/pull/22871





[GitHub] [flink] XComp merged pull request #22865: [FLINK-32421][test] EmbeddedLeaderServiceTest.testConcurrentRevokeLeadershipAndShutdown is not properly implemented

2023-07-05 Thread via GitHub


XComp merged PR #22865:
URL: https://github.com/apache/flink/pull/22865


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on a diff in pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on code in PR #22834:
URL: https://github.com/apache/flink/pull/22834#discussion_r1253311533


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##
@@ -1050,4 +1051,209 @@ public static class CreateEmptyArray extends 
ScalarFunction {
 return new int[] {};
 }
 }
+
+private Stream arraySliceTestCases() {
+return Stream.of(
+TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_SLICE)
+.onFieldsWithData(
+new Integer[] {null, 1, 2, 3, 4, 5, 6, null},
+null,
+new Row[] {
+Row.of(true, LocalDate.of(2022, 4, 20)),
+Row.of(true, LocalDate.of(1990, 10, 14)),
+null
+},
+new String[] {"a", "b", "c", "d", "e"},
+new Integer[] {1, 2, 3, 4, 5})
+.andDataTypes(
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(
+DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE())),
+DataTypes.ARRAY(DataTypes.STRING()),
+DataTypes.ARRAY(DataTypes.INT()))
+.testResult(
+$("f4").arraySlice(-123, -231),
+"ARRAY_SLICE(f4, -123, -231)",
+new Integer[] {},
+DataTypes.ARRAY(DataTypes.INT()))
+.testResult(
+$("f4").arraySlice(-5, -5),
+"ARRAY_SLICE(f4, -5, -5)",
+new Integer[] {1},
+DataTypes.ARRAY(DataTypes.INT()))
+.testResult(
+$("f4").arraySlice(-6, -5),
+"ARRAY_SLICE(f4, -6, -5)",
+new Integer[] {1},
+DataTypes.ARRAY(DataTypes.INT()))
+.testResult(
+$("f4").arraySlice(5, 5),
+"ARRAY_SLICE(f4, 5, 5)",
+new Integer[] {5},
+DataTypes.ARRAY(DataTypes.INT()))
+.testResult(
+$("f4").arraySlice(5, 6),
+"ARRAY_SLICE(f4, 5, 6)",
+new Integer[] {5},
+DataTypes.ARRAY(DataTypes.INT()))
+.testResult(
+$("f4").arraySlice(-5, 0),
+"ARRAY_SLICE(f4, -5, 0)",
+new Integer[] {1},
+DataTypes.ARRAY(DataTypes.INT()))
+.testResult(
+$("f4").arraySlice(-5, -8),
+"ARRAY_SLICE(f4, -5, -8)",
+new Integer[] {},
+DataTypes.ARRAY(DataTypes.INT()))
+.testResult(
+$("f4").arraySlice(-20, -8),
+"ARRAY_SLICE(f4, -20, -8)",
+new Integer[] {},
+DataTypes.ARRAY(DataTypes.INT()))
+.testResult(
+$("f4").arraySlice(20, 30),
+"ARRAY_SLICE(f4, 20, 30)",
+new Integer[] {},
+DataTypes.ARRAY(DataTypes.INT()))
+.testResult(
+$("f4").arraySlice(-123, 123),
+"ARRAY_SLICE(f4, -123, 123)",
+new Integer[] {1, 2, 3, 4, 5},
+DataTypes.ARRAY(DataTypes.INT()))
+.testResult(
+$("f4").arraySlice(3, -7),
+"ARRAY_SLICE(f4, 3, -7)",
+new Integer[] {},
+DataTypes.ARRAY(DataTypes.INT()))
+.testResult(
+$("f4").arraySlice(0, -7),
+"ARRAY_SLICE(f4, 0, -7)",
+new Integer[] {},
+DataTypes.ARRAY(DataTypes.INT()))
+.testResult(
+  

[GitHub] [flink] hanyuzheng7 commented on pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on PR #22834:
URL: https://github.com/apache/flink/pull/22834#issuecomment-1622035418

   @snuyanzin it now supports an optional third argument.





[jira] [Comment Edited] (FLINK-32537) Add compatibility annotation for REST API classes

2023-07-05 Thread Hong Liang Teoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740266#comment-17740266
 ] 

Hong Liang Teoh edited comment on FLINK-32537 at 7/5/23 3:42 PM:
-

> We already know that though due to the compatibility tests.

 

Got it. [~chesnay] Is the proposal to close this Jira and leave as-is?


was (Author: JIRAUSER292614):
> We already know that though due to the compatibility tests.

 

Got it. [~chesnay] Is the proposal to ignore the `@Internal` or `@Public` for 
REST API classes?

> Add compatibility annotation for REST API classes
> -
>
> Key: FLINK-32537
> URL: https://issues.apache.org/jira/browse/FLINK-32537
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / REST
>Affects Versions: 1.16.2, 1.17.1
>Reporter: Hong Liang Teoh
>Priority: Minor
> Fix For: 1.18.0
>
>
> *Why*
> We want to standardise the class labelling for Flink classes. Currently, the 
> compatibility annotations like @Public, @PublicEvolving, @Internal are not 
> present for REST API classes.
>  
> *What*
> We should add @Internal to most Flink classes, unless they change the 
> REST API variables, so we know clearly which components will change our REST 
> API when changed



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32537) Add compatibility annotation for REST API classes

2023-07-05 Thread Hong Liang Teoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740266#comment-17740266
 ] 

Hong Liang Teoh commented on FLINK-32537:
-

> We already know that though due to the compatibility tests.

 

Got it. [~chesnay] Is the proposal to ignore the `@Internal` or `@Public` for 
REST API classes?



[GitHub] [flink] snuyanzin commented on pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


snuyanzin commented on PR #22834:
URL: https://github.com/apache/flink/pull/22834#issuecomment-1621998471

   true, probably missed it





[GitHub] [flink] liuyongvs commented on pull request #22745: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

2023-07-05 Thread via GitHub


liuyongvs commented on PR #22745:
URL: https://github.com/apache/flink/pull/22745#issuecomment-1621991941

   This function has been supported in Calcite.





[GitHub] [flink] liuyongvs commented on pull request #22745: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

2023-07-05 Thread via GitHub


liuyongvs commented on PR #22745:
URL: https://github.com/apache/flink/pull/22745#issuecomment-1621990748

   Hi @snuyanzin, could you take another look?





[GitHub] [flink] hanyuzheng7 commented on pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on PR #22834:
URL: https://github.com/apache/flink/pull/22834#issuecomment-1621967689

   @snuyanzin can you help me polish the description?
   `Returns a subarray of the input array between 'start_offset' and 
'end_offset', inclusive. The 'end_offset' is optional. The offsets are 
1-based; however, 0 is also treated as the beginning of the array. Positive 
values are counted from the beginning of the array, negative values from the end. 
If 'start_offset' is after 'end_offset', or both are out of array bounds, an 
empty array will be returned. 
   `
   Returns null if any input is null.
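
   The semantics described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the PR's implementation: the class name `ArraySliceSketch` and its `slice` methods are hypothetical, the handling of an `end_offset` of 0 (treated as position 1) is a guess based on the "0 is the beginning" wording, and the two-argument form follows the rule discussed in this thread that a missing `end_offset` behaves like the array length.

```java
import java.util.Arrays;

/** Hypothetical sketch of the ARRAY_SLICE semantics discussed in this thread. */
final class ArraySliceSketch {

    /** Two-argument form: assumes a missing end offset means "up to the last element". */
    static Integer[] slice(Integer[] array, Integer start) {
        if (array == null || start == null) {
            return null;
        }
        return slice(array, start, array.length);
    }

    /** Three-argument form with 1-based, inclusive offsets. */
    static Integer[] slice(Integer[] array, Integer start, Integer end) {
        if (array == null || start == null || end == null) {
            return null; // null in, null out
        }
        int n = array.length;
        if (n == 0) {
            return new Integer[0];
        }
        // Negative offsets count from the end: -1 is the last element.
        int s = start < 0 ? start + n + 1 : start;
        int e = end < 0 ? end + n + 1 : end;
        // 0 (or anything still below 1) is treated as the beginning of the array.
        s = Math.max(1, s);
        // Assumption: an end offset of 0 is also read as position 1.
        e = (e == 0) ? 1 : Math.min(e, n);
        if (e < s) {
            return new Integer[0]; // start after end, or range out of bounds
        }
        return Arrays.copyOfRange(array, s - 1, e);
    }
}
```

   With this sketch, `slice(new Integer[] {1, 2, 3}, -5)` yields the whole array and `slice(new Integer[] {1, 2, 3}, 4)` yields an empty array, matching the examples exchanged in the comments.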





[GitHub] [flink] snuyanzin commented on pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


snuyanzin commented on PR #22834:
URL: https://github.com/apache/flink/pull/22834#issuecomment-1621965984

   > so omitting the optional third arg is a special case where end-offset is set to the right bound of the array,
   > and start-offset behavior does not change?
   
   correct
   in this situation
   ```
   array_slice(ARRAY[1,2,3] , -5) -> [1,2,3]  <-- same as array_slice(ARRAY[1,2,3] , -5, 3)
   ```
   





[GitHub] [flink] hanyuzheng7 commented on pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on PR #22834:
URL: https://github.com/apache/flink/pull/22834#issuecomment-1621958121

   > If the third arg is optional, the result should be the same as when the third arg is equal to the length of the array (same as in the case of `substring`). The result should be
   > 
   > ```
   > array_slice(ARRAY[1,2,3] , -3) -> [1,2,3]    <-- here yes, correct, it is same as array_slice(ARRAY[1,2,3] , -3, 3)
   > array_slice(ARRAY[1,2,3] , 1) -> [1, 2, 3]   <-- here it is same as array_slice(ARRAY[1,2,3] , 1, 3)
   > and array_slice(ARRAY[1,2,3], 4) -> []   <-- same as array_slice(ARRAY[1,2,3], 4, 3) -> []
   > ```
   
   
   
   -3 counts from the end of the array, so it corresponds to position 1. How 
about this situation:
   ` array_slice(ARRAY[1,2,3] , -5) -> [1,2,3] ?`
   





[GitHub] [flink] afedulov commented on a diff in pull request #22816: [FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests

2023-07-05 Thread via GitHub


afedulov commented on code in PR #22816:
URL: https://github.com/apache/flink/pull/22816#discussion_r1253253546


##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java:
##
@@ -411,30 +413,31 @@ private static Request createRequest(
 String targetUrl,
 HttpMethod httpMethod,
 ByteBuf jsonPayload,
-Collection fileUploads)
+Collection fileUploads,
+Collection customHeaders)
 throws IOException {
 if (fileUploads.isEmpty()) {
 
 HttpRequest httpRequest =
 new DefaultFullHttpRequest(
 HttpVersion.HTTP_1_1, httpMethod, targetUrl, jsonPayload);
 
-httpRequest
-.headers()
-.set(HttpHeaderNames.HOST, targetAddress)
+HttpHeaders headers = httpRequest.headers();
+headers.set(HttpHeaderNames.HOST, targetAddress)
 .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
 .add(HttpHeaderNames.CONTENT_LENGTH, jsonPayload.capacity())
 .add(HttpHeaderNames.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
+customHeaders.forEach(ch -> headers.add(ch.getName(), ch.getValue()));
 
 return new SimpleRequest(httpRequest);
 } else {
 HttpRequest httpRequest =
 new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, httpMethod, targetUrl);
 
-httpRequest
-.headers()
-.set(HttpHeaderNames.HOST, targetAddress)
+HttpHeaders headers = httpRequest.headers();
+headers.set(HttpHeaderNames.HOST, targetAddress)
 .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
+customHeaders.forEach(ch -> headers.set(ch.getName(), ch.getValue()));

Review Comment:
   addressed 



##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import java.util.Objects;
+
+/** Represents an HTTP header with a name and a value. */
+public class HttpHeader {
+
+/** The name of the HTTP header. */
+private final String name;
+
+/** The value of the HTTP header. */
+private final String value;
+
+/**
+ * Constructs an {@code HttpHeader} object with the specified name and value.
+ *
+ * @param name the name of the HTTP header
+ * @param value the value of the HTTP header
+ */
+public HttpHeader(String name, String value) {
+this.name = name;
+this.value = value;
+}
+
+/**
+ * Returns the name of this HTTP header.
+ *
+ * @return the name of this HTTP header
+ */
+public String getName() {
+return name;
+}
+
+/**
+ * Returns the value of this HTTP header.
+ *
+ * @return the value of this HTTP header
+ */
+public String getValue() {
+return value;
+}
+
+@Override
+public String toString() {
+return "HttpHeader{" + "name='" + name + '\'' + ", value='" + value + '\'' + '}';
+}
+
+@Override
+public boolean equals(Object o) {

Review Comment:
   addressed 
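
   The quoted diff is cut off at `equals`, so the actual method body is not visible here. As a generic illustration only, and not the PR's code, a name/value class like this one would typically implement `equals` and `hashCode` with `java.util.Objects` (which the diff imports). The class name `HeaderSketch` is a hypothetical stand-in.

```java
import java.util.Objects;

/** Hypothetical stand-in for a name/value header class; not taken from the PR. */
final class HeaderSketch {

    private final String name;
    private final String value;

    HeaderSketch(String name, String value) {
        this.name = name;
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        HeaderSketch other = (HeaderSketch) o;
        // Objects.equals is null-safe, so null names or values do not throw.
        return Objects.equals(name, other.name) && Objects.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        // Must be consistent with equals: same fields, same order.
        return Objects.hash(name, value);
    }
}
```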






[jira] [Comment Edited] (FLINK-32353) Make Cassandra connector tests compatible with archunit rules

2023-07-05 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740251#comment-17740251
 ] 

Etienne Chauchot edited comment on FLINK-32353 at 7/5/23 3:05 PM:
--

The mini cluster rule has been fixed in  [Flink 
1.18|https://github.com/apache/flink/pull/22399/]
 * CassandraConnectorITCase does not use or need a MiniCluster, so it is still a 
"violation", but the rule message has changed.
 * the "violation" for CassandraSourceITCase is now gone as the rule was fixed

To get rid of this failure when testing against Flink 1.18, we need to update 
the archunit violation store. But as discussed in [this 
email|https://lists.apache.org/thread/pr0g812olzpgz21d9oodhc46db9jpxo3], I'll 
update the violation store only when Flink 1.18 is the main supported version 
for the Cassandra connector, most likely shortly after that Flink release is out.


was (Author: echauchot):
Regarding [this 
email|https://lists.apache.org/thread/pr0g812olzpgz21d9oodhc46db9jpxo3], I'll 
add the "normal" archunit violations that comply with 1.18 when this version is 
the main supported version for the Cassandra connector.

> Make Cassandra connector tests compatible with archunit rules
> -
>
> Key: FLINK-32353
> URL: https://issues.apache.org/jira/browse/FLINK-32353
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Cassandra
>Reporter: Martijn Visser
>Assignee: Etienne Chauchot
>Priority: Major
>  Labels: pull-request-available
>
> The current Cassandra connector in {{main}} fails when testing against Flink 
> 1.18-SNAPSHOT
> {code:java}
> Error:  Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 8.1 s 
> <<< FAILURE! - in org.apache.flink.architecture.rules.ITCaseRules
> Error:  ITCaseRules.ITCASE_USE_MINICLUSTER  Time elapsed: 0.025 s  <<< 
> FAILURE!
> java.lang.AssertionError: 
> Architecture Violation [Priority: MEDIUM] - Rule 'ITCASE tests should use a 
> MiniCluster resource or extension' was violated (1 times):
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase does 
> not satisfy: only one of the following predicates match:
> * reside in a package 'org.apache.flink.runtime.*' and contain any fields 
> that are static, final, and of type InternalMiniClusterExtension and 
> annotated with @RegisterExtension
> * reside outside of package 'org.apache.flink.runtime.*' and contain any 
> fields that are static, final, and of type MiniClusterExtension and annotated 
> with @RegisterExtension or are , and of type MiniClusterTestEnvironment and 
> annotated with @TestEnv
> * reside in a package 'org.apache.flink.runtime.*' and is annotated with 
> @ExtendWith with class InternalMiniClusterExtension
> * reside outside of package 'org.apache.flink.runtime.*' and is annotated 
> with @ExtendWith with class MiniClusterExtension
>  or contain any fields that are public, static, and of type 
> MiniClusterWithClientResource and final and annotated with @ClassRule or 
> contain any fields that is of type MiniClusterWithClientResource and public 
> and final and not static and annotated with @Rule
> {code}
> https://github.com/apache/flink-connector-cassandra/actions/runs/5276835802/jobs/9544092571#step:13:811





[GitHub] [flink] snuyanzin commented on pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


snuyanzin commented on PR #22834:
URL: https://github.com/apache/flink/pull/22834#issuecomment-1621949525

   If the third arg is optional, the result should be the same as when the third arg is equal 
to the length of the array (same as in the case of `substring`).
   The result should be 
   ```
   array_slice(ARRAY[1,2,3] , -3) -> [1,2,3]    <-- here yes, correct, it is same as array_slice(ARRAY[1,2,3] , -3, 3)
   array_slice(ARRAY[1,2,3] , 1) -> [1, 2, 3]   <-- here it is same as array_slice(ARRAY[1,2,3] , 1, 3)
   and array_slice(ARRAY[1,2,3], 4) -> []   <-- same as array_slice(ARRAY[1,2,3], 4, 3) -> []
   ```





[jira] [Commented] (FLINK-32353) Make Cassandra connector tests compatible with archunit rules

2023-07-05 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740251#comment-17740251
 ] 

Etienne Chauchot commented on FLINK-32353:
--

Regarding [this 
email|https://lists.apache.org/thread/pr0g812olzpgz21d9oodhc46db9jpxo3], I'll 
add the "normal" archunit violations that comply with 1.18 when this version is 
the main supported version for the Cassandra connector.



[GitHub] [flink] hanyuzheng7 commented on pull request #22834: [FLINK-32260][table] Add built-in ARRAY_SLICE function.

2023-07-05 Thread via GitHub


hanyuzheng7 commented on PR #22834:
URL: https://github.com/apache/flink/pull/22834#issuecomment-1621939482

   @snuyanzin if the third arg is optional, can you tell me the expected function 
behavior in that case?
   ```
   array_slice(ARRAY[1,2,3] , -3) -> [1,2,3] 
   array_slice(ARRAY[1,2,3] , 1) -> [1]
   and array_slice(ARRAY[1,2,3], 4) -> []
   
   ```
   
   Should the results look like what I show above?





[GitHub] [flink] XComp commented on pull request #22873: [FLINK-32419][runtime] Removal of obsolete/unused classes/interfaces/fields

2023-07-05 Thread via GitHub


XComp commented on PR #22873:
URL: https://github.com/apache/flink/pull/22873#issuecomment-1621920280

   Argh, I screwed up and force-pushed the wrong version. I fixed that and 
force-pushed the squashed version once more. I'm going to go ahead and rebase onto 
the most recent version of FLINK-32409.





[GitHub] [flink] XComp commented on pull request #22848: [FLINK-32409][runtime] Makes DefaultLeaderElectionService not rely on MultipleComponentLeaderElectionDriverAdapter anymore

2023-07-05 Thread via GitHub


XComp commented on PR #22848:
URL: https://github.com/apache/flink/pull/22848#issuecomment-1621887334

   I added the hotfix for the shutdown logic and squashed the commits together. 
I will do a separate rebase to most-recent (stable) version of `master` 
(44e1397e2e7aa54667bf3b30bc6f6d5db32b5f79) afterwards to trigger a final CI run.





[GitHub] [flink] XComp commented on a diff in pull request #22848: [FLINK-32409][runtime] Makes DefaultLeaderElectionService not rely on MultipleComponentLeaderElectionDriverAdapter anymore

2023-07-05 Thread via GitHub


XComp commented on code in PR #22848:
URL: https://github.com/apache/flink/pull/22848#discussion_r1253200918


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java:
##
@@ -69,75 +59,72 @@ public class 
KubernetesMultipleComponentLeaderElectionHaServices extends Abstrac
 
 private final String lockIdentity;
 
-private final FatalErrorHandler fatalErrorHandler;
-
-@Nullable
-@GuardedBy("lock")
-private DefaultMultipleComponentLeaderElectionService 
multipleComponentLeaderElectionService =
-null;
-
 KubernetesMultipleComponentLeaderElectionHaServices(
 FlinkKubeClient kubeClient,
-Executor executor,
-Configuration config,
-BlobStoreService blobStoreService,
-FatalErrorHandler fatalErrorHandler)
-throws IOException {
+Executor ioExecutor,
+Configuration configuration,
+BlobStoreService blobStoreService)
+throws Exception {
+this(
+kubeClient,
+kubeClient.createConfigMapSharedWatcher(
+KubernetesUtils.getConfigMapLabels(
+
configuration.get(KubernetesConfigOptions.CLUSTER_ID),
+LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY)),
+Executors.newCachedThreadPool(
+new ExecutorThreadFactory("config-map-watch-handler")),
+ioExecutor,
+configuration.get(KubernetesConfigOptions.CLUSTER_ID),
+UUID.randomUUID().toString(),
+configuration,
+blobStoreService);
+}
 
+private KubernetesMultipleComponentLeaderElectionHaServices(
+FlinkKubeClient kubeClient,
+KubernetesConfigMapSharedWatcher configMapSharedWatcher,
+ExecutorService watchExecutorService,
+Executor ioExecutor,
+String clusterId,
+String lockIdentity,
+Configuration configuration,
+BlobStoreService blobStoreService)
+throws Exception {
 super(
-config,
-executor,
+configuration,
+createDriverFactory(
+kubeClient,
+configMapSharedWatcher,
+watchExecutorService,
+clusterId,
+lockIdentity,
+configuration),
+ioExecutor,
 blobStoreService,
-FileSystemJobResultStore.fromConfiguration(config));
-this.kubeClient = checkNotNull(kubeClient);
-this.clusterId = 
checkNotNull(config.get(KubernetesConfigOptions.CLUSTER_ID));
-this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
+FileSystemJobResultStore.fromConfiguration(configuration));
 
-this.configMapSharedWatcher =
-this.kubeClient.createConfigMapSharedWatcher(
-KubernetesUtils.getConfigMapLabels(
-clusterId, 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY));
-this.watchExecutorService =
-Executors.newCachedThreadPool(
-new ExecutorThreadFactory("config-map-watch-handler"));
-
-lockIdentity = UUID.randomUUID().toString();
+this.kubeClient = checkNotNull(kubeClient);
+this.clusterId = checkNotNull(clusterId);
+this.configMapSharedWatcher = checkNotNull(configMapSharedWatcher);
+this.watchExecutorService = checkNotNull(watchExecutorService);

Review Comment:
   true, I'm gonna add a hotfix commit adding the shutdown logic.






[GitHub] [flink] XComp commented on pull request #22919: [FLINK-31837][runtime] Makes LeaderElectionDriver instantiation happen lazily

2023-07-05 Thread via GitHub


XComp commented on PR #22919:
URL: https://github.com/apache/flink/pull/22919#issuecomment-1621865441

   @zentol I addressed your comments. PTAL





[GitHub] [flink] XComp commented on a diff in pull request #22919: [FLINK-31837][runtime] Makes LeaderElectionDriver instantiation happen lazily

2023-07-05 Thread via GitHub


XComp commented on code in PR #22919:
URL: https://github.com/apache/flink/pull/22919#discussion_r1253187439


##
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##
@@ -180,25 +186,117 @@ void testCloseGrantDeadlock() throws Exception {
 }
 
 @Test
-void testGrantCallWhileInstantiatingDriver() throws Exception {
-final UUID expectedLeaderSessionID = UUID.randomUUID();
+void testLazyDriverInstantiation() throws Exception {
+final AtomicBoolean driverCreated = new AtomicBoolean();
 try (final DefaultLeaderElectionService testInstance =
 new DefaultLeaderElectionService(
 (listener, errorHandler) -> {
-listener.isLeader(expectedLeaderSessionID);
+driverCreated.set(true);
 return TestingLeaderElectionDriver.newNoOpBuilder()
 .build(listener, errorHandler);
 },
 
fatalErrorHandlerExtension.getTestingFatalErrorHandler(),
 Executors.newDirectExecutorService())) {
-testInstance.startLeaderElectionBackend();
+assertThat(driverCreated)
+.as("The driver shouldn't have been created during service creation.")
+.isFalse();
+
+try (final LeaderElection leaderElection =
+testInstance.createLeaderElection("contender-id")) {
+assertThat(driverCreated)
+.as(
+"The driver shouldn't have been created during LeaderElection creation.")
+.isFalse();
+
+leaderElection.startLeaderElection(
+TestingGenericLeaderContender.newBuilder().build());
+assertThat(driverCreated)
+.as(
+"The driver should have been created when registering the contender in the LeaderElection.")
+.isTrue();
+}
+}
+}
+
+@Test
+void testReuseOfServiceIsRestricted() throws Exception {
+final DefaultLeaderElectionService testInstance =
+new DefaultLeaderElectionService(
+new TestingLeaderElectionDriver.Factory(
+TestingLeaderElectionDriver.newNoOpBuilder()));
+
+// The driver hasn't started, yet, which prevents the service from going into running state.
+// This results in the close method not having any effect.
+testInstance.close();
+
+try (final LeaderElection leaderElection =
+testInstance.createLeaderElection("contender-id")) {

Review Comment:
   The new commit implementing the behavior described in [the comment 
above](https://github.com/apache/flink/pull/22919#discussion_r1253186428) will 
make the call fail from now on. :+1: 






[GitHub] [flink] XComp commented on a diff in pull request #22919: [FLINK-31837][runtime] Makes LeaderElectionDriver instantiation happen lazily

2023-07-05 Thread via GitHub


XComp commented on code in PR #22919:
URL: https://github.com/apache/flink/pull/22919#discussion_r1253186428


##
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##
@@ -125,7 +126,12 @@ void testCloseGrantDeadlock() throws Exception {
 final DefaultLeaderElectionService testInstance =
 new DefaultLeaderElectionService(
 driverFactory, fatalErrorHandlerExtension.getTestingFatalErrorHandler());
-testInstance.startLeaderElectionBackend();
+
+// creating the LeaderElection is necessary to instantiate the driver
+final LeaderElection leaderElection = testInstance.createLeaderElection("contender-id");
+leaderElection.startLeaderElection(TestingGenericLeaderContender.newBuilder().build());
+leaderElection.close();

Review Comment:
   > This seems sketchy. Given that the driver is lazily initialized once the 
first contender is registered, wouldn't it be intuitive for the driver to be 
closed when the last leader election is deregistered?
   
   I thought about that one and it sounds like the more consistent behavior. It 
actually works quite well:
   * The driver's lifecycle will be bound to the `leaderContenderRegistry` (a 
driver is created if a contender is added to the previously empty registry; the 
driver is closed if the registry becomes empty again)
   * The `leaderOperationExecutor` is still bound to the service's lifecycle. 
The executor gets added/instantiated within the constructor and shut down in 
the service's close method. No new contender can be registered after 
`DefaultLeaderElectionService.close` is called 
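
   The lifecycle described in the two bullets above can be sketched as follows. This is a simplified illustration, not Flink's `DefaultLeaderElectionService`: the class `LazyDriverLifecycleSketch`, its nested `Driver`, and all method names are hypothetical, and closing a still-open driver in `close()` is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: the driver lives only while the contender registry is non-empty. */
final class LazyDriverLifecycleSketch implements AutoCloseable {

    /** Hypothetical stand-in for a leader election driver. */
    static final class Driver implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    private final Map<String, Object> contenders = new HashMap<>();
    private Driver driver; // non-null only while at least one contender is registered
    private boolean serviceClosed;
    int driversCreated; // exposed for illustration only

    synchronized void register(String contenderId, Object contender) {
        if (serviceClosed) {
            throw new IllegalStateException("No contender can be registered after close.");
        }
        if (contenders.containsKey(contenderId)) {
            throw new IllegalArgumentException("Contender already registered: " + contenderId);
        }
        if (contenders.isEmpty()) {
            // First contender in a previously empty registry: create the driver lazily.
            driver = new Driver();
            driversCreated++;
        }
        contenders.put(contenderId, contender);
    }

    synchronized void deregister(String contenderId) {
        if (!contenders.containsKey(contenderId)) {
            return;
        }
        contenders.remove(contenderId);
        if (contenders.isEmpty()) {
            // Registry became empty again: the driver is closed.
            driver.close();
            driver = null;
        }
    }

    synchronized boolean hasDriver() {
        return driver != null;
    }

    @Override
    public synchronized void close() {
        serviceClosed = true;
        // Assumption of this sketch: a driver that is still open is closed with the service.
        if (driver != null) {
            driver.close();
            driver = null;
        }
        contenders.clear();
    }
}
```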






[GitHub] [flink-kubernetes-operator] darenwkt commented on pull request #620: [FLINK-32317] Enrich metadata in CR error field

2023-07-05 Thread via GitHub


darenwkt commented on PR #620:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/620#issuecomment-1621854611

   Hi Gyula, thank you for the review, and apologies for addressing the comments so late.





[jira] [Closed] (FLINK-32484) AdaptiveScheduler combined restart during scaling out

2023-07-05 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-32484.
--
Resolution: Duplicate

> AdaptiveScheduler combined restart during scaling out
> -
>
> Key: FLINK-32484
> URL: https://issues.apache.org/jira/browse/FLINK-32484
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Affects Versions: 1.17.0
>Reporter: Prabhu Joseph
>Priority: Major
>
> On a scaling-out operation, when nodes are added at different times, 
> AdaptiveScheduler does multiple restarts within a short period of time. On 
> one of our Flink jobs, we have seen AdaptiveScheduler restart the 
> ExecutionGraph every time there is a notification of new resources to it. 
> There are five restarts within 3 minutes.
> AdaptiveScheduler could provide a configurable restart window interval to the 
> user during which it combines the notified resources and restarts once when 
> the available resources are sufficient to fit the desired parallelism or when 
> the window times out. The window is created during the first notification of 
> resources received. This is applicable only when the execution graph is in 
> the executing state and not in the waiting for resources state.
>  
> {code:java}
> [root@ip-1-2-3-4 container_1688034805200_0002_01_01]# grep -i scale *
> jobmanager.log:2023-06-29 10:46:58,061 INFO  
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - New 
> resources are available. Restarting job to scale up.
> jobmanager.log:2023-06-29 10:47:57,317 INFO  
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - New 
> resources are available. Restarting job to scale up.
> jobmanager.log:2023-06-29 10:48:53,314 INFO  
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - New 
> resources are available. Restarting job to scale up.
> jobmanager.log:2023-06-29 10:49:27,821 INFO  
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - New 
> resources are available. Restarting job to scale up.
> jobmanager.log:2023-06-29 10:50:15,672 INFO  
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - New 
> resources are available. Restarting job to scale up.
> [root@ip-1-2-3-4 container_1688034805200_0002_01_01]# {code}
>  
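
The restart window proposed in the description can be sketched roughly as follows. This is only an illustration under stated assumptions, not AdaptiveScheduler code: the class `RestartWindow`, its methods, and the slot-count model are hypothetical names introduced here, and time is passed in explicitly so the logic is easy to follow.

```java
/** Hypothetical sketch of the proposed restart window; not actual Flink code. */
final class RestartWindow {
    private final int desiredSlots;   // slots needed to fit the desired parallelism
    private final long windowMillis;  // configurable window length
    private int availableSlots;       // slots known to be available so far
    private long windowStart = -1L;   // -1 means no window is currently open

    RestartWindow(int currentSlots, int desiredSlots, long windowMillis) {
        this.availableSlots = currentSlots;
        this.desiredSlots = desiredSlots;
        this.windowMillis = windowMillis;
    }

    /** Records a resource notification; returns true if the job should restart now. */
    boolean onNewResources(int newSlots, long nowMillis) {
        if (windowStart < 0) {
            windowStart = nowMillis; // the first notification opens the window
        }
        availableSlots += newSlots;
        return shouldRestart(nowMillis);
    }

    /** Can also be polled by a timer so that a timed-out window triggers the restart. */
    boolean shouldRestart(long nowMillis) {
        if (windowStart < 0) {
            return false; // no pending notifications
        }
        boolean sufficient = availableSlots >= desiredSlots;
        boolean timedOut = nowMillis - windowStart >= windowMillis;
        if (sufficient || timedOut) {
            windowStart = -1L; // close the window: one combined restart
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Job runs with 2 slots, wants 6, and waits at most 3 minutes.
        RestartWindow window = new RestartWindow(2, 6, 180_000L);
        long[] arrivals = {0L, 59_000L, 115_000L, 150_000L};
        for (long t : arrivals) {
            System.out.println("t=" + t + " ms, restart=" + window.onNewResources(1, t));
        }
    }
}
```

In this sketch the four notifications lead to a single restart on the last one, instead of one restart per notification as in the log above.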



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32484) AdaptiveScheduler combined restart during scaling out

2023-07-05 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740232#comment-17740232
 ] 

Gyula Fora commented on FLINK-32484:


I think this is related to 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler]
 and the related ML thread. 
[https://lists.apache.org/thread/qvgxzhbp9rhlsqrybxdy51h05zwxfns6]

If you agree I suggest closing this ticket and joining in that discussion.






[GitHub] [flink-kubernetes-operator] darenwkt closed pull request #566: [FLINK-31798] Add config to disable EventRecorder

2023-07-05 Thread via GitHub


darenwkt closed pull request #566: [FLINK-31798] Add config to disable 
EventRecorder
URL: https://github.com/apache/flink-kubernetes-operator/pull/566





[GitHub] [flink-kubernetes-operator] darenwkt closed pull request #568: [FLINK-31845] Make KubernetesStepDecorator Pluggable

2023-07-05 Thread via GitHub


darenwkt closed pull request #568: [FLINK-31845] Make KubernetesStepDecorator 
Pluggable
URL: https://github.com/apache/flink-kubernetes-operator/pull/568





[GitHub] [flink-kubernetes-operator] darenwkt closed pull request #570: [FLINK-31857] Observer plugin

2023-07-05 Thread via GitHub


darenwkt closed pull request #570: [FLINK-31857] Observer plugin
URL: https://github.com/apache/flink-kubernetes-operator/pull/570





[jira] [Closed] (FLINK-32362) SourceAlignment announceCombinedWatermark period task maybe lost

2023-07-05 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan closed FLINK-32362.
---
Fix Version/s: 1.18.0, 1.16.3, 1.17.2
Resolution: Fixed

> SourceAlignment announceCombinedWatermark period task maybe lost
> 
>
> Key: FLINK-32362
> URL: https://issues.apache.org/jira/browse/FLINK-32362
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> When we use source alignment, we also found another problem: 
> announceCombinedWatermark may throw an exception (such as "subtask 25 is not 
> ready yet to receive events", when this subtask may be under failover), which 
> stops the periodic task from running any more (ThreadPoolExecutor will not 
> reschedule a periodic task once it throws an exception).
> I think we should make the announceCombinedWatermark function more robust so 
> that it does not throw any exception (if a send fails, just wait for the next 
> send). For the code, see 
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
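
The executor behaviour described above can be reproduced with the JDK alone. The sketch below is not the SourceCoordinator code; the class name `PeriodicTaskDemo` and the timings are assumptions made for illustration. It relies on the documented contract of `ScheduledExecutorService.scheduleAtFixedRate`: if any execution of the task throws, subsequent executions are suppressed.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: a throwing periodic task is silently dropped unless guarded. */
final class PeriodicTaskDemo {

    /** Schedules an always-failing task every 10 ms for about 300 ms; returns how often it ran. */
    static int countRuns(boolean guarded) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("subtask is not ready yet to receive events");
        };
        Runnable guardedTask = () -> {
            try {
                failing.run();
            } catch (RuntimeException e) {
                // Swallow (or log) the failure and simply wait for the next period.
            }
        };
        pool.scheduleAtFixedRate(guarded ? guardedTask : failing, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(300);
            pool.shutdownNow();
            pool.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded runs: " + countRuns(false));
        System.out.println("guarded runs:   " + countRuns(true));
    }
}
```

The unguarded task runs once and is never rescheduled, while the guarded one keeps running every period, which is the robustness the ticket asks for.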





[jira] [Commented] (FLINK-32362) SourceAlignment announceCombinedWatermark period task maybe lost

2023-07-05 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740225#comment-17740225
 ] 

Rui Fan commented on FLINK-32362:
-

Merged via:
master (1.18): 5509c61
1.17: 7219ca1
1.16: 1847b84

> SourceAlignment announceCombinedWatermark period task maybe lost
> 
>
> Key: FLINK-32362
> URL: https://issues.apache.org/jira/browse/FLINK-32362
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Assignee: Cai Liuyang
>Priority: Major
>  Labels: pull-request-available
>
> When we use source alignment, we also found another problem: 
> announceCombinedWatermark may throw an exception (such as "subtask 25 is not 
> ready yet to receive events", when this subtask may be under failover), which 
> stops the periodic task from running any more (ThreadPoolExecutor will not 
> reschedule a periodic task once it throws an exception).
> I think we should make the announceCombinedWatermark function more robust so 
> that it does not throw any exception (if a send fails, just wait for the next 
> send). For the code, see 
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]





[GitHub] [flink] 1996fanrui merged pull request #22953: [FLINK-32362] [connectors/common] increase the robustness of announceCombinedWatermark to cover the case task failover

2023-07-05 Thread via GitHub


1996fanrui merged PR #22953:
URL: https://github.com/apache/flink/pull/22953





[GitHub] [flink] 1996fanrui commented on pull request #22953: [FLINK-32362] [connectors/common] increase the robustness of announceCombinedWatermark to cover the case task failover

2023-07-05 Thread via GitHub


1996fanrui commented on PR #22953:
URL: https://github.com/apache/flink/pull/22953#issuecomment-1621794711

   Thanks @LoveHeat for the fix, merging~





[GitHub] [flink] 1996fanrui merged pull request #22954: [FLINK-32362] [connectors/common] increase the robustness of announceCombinedWatermark to cover the case task failover

2023-07-05 Thread via GitHub


1996fanrui merged PR #22954:
URL: https://github.com/apache/flink/pull/22954





[GitHub] [flink] 1996fanrui commented on pull request #22954: [FLINK-32362] [connectors/common] increase the robustness of announceCombinedWatermark to cover the case task failover

2023-07-05 Thread via GitHub


1996fanrui commented on PR #22954:
URL: https://github.com/apache/flink/pull/22954#issuecomment-1621791793

   Thanks @LoveHeat for the fix, merging~





[GitHub] [flink] 1996fanrui merged pull request #22806: [FLINK-32362] [connectors/common] increase the robustness of announceCombinedWatermark to cover the case task failover

2023-07-05 Thread via GitHub


1996fanrui merged PR #22806:
URL: https://github.com/apache/flink/pull/22806





[GitHub] [flink] 1996fanrui commented on pull request #22806: [FLINK-32362] [connectors/common] increase the robustness of announceCombinedWatermark to cover the case task failover

2023-07-05 Thread via GitHub


1996fanrui commented on PR #22806:
URL: https://github.com/apache/flink/pull/22806#issuecomment-1621789482

   Thanks to @LoveHeat for the fix, and thanks @pnowojski for the review.
   
   CI passed, merging~





[jira] [Updated] (FLINK-32545) Improves the performance by optimizing row operations

2023-07-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32545:
---
Labels: pull-request-available  (was: )

> Improves the performance by optimizing row operations
> -
>
> Key: FLINK-32545
> URL: https://issues.apache.org/jira/browse/FLINK-32545
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.4.0
>
>
> Currently, dozens of algorithms in Flink ML contain code like `Row.join(row, 
> Row.of(...))` which is expensive. We should avoid creating Rows multiple 
> times.
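
To illustrate why the pattern is costly: `Row.join(row, Row.of(...))` first builds a temporary Row for the extra values and then copies everything into a third Row. The sketch below uses plain `Object[]` arrays as a stand-in for Row so that it runs without Flink; the class `AppendFieldsSketch` and both method names are hypothetical and are not the utility added by any pull request.

```java
import java.util.Arrays;

/** Hypothetical stand-in for Row handling, using Object[] instead of Flink's Row. */
final class AppendFieldsSketch {

    /** Mirrors the costly pattern: a temporary holder is created, then everything is copied. */
    static Object[] joinViaTemporary(Object[] row, Object... extra) {
        Object[] temporary = Arrays.copyOf(extra, extra.length, Object[].class); // like Row.of(...)
        Object[] joined = new Object[row.length + temporary.length];            // like Row.join(...)
        System.arraycopy(row, 0, joined, 0, row.length);
        System.arraycopy(temporary, 0, joined, row.length, temporary.length);
        return joined;
    }

    /** Appends the extra values directly, without the intermediate holder. */
    static Object[] append(Object[] row, Object... extra) {
        Object[] result = Arrays.copyOf(row, row.length + extra.length, Object[].class);
        System.arraycopy(extra, 0, result, row.length, extra.length);
        return result;
    }

    public static void main(String[] args) {
        Object[] row = {1, "a"};
        System.out.println(Arrays.toString(joinViaTemporary(row, 2.0, true)));
        System.out.println(Arrays.toString(append(row, 2.0, true)));
    }
}
```

Both methods produce the same fields; the second simply skips one allocation and one copy per call, which adds up on per-record code paths.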





[GitHub] [flink-ml] jiangxin369 opened a new pull request, #246: [FLINK-32545] Improves the performance by optimizing row operations

2023-07-05 Thread via GitHub


jiangxin369 opened a new pull request, #246:
URL: https://github.com/apache/flink-ml/pull/246

   
   
   ## What is the purpose of the change
   
Improves the performance by optimizing row operations.
   
   ## Brief change log
   
 - Adds RowUtils to help with Row operations.
 - Replaces all Row.join() function call with new utility.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   





[GitHub] [flink] flinkbot commented on pull request #22958: [FLINK-32541][network] Fix the buffer leaking when buffer accumulator is released

2023-07-05 Thread via GitHub


flinkbot commented on PR #22958:
URL: https://github.com/apache/flink/pull/22958#issuecomment-1621740989

   
   ## CI report:
   
   * 6bac655adf8d4704a242aeba82896a83b9905268 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-32541) Fix the buffer leaking in buffer accumulators when a failover occurs

2023-07-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32541:
---
Labels: pull-request-available  (was: )

> Fix the buffer leaking in buffer accumulators when a failover occurs
> 
>
> Key: FLINK-32541
> URL: https://issues.apache.org/jira/browse/FLINK-32541
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> When a failover occurs, the buffers in the sort/hash accumulators should be 
> released correctly to avoid buffer leaks.





[GitHub] [flink] TanYuxin-tyx opened a new pull request, #22958: [FLINK-32541][network] Fix the buffer leaking when buffer accumulator is released

2023-07-05 Thread via GitHub


TanYuxin-tyx opened a new pull request, #22958:
URL: https://github.com/apache/flink/pull/22958

   
   
   
   
   ## What is the purpose of the change
   
   *Fix the buffer leaking when buffer accumulator is released*
   
   
   ## Brief change log
   
 - *Fix the buffer leaking when buffer accumulator is released*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added tests in the `SortBufferAccumulatorTest` and 
`HashBufferAccumulatorTest`*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   





[jira] [Updated] (FLINK-32545) Improves the performance by optimizing row operations

2023-07-05 Thread Jiang Xin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xin updated FLINK-32545:
--
Summary: Improves the performance by optimizing row operations  (was: 
Removes the expensive Row operations like join)

> Improves the performance by optimizing row operations
> -
>
> Key: FLINK-32545
> URL: https://issues.apache.org/jira/browse/FLINK-32545
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Jiang Xin
>Priority: Major
> Fix For: ml-2.4.0
>
>
> Currently, dozens of algorithms in Flink ML contain code like `Row.join(row, 
> Row.of(...))` which is expensive. We should avoid creating Rows multiple 
> times.





[GitHub] [flink] luoyuxia commented on a diff in pull request #22949: [FLINK-32517][table] Support to execute [CREATE OR] REPLACE TABLE AS statement

2023-07-05 Thread via GitHub


luoyuxia commented on code in PR #22949:
URL: https://github.com/apache/flink/pull/22949#discussion_r1253044112


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java:
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ReplaceTableAsOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.parse.CalciteParser;
+import org.apache.flink.table.types.AbstractDataType;
+
+import org.apache.calcite.sql.SqlNode;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test base for testing convert [CREATE OR] REPLACE TABLE AS statement to 
operation. */
+public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConversionTestBase {
+
+@Test
+public void testReplaceTableAS() {
+String tableName = "replace_table";
+String sql =

Review Comment:
   Please add a comment in Chinese.



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##
@@ -784,11 +786,33 @@ public CompiledPlan compilePlan(List 
operations) {
 public TableResultInternal executeInternal(List 
operations) {
 List mapOperations = new ArrayList<>();
 for (ModifyOperation modify : operations) {
-// execute CREATE TABLE first for CTAS statements
 if (modify instanceof CreateTableASOperation) {
+// execute CREATE TABLE first for CTAS statements
 CreateTableASOperation ctasOperation = 
(CreateTableASOperation) modify;
 executeInternal(ctasOperation.getCreateTableOperation());
 
mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
+} else if (modify instanceof ReplaceTableAsOperation) {
+ReplaceTableAsOperation rtasOperation = 
(ReplaceTableAsOperation) modify;

Review Comment:
   Please extract these lines into a method.



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##
@@ -85,6 +87,12 @@ public static List buildModifyColumnChange(
 .orElse(null);
 }
 
+public static @Nullable String 
getTableComment(Optional tableComment) {
+return tableComment

Review Comment:
   Please follow the implementation of `getComment(SqlTableColumn column)`



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java:
##
@@ -171,11 +171,7 @@ private CatalogTable createCatalogTable(SqlCreateTable 
sqlCreateTable) {
 mergingStrategies);
 verifyPartitioningColumnsExist(mergedSchema, partitionKeys);
 
-String tableComment =
-sqlCreateTable
-.getComment()
-.map(comment -> 
comment.getValueAs(NlsString.class).getValue())
-.orElse(null);
+String tableComment = 
OperationConverterUtils.getTableComment(sqlCreateTable.getComment());

Review Comment:
   I'm wondering whether this works when the comment is in Chinese. Can you please 
verify with a Chinese comment?
   I mean, could it cause a problem like 
   https://issues.apache.org/jira/browse/FLINK-32249?
   



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more 

[jira] [Created] (FLINK-32545) Removes the expensive Row operations like join

2023-07-05 Thread Jiang Xin (Jira)
Jiang Xin created FLINK-32545:
-

 Summary: Removes the expensive Row operations like join
 Key: FLINK-32545
 URL: https://issues.apache.org/jira/browse/FLINK-32545
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Reporter: Jiang Xin
 Fix For: ml-2.4.0


Currently, dozens of algorithms in Flink ML contain code like `Row.join(row, 
Row.of(...))` which is expensive. We should avoid creating Rows multiple times.





[GitHub] [flink] zentol commented on pull request #22957: [FLINK-32544][python] Fix test_java_sql_ddl to works in JDK17

2023-07-05 Thread via GitHub


zentol commented on PR #22957:
URL: https://github.com/apache/flink/pull/22957#issuecomment-1621672385

   CI run :crossed_fingers: : 
https://dev.azure.com/chesnay/flink/_build/results?buildId=3679=results





[GitHub] [flink] TanYuxin-tyx commented on pull request #22851: [FLINK-31646][network] Implement the remote tier producer for the tiered storage

2023-07-05 Thread via GitHub


TanYuxin-tyx commented on PR #22851:
URL: https://github.com/apache/flink/pull/22851#issuecomment-1621671524

   @flinkbot run azure





[GitHub] [flink] flinkbot commented on pull request #22957: [FLINK-32544][python] Fix test_java_sql_ddl to works in JDK17

2023-07-05 Thread via GitHub


flinkbot commented on PR #22957:
URL: https://github.com/apache/flink/pull/22957#issuecomment-1621657961

   
   ## CI report:
   
   * df82b2f3100904517fff667f0c40e48dc20adac2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] dianfu commented on pull request #22957: [FLINK-32544][python] Fix test_java_sql_ddl to works in JDK17

2023-07-05 Thread via GitHub


dianfu commented on PR #22957:
URL: https://github.com/apache/flink/pull/22957#issuecomment-1621643609

   cc @zentol 





[jira] [Updated] (FLINK-32544) PythonFunctionFactoryTest fails on Java 17

2023-07-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32544:
---
Labels: pull-request-available  (was: )

> PythonFunctionFactoryTest fails on Java 17
> --
>
> Key: FLINK-32544
> URL: https://issues.apache.org/jira/browse/FLINK-32544
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Legacy Components / Flink on Tez
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> https://dev.azure.com/chesnay/flink/_build/results?buildId=3676=logs=fba17979-6d2e-591d-72f1-97cf42797c11=727942b6-6137-54f7-1ef9-e66e706ea068
> {code}
> Jul 05 10:17:23 Exception in thread "main" 
> java.lang.reflect.InaccessibleObjectException: Unable to make field private 
> static java.util.IdentityHashMap java.lang.ApplicationShutdownHooks.hooks 
> accessible: module java.base does not "opens java.lang" to unnamed module 
> @1880a322
> Jul 05 10:17:23   at 
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
> Jul 05 10:17:23   at 
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
> Jul 05 10:17:23   at 
> java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
> Jul 05 10:17:23   at 
> java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
> Jul 05 10:17:23   at 
> org.apache.flink.client.python.PythonFunctionFactoryTest.closeStartedPythonProcess(PythonFunctionFactoryTest.java:115)
> Jul 05 10:17:23   at 
> org.apache.flink.client.python.PythonFunctionFactoryTest.cleanEnvironment(PythonFunctionFactoryTest.java:79)
> Jul 05 10:17:23   at 
> org.apache.flink.client.python.PythonFunctionFactoryTest.main(PythonFunctionFactoryTest.java:52)
> {code}
> Side-notes:
> * maybe re-evaluate if the test could be run through maven now
> * The shutdown hooks business is quite sketchy, and AFAICT would be 
> unnecessary if the test were an ITCase
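
The failure in the stack trace comes from reflective access to a JDK-internal field, which recent JDKs refuse unless the package is opened (for example with `--add-opens java.base/java.lang=ALL-UNNAMED`, as the exception message suggests). The sketch below is not the test's code; `ReflectiveAccessCheck` and its method are hypothetical names used to probe whether such access would succeed on the running JVM.

```java
import java.lang.reflect.Field;

/** Hypothetical probe for reflective access; not part of Flink. */
final class ReflectiveAccessCheck {
    private static final String OWN_FIELD = "always accessible from this class";

    /** Returns true if the named field of the given class can be made accessible. */
    static boolean canMakeAccessible(Class<?> owner, String fieldName) {
        try {
            Field field = owner.getDeclaredField(fieldName);
            field.setAccessible(true);
            return true;
        } catch (NoSuchFieldException e) {
            return false; // the field does not exist on this JDK
        } catch (RuntimeException e) {
            // On JDK 9+ this covers InaccessibleObjectException, thrown when the
            // declaring module does not open the package to the caller.
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // The class and field named in the stack trace above.
        Class<?> hooks = Class.forName("java.lang.ApplicationShutdownHooks");
        System.out.println("hooks accessible: " + canMakeAccessible(hooks, "hooks"));
        System.out.println("own field accessible: "
                + canMakeAccessible(ReflectiveAccessCheck.class, "OWN_FIELD"));
    }
}
```

The result of the first check depends on the JDK version and on whether the JVM was started with a matching `--add-opens` option, so a test relying on it needs either that flag or a design that avoids touching the shutdown hooks at all.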





[GitHub] [flink] dianfu opened a new pull request, #22957: [FLINK-32544][python] Fix test_java_sql_ddl to works in JDK17

2023-07-05 Thread via GitHub


dianfu opened a new pull request, #22957:
URL: https://github.com/apache/flink/pull/22957

   
   ## What is the purpose of the change
   
   *This pull request fixes test_java_sql_ddl so that it works on JDK 17*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[jira] [Resolved] (FLINK-32388) Add the ability to pass parameters to CUSTOM PartitionCommitPolicy

2023-07-05 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia resolved FLINK-32388.
--
Resolution: Fixed

master:

ec222eae3f6f501d8681a40a7c528003e1006736

Thanks for the pr.

> Add the ability to pass parameters to CUSTOM PartitionCommitPolicy
> --
>
> Key: FLINK-32388
> URL: https://issues.apache.org/jira/browse/FLINK-32388
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Gary Cao
>Priority: Major
>  Labels: pull-request-available
>
> By allowing parameters to be passed, the custom PartitionCommitPolicy 
> becomes more flexible and customizable. This enables users to enhance their 
> custom PartitionCommitPolicy with additional functionality, such as 
> passing monitoring parameters to track the files associated with each commit.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] luoyuxia merged pull request #22831: [FLINK-32388]Add the ability to pass parameters to CUSTOM PartitionCommitPolicy

2023-07-05 Thread via GitHub


luoyuxia merged PR #22831:
URL: https://github.com/apache/flink/pull/22831





[jira] [Commented] (FLINK-25322) Support no-claim mode in changelog state backend

2023-07-05 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740173#comment-17740173
 ] 

Piotr Nowojski commented on FLINK-25322:


Fair enough. In that case I would suggest writing a small FLIP proposal to 
change the semantics of when the starting snapshot in no-claim mode is no longer 
needed, including the motivation and the proposed changes to user-facing things 
(REST API? WebUI?). It most likely doesn't have to be long. The changelog 
state backend can be included there as motivation, but I don't think the 
internals of how this could be done in the changelog have to be discussed there 
in detail.

> Support no-claim mode in changelog state backend
> 
>
> Key: FLINK-25322
> URL: https://issues.apache.org/jira/browse/FLINK-25322
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Dawid Wysakowicz
>Assignee: Feifan Wang
>Priority: Major
> Fix For: 1.18.0
>
>





