[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #21140: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

2022-10-25 Thread GitBox


gaborgsomogyi commented on code in PR #21140:
URL: https://github.com/apache/flink/pull/21140#discussion_r1005252795


##
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Delegation token provider implementation for HBase. Basically it would be good to move this to
+ * flink-connector-hbase-base but HBase connection can be made without the connector. All in all I
+ * tend to move this but that would be a breaking change.
+ */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+org.apache.hadoop.conf.Configuration hbaseConf;
+
+@Override
+public String serviceName() {
+return "hbase";
+}
+
+@Override
+public void init(Configuration configuration) throws Exception {
+hbaseConf = getHBaseConfiguration(configuration);
+}
+
+private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+org.apache.hadoop.conf.Configuration hbaseConf = null;
+try {
+org.apache.hadoop.conf.Configuration hadoopConf =
+HadoopUtils.getHadoopConfiguration(conf);
+// 
+// Intended call: HBaseConfiguration.create(conf);
+// HBaseConfiguration.create has been added to HBase in v0.90.0 so we can eliminate
+// reflection magic when we drop ancient HBase support.
+hbaseConf =
+(org.apache.hadoop.conf.Configuration)
+Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+.getMethod("create", org.apache.hadoop.conf.Configuration.class)
+.invoke(null, hadoopConf);
+// 
+} catch (InvocationTargetException
+| NoSuchMethodException
+| IllegalAccessException
+| ClassNotFoundException e) {
+LOG.info(
+"HBase is not available (not packaged with this application): {} : \"{}\".",
+e.getClass().getSimpleName(),
+e.getMessage());
+}
+return hbaseConf;
+}
+
+@Override
+public boolean delegationTokensRequired() {
+try {
+if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) {
+return false;
+}
+} catch (IOException e) {
+LOG.debug("Hadoop Kerberos is not enabled.");
+return false;
+}
+return Objects.nonNull(hbaseConf)
+&& hbaseConf.get("hbase.security.authentication").equals("kerberos");
+}
+
+@Override
+public Optional obtainDelegationTokens(Credentials credentials) throws Exception {
+Token token;
+try {
+Preconditions.checkNotNull(hbaseConf);
+try {
+LOG.info("Obtaining Kerberos security token for HBase");
+// 
+// Intended call: Token token =
+// TokenUtil.obtainToken(conf);
+token =
+(Token)
+Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+.getMethod(
+"obtainToken",
+

[GitHub] [flink] snuyanzin commented on pull request #21152: [hotfix][connectors][kafka] Remove unused private classes and methods

2022-10-25 Thread GitBox


snuyanzin commented on PR #21152:
URL: https://github.com/apache/flink/pull/21152#issuecomment-1291533822

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on pull request #21153: [hotfix][runtime] Remove private methods

2022-10-25 Thread GitBox


snuyanzin commented on PR #21153:
URL: https://github.com/apache/flink/pull/21153#issuecomment-1291533521

   @flinkbot run azure





[GitHub] [flink] snuyanzin commented on pull request #21154: [hotfix] Add icon for Flink in IntellijIdea and Toolbox

2022-10-25 Thread GitBox


snuyanzin commented on PR #21154:
URL: https://github.com/apache/flink/pull/21154#issuecomment-1291533163

   @flinkbot run azure





[GitHub] [flink] wanglijie95 commented on pull request #21111: [FLINK-29664][runtime] Collect subpartition sizes of blocking result partitions

2022-10-25 Thread GitBox


wanglijie95 commented on PR #21111:
URL: https://github.com/apache/flink/pull/21111#issuecomment-1291531626

   Thanks for the review, @zhuzhurk. I've addressed or replied to all comments.
Please take a look.





[GitHub] [flink] wanglijie95 commented on a diff in pull request #21111: [FLINK-29664][runtime] Collect subpartition sizes of blocking result partitions

2022-10-25 Thread GitBox


wanglijie95 commented on code in PR #21111:
URL: https://github.com/apache/flink/pull/21111#discussion_r1005243287


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java:
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Information of All-To-All result. */
+public class AllToAllBlockingResultInfo implements BlockingResultInfo {
+
+private final IntermediateDataSetID resultId;
+
+private final int numOfSubpartitions;
+
+private final boolean isBroadcast;
+
+/**
+ * Aggregated subpartition bytes, which aggregates the subpartition bytes with the same
+ * subpartition index in different partitions. Note that We can aggregate them because they will
+ * be consumed by the same downstream task.
+ */
+private final long[] aggregatedSubpartitionBytes;
+
+AllToAllBlockingResultInfo(
+IntermediateDataSetID resultId, int numOfSubpartitions, boolean isBroadcast) {
+this.resultId = checkNotNull(resultId);
+this.numOfSubpartitions = numOfSubpartitions;
+this.isBroadcast = isBroadcast;
+this.aggregatedSubpartitionBytes = new long[numOfSubpartitions];
+Arrays.fill(this.aggregatedSubpartitionBytes, 0L);
+}
+
+@Override
+public IntermediateDataSetID getResultId() {
+return resultId;
+}
+
+@Override
+public boolean isBroadcast() {
+return isBroadcast;
+}
+
+@Override
+public boolean isPointwise() {
+return false;
+}
+
+@Override
+public long getNumBytesProduced() {
+if (isBroadcast) {
+return aggregatedSubpartitionBytes[0];
+} else {
+return Arrays.stream(aggregatedSubpartitionBytes).sum();
+}
+}
+
+@Override
+public void partitionFinished(int partitionIndex, ResultPartitionBytes partitionBytes) {

Review Comment:
   Replied above






[GitHub] [flink] wanglijie95 commented on a diff in pull request #21111: [FLINK-29664][runtime] Collect subpartition sizes of blocking result partitions

2022-10-25 Thread GitBox


wanglijie95 commented on code in PR #21111:
URL: https://github.com/apache/flink/pull/21111#discussion_r1005242107


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##
@@ -149,13 +159,45 @@ protected void startSchedulingInternal() {
 super.startSchedulingInternal();
 }
 
+public boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+updateResultPartitionBytesMetrics(taskExecutionState.getIOMetrics());

Review Comment:
   1, 2: Fixed.
   3: This is indeed a problem. We can't clear the metric of a partition when it
is reset, because for `ALL_TO_ALL` results we only store the aggregated value.
Given that, I think we should record the metric only on the first finish and
keep it as the final value. @zhuzhurk WDYT?
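
The "record only on the first finish" idea can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: the class `FirstFinishAggregator`, its method names, and the plain `long[]` standing in for `ResultPartitionBytes` are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch (not Flink code): aggregate subpartition bytes, counting each partition once. */
final class FirstFinishAggregator {

    /** Sum of bytes per subpartition index across all recorded partitions. */
    private final long[] aggregatedSubpartitionBytes;

    /** Partitions whose bytes were already added; later finishes are ignored. */
    private final Set<Integer> recordedPartitions = new HashSet<>();

    FirstFinishAggregator(int numSubpartitions) {
        this.aggregatedSubpartitionBytes = new long[numSubpartitions];
    }

    /** Returns true if the bytes were recorded, false if this partition had finished before. */
    boolean partitionFinished(int partitionIndex, long[] subpartitionBytes) {
        if (subpartitionBytes.length != aggregatedSubpartitionBytes.length) {
            throw new IllegalArgumentException("Unexpected number of subpartitions");
        }
        if (!recordedPartitions.add(partitionIndex)) {
            // Only the aggregate is stored, so a re-run cannot be subtracted; keep the first value.
            return false;
        }
        for (int i = 0; i < subpartitionBytes.length; i++) {
            aggregatedSubpartitionBytes[i] += subpartitionBytes[i];
        }
        return true;
    }

    long getNumBytesProduced() {
        return Arrays.stream(aggregatedSubpartitionBytes).sum();
    }
}
```

The trade-off in this sketch is that a partition re-produced after a reset keeps its first recorded size, which may differ slightly from the size of the later attempt.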



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java:
##
@@ -18,63 +18,53 @@
 
 package org.apache.flink.runtime.scheduler.adaptivebatch;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/** The blocking result info, which will be used to calculate the vertex parallelism. */
-public class BlockingResultInfo {
-
-private final List blockingPartitionSizes;
-
-private final boolean isBroadcast;
-
-private BlockingResultInfo(List blockingPartitionSizes, boolean isBroadcast) {
-this.blockingPartitionSizes = blockingPartitionSizes;
-this.isBroadcast = isBroadcast;
-}
-
-public List getBlockingPartitionSizes() {
-return blockingPartitionSizes;
-}
-
-public boolean isBroadcast() {
-return isBroadcast;
-}
-
-@VisibleForTesting
-static BlockingResultInfo createFromBroadcastResult(List blockingPartitionSizes) {
-return new BlockingResultInfo(blockingPartitionSizes, true);
-}
-
-@VisibleForTesting
-static BlockingResultInfo createFromNonBroadcastResult(List blockingPartitionSizes) {
-return new BlockingResultInfo(blockingPartitionSizes, false);
-}
-
-public static BlockingResultInfo createFromIntermediateResult(
-IntermediateResult intermediateResult) {
-checkArgument(intermediateResult != null);
-
-List blockingPartitionSizes = new ArrayList<>();
-for (IntermediateResultPartition partition : intermediateResult.getPartitions()) {
-checkState(partition.isConsumable());
-
-IOMetrics ioMetrics = partition.getProducer().getPartitionProducer().getIOMetrics();
-checkNotNull(ioMetrics, "IOMetrics should not be null.");
-
-blockingPartitionSizes.add(
-ioMetrics.getNumBytesProducedOfPartitions().get(partition.getPartitionId()));
-}
-
-return new BlockingResultInfo(blockingPartitionSizes, intermediateResult.isBroadcast());
-}
+/**
+ * The blocking result info, which will be used to calculate the vertex parallelism and input infos.
+ */
+public interface BlockingResultInfo {
+
+/**
+ * Get the intermediate result id.
+ *
+ * @return the intermediate result id
+ */
+IntermediateDataSetID getResultId();
+
+/**
+ * Whether it is a broadcast result.
+ *
+ * @return whether it is a broadcast result
+ */
+boolean isBroadcast();
+
+/**
+ * Whether it is a Pointwise result.

Review Comment:
   Fixed






[GitHub] [flink] wanglijie95 commented on a diff in pull request #21111: [FLINK-29664][runtime] Collect subpartition sizes of blocking result partitions

2022-10-25 Thread GitBox


wanglijie95 commented on code in PR #21111:
URL: https://github.com/apache/flink/pull/21111#discussion_r1005242107


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##
@@ -149,13 +159,45 @@ protected void startSchedulingInternal() {
 super.startSchedulingInternal();
 }
 
+public boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+updateResultPartitionBytesMetrics(taskExecutionState.getIOMetrics());

Review Comment:
   1, 2: Fixed.
   3: This is indeed a problem. We can't clear the metric of a partition when it
is reset, because for `ALL_TO_ALL` results we store the aggregated value. Given
that, I think we should record the metric only on the first finish and keep it
as the final value. @zhuzhurk WDYT?






[jira] [Created] (FLINK-29764) Automatic judgment of parallelism of source

2022-10-25 Thread waywtdcc (Jira)
waywtdcc created FLINK-29764:


 Summary: Automatic judgment of parallelism of source
 Key: FLINK-29764
 URL: https://issues.apache.org/jira/browse/FLINK-29764
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API, Table SQL / Planner
Affects Versions: 1.15.2, 1.16.0
Reporter: waywtdcc
 Fix For: 1.17.0


The parallelism of the source should be determined automatically rather than
fixed by a jobmanager setting. With the adaptive batch scheduler, the default
source parallelism should be derived from the two configuration options
jobmanager.adaptive-batch-scheduler.min-parallelism and
jobmanager.adaptive-batch-scheduler.max-parallelism together with the number
of partitions.
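
One possible reading of this request is to clamp the number of partitions into the configured range. The sketch below only illustrates that reading: the class and method names are hypothetical, and the clamping rule is an assumption rather than anything the scheduler is confirmed to do.

```java
/** Hypothetical sketch (not Flink code): derive a source parallelism from partitions and bounds. */
final class SourceParallelismSketch {

    private SourceParallelismSketch() {}

    /**
     * Clamps the number of partitions into [minParallelism, maxParallelism].
     * The two bounds stand in for the min-parallelism and max-parallelism options.
     */
    static int decideSourceParallelism(int numPartitions, int minParallelism, int maxParallelism) {
        if (minParallelism < 1 || maxParallelism < minParallelism) {
            throw new IllegalArgumentException("Expected 1 <= minParallelism <= maxParallelism");
        }
        return Math.max(minParallelism, Math.min(maxParallelism, numPartitions));
    }
}
```

Under this assumption a source never gets more subtasks than it has partitions to read, unless the configured minimum forces it higher.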



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wanglijie95 commented on a diff in pull request #21111: [FLINK-29664][runtime] Collect subpartition sizes of blocking result partitions

2022-10-25 Thread GitBox


wanglijie95 commented on code in PR #21111:
URL: https://github.com/apache/flink/pull/21111#discussion_r1005237404


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java:
##
@@ -42,30 +47,52 @@ public class IOMetrics implements Serializable {
 protected double accumulateBusyTime;
 protected long accumulateIdleTime;
 
-protected final Map numBytesProducedOfPartitions =
-new HashMap<>();
+@Nullable
+protected Map resultPartitionBytes;
 
 public IOMetrics(
 Meter recordsIn,
 Meter recordsOut,
 Meter bytesIn,
 Meter bytesOut,
-Map numBytesProducedCounters,
 Gauge accumulatedBackPressuredTime,
 Gauge accumulatedIdleTime,
-Gauge accumulatedBusyTime) {
+Gauge accumulatedBusyTime,
+Map
+resultPartitionBytesCounters) {
 this.numRecordsIn = recordsIn.getCount();
 this.numRecordsOut = recordsOut.getCount();
 this.numBytesIn = bytesIn.getCount();
 this.numBytesOut = bytesOut.getCount();
 this.accumulateBackPressuredTime = accumulatedBackPressuredTime.getValue();
 this.accumulateBusyTime = accumulatedBusyTime.getValue();
 this.accumulateIdleTime = accumulatedIdleTime.getValue();
+this.resultPartitionBytes =
+resultPartitionBytesCounters.entrySet().stream()
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry -> entry.getValue().createSnapshot()));
+}
 
-for (Map.Entry counter :
-numBytesProducedCounters.entrySet()) {
-numBytesProducedOfPartitions.put(counter.getKey(), counter.getValue().getCount());
-}
+public IOMetrics(
+long numBytesIn,
+long numBytesOut,
+long numRecordsIn,
+long numRecordsOut,
+long accumulateIdleTime,
+double accumulateBusyTime,
+long accumulateBackPressuredTime,
+@Nullable
+Map resultPartitionBytes) {
+this.numBytesIn = numBytesIn;
+this.numBytesOut = numBytesOut;
+this.numRecordsIn = numRecordsIn;
+this.numRecordsOut = numRecordsOut;
+this.accumulateIdleTime = accumulateIdleTime;
+this.accumulateBusyTime = accumulateBusyTime;
+this.accumulateBackPressuredTime = accumulateBackPressuredTime;
+this.resultPartitionBytes = resultPartitionBytes;
 }
 
 public IOMetrics(

Review Comment:
   Fixed






[GitHub] [flink] wanglijie95 commented on a diff in pull request #21111: [FLINK-29664][runtime] Collect subpartition sizes of blocking result partitions

2022-10-25 Thread GitBox


wanglijie95 commented on code in PR #21111:
URL: https://github.com/apache/flink/pull/21111#discussion_r1005237222


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##
@@ -1548,7 +1548,18 @@ private void updateAccumulatorsAndMetrics(
 }
 }
 if (metrics != null) {
-this.ioMetrics = metrics;
+// The IOMetrics#resultPartitionBytes will not be used anymore, so we clear it here to
+// reduce the space usage.
+this.ioMetrics =

Review Comment:
   Fixed



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java:
##
@@ -42,30 +47,52 @@ public class IOMetrics implements Serializable {
 protected double accumulateBusyTime;
 protected long accumulateIdleTime;
 
-protected final Map numBytesProducedOfPartitions =
-new HashMap<>();
+@Nullable
+protected Map resultPartitionBytes;
 
 public IOMetrics(
 Meter recordsIn,
 Meter recordsOut,
 Meter bytesIn,
 Meter bytesOut,
-Map numBytesProducedCounters,
 Gauge accumulatedBackPressuredTime,
 Gauge accumulatedIdleTime,
-Gauge accumulatedBusyTime) {
+Gauge accumulatedBusyTime,
+Map
+resultPartitionBytesCounters) {
 this.numRecordsIn = recordsIn.getCount();
 this.numRecordsOut = recordsOut.getCount();
 this.numBytesIn = bytesIn.getCount();
 this.numBytesOut = bytesOut.getCount();
 this.accumulateBackPressuredTime = accumulatedBackPressuredTime.getValue();
 this.accumulateBusyTime = accumulatedBusyTime.getValue();
 this.accumulateIdleTime = accumulatedIdleTime.getValue();
+this.resultPartitionBytes =
+resultPartitionBytesCounters.entrySet().stream()
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry -> entry.getValue().createSnapshot()));
+}
 
-for (Map.Entry counter :
-numBytesProducedCounters.entrySet()) {
-numBytesProducedOfPartitions.put(counter.getKey(), counter.getValue().getCount());
-}
+public IOMetrics(

Review Comment:
   Yes, fixed






[GitHub] [flink] wanglijie95 commented on a diff in pull request #21111: [FLINK-29664][runtime] Collect subpartition sizes of blocking result partitions

2022-10-25 Thread GitBox


wanglijie95 commented on code in PR #21111:
URL: https://github.com/apache/flink/pull/21111#discussion_r1005236841


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java:
##
@@ -74,7 +101,7 @@ public IOMetrics(
 long numRecordsIn,
 long numRecordsOut,
 long accumulateIdleTime,
-long accumulateBusyTime,
+double accumulateBusyTime,

Review Comment:
   I will make this a separate hotfix commit in a later rebase.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##
@@ -149,13 +159,45 @@ protected void startSchedulingInternal() {
 super.startSchedulingInternal();
 }
 
+public boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) {

Review Comment:
   Fixed






[GitHub] [flink] wanglijie95 commented on a diff in pull request #21111: [FLINK-29664][runtime] Collect subpartition sizes of blocking result partitions

2022-10-25 Thread GitBox


wanglijie95 commented on code in PR #21111:
URL: https://github.com/apache/flink/pull/21111#discussion_r1005235949


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionBytesCounter.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** This counter will count the data size of a partition. */
+public class ResultPartitionBytesCounter {
+
+/** The data size of each subpartition. */
+private final List subpartitionBytes;
+
+public ResultPartitionBytesCounter(int numSubpartitions) {
+this.subpartitionBytes = new ArrayList<>();
+for (int i = 0; i < numSubpartitions; ++i) {
+subpartitionBytes.add(new SimpleCounter());
+}
+}
+
+public void inc(int targetSubpartition, long bytes) {
+subpartitionBytes.get(targetSubpartition).inc(bytes);
+}
+
+public void broadcastInc(long bytes) {

Review Comment:
   Fixed






[GitHub] [flink] wanglijie95 commented on a diff in pull request #21111: [FLINK-29664][runtime] Collect subpartition sizes of blocking result partitions

2022-10-25 Thread GitBox


wanglijie95 commented on code in PR #21111:
URL: https://github.com/apache/flink/pull/21111#discussion_r1005235773


##
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java:
##
@@ -693,41 +664,28 @@ private ExecutionGraph createExecutionGraph(Configuration configuration) throws
 .setJobGraph(jobGraph)
 .setJobMasterConfig(configuration)
 .setBlobWriter(blobWriter)
-.build(EXECUTOR_RESOURCE.getExecutor());
+.build(EXECUTOR_EXTENSION.getExecutor());
 }
 
-private static final class ExecutionStageMatcher
-extends TypeSafeMatcher> {
-private final List> executionStages;
-
-private ExecutionStageMatcher(List> executionStages) {
-this.executionStages = executionStages;
-}
-
-@Override
-protected boolean matchesSafely(List submissionOrder) {
-final Iterator submissionIterator = submissionOrder.iterator();
+private boolean isDeployedInTopologicalOrder(

Review Comment:
   Fixed



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionBytesCounter.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** This counter will count the data size of a partition. */
+public class ResultPartitionBytesCounter {
+
+/** The data size of each subpartition. */
+private final List subpartitionBytes;
+
+public ResultPartitionBytesCounter(int numSubpartitions) {
+this.subpartitionBytes = new ArrayList<>();
+for (int i = 0; i < numSubpartitions; ++i) {
+subpartitionBytes.add(new SimpleCounter());
+}
+}
+
+public void inc(int targetSubpartition, long bytes) {
+subpartitionBytes.get(targetSubpartition).inc(bytes);
+}
+
+public void broadcastInc(long bytes) {
+subpartitionBytes.forEach(counter -> counter.inc(bytes));
+}
+
+//public List getSubpartitionBytes() {

Review Comment:
   Fixed






[GitHub] [flink] reswqa commented on pull request #21137: [FLINK-29234][runtime] JobMasterServiceLeadershipRunner handle leader event in a separate executor to avoid dead lock

2022-10-25 Thread GitBox


reswqa commented on PR #21137:
URL: https://github.com/apache/flink/pull/21137#issuecomment-1291488778

   @flinkbot run azure





[GitHub] [flink] flinkbot commented on pull request #21156: [FLINK-20578] Allow for creation of empty arrays in Flink SQL query.

2022-10-25 Thread GitBox


flinkbot commented on PR #21156:
URL: https://github.com/apache/flink/pull/21156#issuecomment-1291483665

   
   ## CI report:
   
   * 99e7abc966e54376fb180fa81394732552253726 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Comment Edited] (FLINK-20578) Cannot create empty array using ARRAY[]

2022-10-25 Thread Eric Xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624154#comment-17624154
 ] 

Eric Xiao edited comment on FLINK-20578 at 10/26/22 4:19 AM:
-

Hi, I wanted to get more involved in contributing to the Flink project and found
this starter task. My team is working with the Table / SQL APIs, so I thought
this would be a good beginning task to work on :). [~surahman] are you still
working on this issue? If not, I would love to take over.

> If Flink support empty array, which data type of elements in array should be 
> ? Does it cause new problems.
[~pensz] I think there are two paths:
1. If we are given more context on what the array type should be, we should try
using that.
2. If we have no context, we use a default data type.

Path #1 - 
I can foresee queries such as `SELECT COALESCE(empty_str_column,ARRAY[])` where
we could infer that the data should be of string type and try to return that.

Path #2 - Default Data Type
I believe the query in the issue would qualify as a query with no context. I
tested a similar query in Trino (Presto) and BigQuery, and both use Integer as
the data type. This could be a good default behaviour?
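
The two paths could be combined along these lines. This is only a sketch: `ElementType`, `EmptyArrayTypeSketch`, and `resolveElementType` are hypothetical names for illustration, not Flink planner APIs, and the INTEGER default simply mirrors the Trino and BigQuery behaviour mentioned above.

```java
import java.util.Optional;

/** Hypothetical sketch (not Flink planner code): pick the element type of an empty ARRAY[]. */
final class EmptyArrayTypeSketch {

    /** Simplified stand-in for SQL element types. */
    enum ElementType { INTEGER, STRING, DOUBLE }

    private EmptyArrayTypeSketch() {}

    /**
     * Path #1: use the type inferred from the surrounding expression if there is one.
     * Path #2: otherwise fall back to a default element type (INTEGER is an assumption).
     */
    static ElementType resolveElementType(Optional<ElementType> contextType) {
        return contextType.orElse(ElementType.INTEGER);
    }
}
```

For the `COALESCE(empty_str_column, ARRAY[])` example, the context type would be STRING, so the empty array would be typed as an array of strings.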

!Screen Shot 2022-10-25 at 10.50.42 PM.png!
!Screen Shot 2022-10-25 at 10.50.47 PM.png! 
!Screen Shot 2022-10-25 at 11.01.06 PM.png! 



was (Author: JIRAUSER295489):
Hi, I wanted to get more involved in contributing to the Flink project and found
this starter task. My team is working with the Table / SQL APIs, so I thought
this would be a good beginning task to work on :). [~surahman] are you still
working on this issue? I noticed it has been a year since your last comment.

> If Flink support empty array, which data type of elements in array should be 
> ? Does it cause new problems.
[~pensz] I think there are two paths:
1. If we are given more context on what the array type should be, we should try
using that.
2. If we have no context, we use a default data type.

Path #1 - 
I can foresee queries such as `SELECT COALESCE(empty_str_column,ARRAY[])` where
we could infer that the data should be of string type and try to return that.

Path #2 - Default Data Type
I believe the query in the issue would qualify as a query with no context. I
tested a similar query in Trino (Presto) and BigQuery, and both use Integer as
the data type. This could be a good default behaviour?

!Screen Shot 2022-10-25 at 10.50.42 PM.png!
!Screen Shot 2022-10-25 at 10.50.47 PM.png! 
!Screen Shot 2022-10-25 at 11.01.06 PM.png! 


> Cannot create empty array using ARRAY[]
> ---
>
> Key: FLINK-20578
> URL: https://issues.apache.org/jira/browse/FLINK-20578
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.11.2
>Reporter: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available, starter
> Fix For: 1.17.0
>
> Attachments: Screen Shot 2022-10-25 at 10.50.42 PM.png, Screen Shot 
> 2022-10-25 at 10.50.47 PM.png, Screen Shot 2022-10-25 at 11.01.06 PM.png
>
>
> Calling the ARRAY function without an element (`ARRAY[]`) results in an error 
> message.
> Is that the expected behavior?
> How can users create empty arrays?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-20578) Cannot create empty array using ARRAY[]

2022-10-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-20578:
---
Labels: pull-request-available starter  (was: starter)

> Cannot create empty array using ARRAY[]
> ---
>
> Key: FLINK-20578
> URL: https://issues.apache.org/jira/browse/FLINK-20578
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.11.2
>Reporter: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available, starter
> Fix For: 1.17.0
>
> Attachments: Screen Shot 2022-10-25 at 10.50.42 PM.png, Screen Shot 
> 2022-10-25 at 10.50.47 PM.png, Screen Shot 2022-10-25 at 11.01.06 PM.png
>
>
> Calling the ARRAY function without an element (`ARRAY[]`) results in an error 
> message.
> Is that the expected behavior?
> How can users create empty arrays?





[GitHub] [flink] ericxiao251 opened a new pull request, #21156: [FLINK-20578] Allow for creation of empty arrays in Flink SQL query.

2022-10-25 Thread GitBox


ericxiao251 opened a new pull request, #21156:
URL: https://github.com/apache/flink/pull/21156

   Fixes: https://issues.apache.org/jira/browse/FLINK-20578


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Laffery commented on pull request #21144: [FLINK-29747] refactor: module-based app to standalone components

2022-10-25 Thread GitBox


Laffery commented on PR #21144:
URL: https://github.com/apache/flink/pull/21144#issuecomment-1291471213

   LGTM :)





[jira] [Created] (FLINK-29763) TaskManager heatbeat timeout exception in Github CI for python tests

2022-10-25 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-29763:


 Summary: TaskManager heatbeat timeout exception in Github CI for 
python tests
 Key: FLINK-29763
 URL: https://issues.apache.org/jira/browse/FLINK-29763
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


https://github.com/apache/flink-ml/actions/runs/3322007330/jobs/5490434747
https://github.com/apache/flink-ml/actions/runs/3321223124/jobs/5488576891
https://github.com/apache/flink-ml/actions/runs/3319920091/jobs/5485672250
https://github.com/apache/flink-ml/actions/runs/3319722473/jobs/5485231041
https://github.com/apache/flink-ml/actions/runs/3319599111/jobs/5484952148
https://github.com/apache/flink-ml/actions/runs/3318938657/jobs/5483471010





[GitHub] [flink] wangyang0918 commented on pull request #20779: [FLINK-28915] Flink Native k8s mode jar localtion support s3 schema.

2022-10-25 Thread GitBox


wangyang0918 commented on PR #20779:
URL: https://github.com/apache/flink/pull/20779#issuecomment-1291463883

   @SwimSweet Sorry for the late response. I believe this PR could work. 
However, my biggest concern is that it would only work for native K8s 
application mode. AFAIK, the Yarn application mode and standalone mode should also 
benefit from this.





[jira] [Comment Edited] (FLINK-20578) Cannot create empty array using ARRAY[]

2022-10-25 Thread Eric Xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624154#comment-17624154
 ] 

Eric Xiao edited comment on FLINK-20578 at 10/26/22 3:53 AM:
-

Hi I wanted to get more involved in contributing to the Flink project and found 
this starter task - my team is working with the Table / SQL APIs, so I thought 
this would be a good beginning task to work on :). [~surahman] are you still 
working on this issue, I noticed it has been a year since your last comment.

> If Flink support empty array, which data type of elements in array should be 
> ? Does it cause new problems.
[~pensz] I think there are two paths:
1. If we are given more context on what the array type should be, we should try 
using that.
2. If we have no context we use a default data type.

Path #1 - 
I can foresee queries such as `SELECT COALESCE(empty_str_column,ARRAY[])` where 
we could infer the data should be of string type and try to return that. 

Path #2 - Default Data Type
I believe the query in the issue would qualify as a query with no context. I 
tested a similar query in Trino (Presto) and BigQuery and they use Integer 
as the data type. This could be a good default behaviour?

!Screen Shot 2022-10-25 at 10.50.42 PM.png!
!Screen Shot 2022-10-25 at 10.50.47 PM.png! 
!Screen Shot 2022-10-25 at 11.01.06 PM.png! 



was (Author: JIRAUSER295489):
Hi I wanted to get more involved in contributing to the Flink project and found 
this starter task - my team is working with the Table / SQL APIs, so I thought 
this would be a good beginning task to work on :). [~surahman] are you still 
working on this issue, I noticed it has been a year since your last comment.

> If Flink support empty array, which data type of elements in array should be 
> ? Does it cause new problems.
[~pensz] I tested a similar query in Trino (Presto) and BigQuery and they by 
default use Integer as the data type. This could be a good default behaviour?

!Screen Shot 2022-10-25 at 10.50.42 PM.png!
!Screen Shot 2022-10-25 at 10.50.47 PM.png! 
!Screen Shot 2022-10-25 at 11.01.06 PM.png! 


> Cannot create empty array using ARRAY[]
> ---
>
> Key: FLINK-20578
> URL: https://issues.apache.org/jira/browse/FLINK-20578
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.11.2
>Reporter: Fabian Hueske
>Priority: Major
>  Labels: starter
> Fix For: 1.17.0
>
> Attachments: Screen Shot 2022-10-25 at 10.50.42 PM.png, Screen Shot 
> 2022-10-25 at 10.50.47 PM.png, Screen Shot 2022-10-25 at 11.01.06 PM.png
>
>
> Calling the ARRAY function without an element (`ARRAY[]`) results in an error 
> message.
> Is that the expected behavior?
> How can users create empty arrays?





[jira] [Updated] (FLINK-29760) Introduce snapshots metadata table

2022-10-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29760:
---
Labels: pull-request-available  (was: )

> Introduce snapshots metadata table
> --
>
> Key: FLINK-29760
> URL: https://issues.apache.org/jira/browse/FLINK-29760
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>
> Introduce snapshots metadata table to show snapshot history.





[GitHub] [flink-table-store] JingsongLi opened a new pull request, #333: [FLINK-29760] Introduce snapshots metadata table

2022-10-25 Thread GitBox


JingsongLi opened a new pull request, #333:
URL: https://github.com/apache/flink-table-store/pull/333

   Introduce snapshots metadata table to show snapshot history.





[GitHub] [flink] liuyongvs commented on pull request #21145: [FLINK-29749][client] flink info commands support dynamic properties.

2022-10-25 Thread GitBox


liuyongvs commented on PR #21145:
URL: https://github.com/apache/flink/pull/21145#issuecomment-1291448835

   hi @wangyang0918 added 





[jira] [Commented] (FLINK-29757) ContinuousFileSplitEnumerator skip unprocessed splits when the file is splittable

2022-10-25 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624155#comment-17624155
 ] 

luoyuxia commented on FLINK-29757:
--

Thanks. I'll find some time to review it. 

> ContinuousFileSplitEnumerator skip unprocessed splits when the file is 
> splittable
> -
>
> Key: FLINK-29757
> URL: https://issues.apache.org/jira/browse/FLINK-29757
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Reporter: Hanley Yang
>Priority: Critical
>  Labels: pull-request-available
>
> ContinuousFileSplitEnumerator uses a HashSet to store processed splits. 
> This works fine when a file is processed as a single split, but once the file is 
> splittable, unprocessed splits are skipped.





[jira] [Comment Edited] (FLINK-20578) Cannot create empty array using ARRAY[]

2022-10-25 Thread Eric Xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624154#comment-17624154
 ] 

Eric Xiao edited comment on FLINK-20578 at 10/26/22 3:04 AM:
-

Hi I wanted to get more involved in contributing to the Flink project and found 
this starter task - my team is working with the Table / SQL APIs, so I thought 
this would be a good beginning task to work on :). [~surahman] are you still 
working on this issue, I noticed it has been a year since your last comment.

> If Flink support empty array, which data type of elements in array should be 
> ? Does it cause new problems.
[~pensz] I tested a similar query in Trino (Presto) and BigQuery and they by 
default use Integer as the data type. This could be a good default behaviour?

!Screen Shot 2022-10-25 at 10.50.42 PM.png!
!Screen Shot 2022-10-25 at 10.50.47 PM.png! 
!Screen Shot 2022-10-25 at 11.01.06 PM.png! 



was (Author: JIRAUSER295489):
Hi I wanted to get more involved in contributing to the Flink project and found 
this starter task - my team is working with the Table / SQL APIs, so I thought 
this would be a good beginning task to work on :). [~surahman] are you still 
working on this issue, I noticed it has been a year since your last comment.

> If Flink support empty array, which data type of elements in array should be 
> ? Does it cause new problems.
[~pensz] I tested a similar query in Trino (Presto) and BigQuery and they by 
default use Integer as the data type. This could be a good default behaviour?

!Screen Shot 2022-10-25 at 10.50.42 PM.png!


> Cannot create empty array using ARRAY[]
> ---
>
> Key: FLINK-20578
> URL: https://issues.apache.org/jira/browse/FLINK-20578
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.11.2
>Reporter: Fabian Hueske
>Priority: Major
>  Labels: starter
> Fix For: 1.17.0
>
> Attachments: Screen Shot 2022-10-25 at 10.50.42 PM.png, Screen Shot 
> 2022-10-25 at 10.50.47 PM.png, Screen Shot 2022-10-25 at 11.01.06 PM.png
>
>
> Calling the ARRAY function without an element (`ARRAY[]`) results in an error 
> message.
> Is that the expected behavior?
> How can users create empty arrays?





[jira] [Commented] (FLINK-20578) Cannot create empty array using ARRAY[]

2022-10-25 Thread Eric Xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624154#comment-17624154
 ] 

Eric Xiao commented on FLINK-20578:
---

Hi I wanted to get more involved in contributing to the Flink project and found 
this starter task - my team is working with the Table / SQL APIs, so I thought 
this would be a good beginning task to work on :). [~surahman] are you still 
working on this issue, I noticed it has been a year since your last comment.

> If Flink support empty array, which data type of elements in array should be 
> ? Does it cause new problems.
[~pensz] I tested a similar query in Trino (Presto) and BigQuery and they by 
default use Integer as the data type. This could be a good default behaviour?

!Screen Shot 2022-10-25 at 10.50.42 PM.png!


> Cannot create empty array using ARRAY[]
> ---
>
> Key: FLINK-20578
> URL: https://issues.apache.org/jira/browse/FLINK-20578
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.11.2
>Reporter: Fabian Hueske
>Priority: Major
>  Labels: starter
> Fix For: 1.17.0
>
> Attachments: Screen Shot 2022-10-25 at 10.50.42 PM.png, Screen Shot 
> 2022-10-25 at 10.50.47 PM.png, Screen Shot 2022-10-25 at 11.01.06 PM.png
>
>
> Calling the ARRAY function without an element (`ARRAY[]`) results in an error 
> message.
> Is that the expected behavior?
> How can users create empty arrays?





[jira] [Updated] (FLINK-20578) Cannot create empty array using ARRAY[]

2022-10-25 Thread Eric Xiao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eric Xiao updated FLINK-20578:
--
Attachment: Screen Shot 2022-10-25 at 10.50.42 PM.png
Screen Shot 2022-10-25 at 10.50.47 PM.png
Screen Shot 2022-10-25 at 11.01.06 PM.png

> Cannot create empty array using ARRAY[]
> ---
>
> Key: FLINK-20578
> URL: https://issues.apache.org/jira/browse/FLINK-20578
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.11.2
>Reporter: Fabian Hueske
>Priority: Major
>  Labels: starter
> Fix For: 1.17.0
>
> Attachments: Screen Shot 2022-10-25 at 10.50.42 PM.png, Screen Shot 
> 2022-10-25 at 10.50.47 PM.png, Screen Shot 2022-10-25 at 11.01.06 PM.png
>
>
> Calling the ARRAY function without an element (`ARRAY[]`) results in an error 
> message.
> Is that the expected behavior?
> How can users create empty arrays?





[GitHub] [flink] flinkbot commented on pull request #21155: [FLINK-29757][connector-files]ContinuousFileSplitEnumerator skip unprocessed splits when the file is splittable

2022-10-25 Thread GitBox


flinkbot commented on PR #21155:
URL: https://github.com/apache/flink/pull/21155#issuecomment-1291431552

   
   ## CI report:
   
   * bfa38298129a17c0aeea155e4275ba87b18d1cb7 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Commented] (FLINK-29757) ContinuousFileSplitEnumerator skip unprocessed splits when the file is splittable

2022-10-25 Thread Hanley Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624152#comment-17624152
 ] 

Hanley Yang commented on FLINK-29757:
-

[~martijnvisser] [~luoyuxia]  I think we should use a combined string of path, 
offset and length instead of just path to track processed splits. I created a 
pull request for this, could you have a look at it? Thanks.
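
A minimal sketch of the idea, assuming a set keyed by a combined string. The class `ProcessedSplitTracker`, its methods, and the `:` separator are hypothetical names chosen for illustration and are not taken from the pull request.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical tracker, not the Flink class: remembers which splits were seen.
final class ProcessedSplitTracker {
    private final Set<String> processed = new HashSet<>();

    // Combine path, offset and length so that two splits of the same file
    // produce different keys. Keying by path alone would merge them.
    static String splitKey(String path, long offset, long length) {
        return path + ":" + offset + ":" + length;
    }

    // Returns true if the split has not been seen before and records it.
    boolean markIfNew(String path, long offset, long length) {
        return processed.add(splitKey(path, offset, length));
    }
}
```

With this key, the splits `(a.csv, 0, 128)` and `(a.csv, 128, 128)` are tracked separately, while a repeated `(a.csv, 0, 128)` is still recognized as already processed.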

> ContinuousFileSplitEnumerator skip unprocessed splits when the file is 
> splittable
> -
>
> Key: FLINK-29757
> URL: https://issues.apache.org/jira/browse/FLINK-29757
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Reporter: Hanley Yang
>Priority: Critical
>  Labels: pull-request-available
>
> ContinuousFileSplitEnumerator uses a HashSet to store processed splits. 
> This works fine when a file is processed as a single split, but once the file is 
> splittable, unprocessed splits are skipped.





[jira] [Commented] (FLINK-29748) Expose the optimize phase in the connector context

2022-10-25 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624151#comment-17624151
 ] 

Aitozi commented on FLINK-29748:


My connector implements {{SupportsPartitionPushDown}}. However, due to 
the {{PartitionPushDown}} optimization, the effective partition count can 
differ from that of the original source table. If I want to validate that the 
source does not consume too many partitions, the connector does not 
know whether the optimization has finished, so it does not know when to apply the 
validation to the final optimization result.

> Expose the optimize phase in the connector context
> --
>
> Key: FLINK-29748
> URL: https://issues.apache.org/jira/browse/FLINK-29748
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner, Table SQL / Runtime
>Reporter: Aitozi
>Priority: Minor
>
> Currently, the connector cannot know whether the whole optimization has 
> finished.
> Once the optimization has finished, all information is static, e.g. the 
> partitions to read. If I want to validate the final optimized result (such as 
> whether too many partitions, or none, are read), the connector needs to know 
> the current phase. I think {{ScanContext}} is a suitable place to expose this 
> information. 





[GitHub] [flink] reswqa commented on a diff in pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

2022-10-25 Thread GitBox


reswqa commented on code in PR #21122:
URL: https://github.com/apache/flink/pull/21122#discussion_r1005162433


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java:
##
@@ -153,14 +153,18 @@ public synchronized void run() {
 
 /** This method only called by result partition to create 
subpartitionFileReader. */
 public HsDataView registerNewSubpartition(
-int subpartitionId, HsSubpartitionViewInternalOperations 
operation) throws IOException {
+int subpartitionId,

Review Comment:
   Rename `HsSubpartitionView` to `HsSubpartitionConsumer`, 
`HsSubpartitionViewInternalOperations` to 
`HsSubpartitionConsumerInternalOperations`, `HsSubpartitionMemoryDataConsumer` 
to `HsSubpartitionConsumerMemoryDataManager`.






[jira] [Updated] (FLINK-29757) ContinuousFileSplitEnumerator skip unprocessed splits when the file is splittable

2022-10-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29757:
---
Labels: pull-request-available  (was: )

> ContinuousFileSplitEnumerator skip unprocessed splits when the file is 
> splittable
> -
>
> Key: FLINK-29757
> URL: https://issues.apache.org/jira/browse/FLINK-29757
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Reporter: Hanley Yang
>Priority: Critical
>  Labels: pull-request-available
>
> ContinuousFileSplitEnumerator uses a HashSet to store processed splits. 
> This works fine when a file is processed as a single split, but once the file is 
> splittable, unprocessed splits are skipped.





[GitHub] [flink] HanleyYang opened a new pull request, #21155: [FLINK-29757][connector-files]ContinuousFileSplitEnumerator skip unprocessed splits when the file is splittable

2022-10-25 Thread GitBox


HanleyYang opened a new pull request, #21155:
URL: https://github.com/apache/flink/pull/21155

   
   
   ## What is the purpose of the change
   
   ContinuousFileSplitEnumerator uses a HashSet to store processed splits. 
This works fine when a file is processed as a single split, but once the file is 
splittable, unprocessed splits are skipped. I think we should use a 
combined string of path, offset and length instead of just path to track 
processed splits.
   
   
   ## Brief change log
   
 -  Use a combined string of path, offset and length instead of just path 
to track processed splits.
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   This change added tests and can be verified as follows:
   
 - *Added a unit test for a single file with multiple splits to track*

   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: yes
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  no
 - If yes, how is the feature documented? JavaDocs
   





[jira] [Closed] (FLINK-29134) fetch metrics may cause oom(ThreadPool task pile up)

2022-10-25 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-29134.

Fix Version/s: 1.17.0
   1.15.3
   1.16.1
   Resolution: Fixed

- master (1.17): 93af1e45a1e868ad3752923453562861d831104f
- release-1.16: c52e1519b509b6a918df037fe3665872ed8146fb
- release-1.15: eeda260d71ec11476edf532c5ff655828b0c4c7f

> fetch metrics may cause oom(ThreadPool task pile up)
> 
>
> Key: FLINK-29134
> URL: https://issues.apache.org/jira/browse/FLINK-29134
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.11.0, 1.15.2
>Reporter: Sitan Pang
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.15.3, 1.16.1
>
> Attachments: dump-queueTask.png, dump-threadPool.png
>
>
> In queryMetrics, we use a thread pool to process the data returned by the 
> TMs. 
> {code:java}
> private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
>     LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
>     queryServiceGateway
>             .queryMetrics(timeout)
>             .whenCompleteAsync(
>                     (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
>                         if (t != null) {
>                             LOG.debug("Fetching metrics failed.", t);
>                         } else {
>                             metrics.addAll(deserializer.deserialize(result));
>                         }
>                     },
>                     executor);
> } {code}
> The only condition for fetching metrics is that the time since the last update 
> is larger than updateInterval:
> {code:java}
> public void update() {
>     synchronized (this) {
>         long currentTime = System.currentTimeMillis();
>         if (currentTime - lastUpdateTime > updateInterval) {
>             lastUpdateTime = currentTime;
>             fetchMetrics();
>         }
>     }
> } {code}
> Therefore, if the data cannot be processed within the update interval, metrics 
> data will accumulate.
> Besides, webMonitorEndpoint, restHandlers and metrics share the thread pool. 
> When the UI is open, it may be even worse.
> {code:java}
> final ScheduledExecutorService executor =
>         WebMonitorEndpoint.createExecutorService(
>                 configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
>                 configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
>                 "DispatcherRestEndpoint");
> final long updateInterval =
>         configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
> final MetricFetcher metricFetcher =
>         updateInterval == 0
>                 ? VoidMetricFetcher.INSTANCE
>                 : MetricFetcherImpl.fromConfiguration(
>                         configuration,
>                         metricQueryServiceRetriever,
>                         dispatcherGatewayRetriever,
>                         executor);
> webMonitorEndpoint =
>         restEndpointFactory.createRestEndpoint(
>                 configuration,
>                 dispatcherGatewayRetriever,
>                 resourceManagerGatewayRetriever,
>                 blobServer,
>                 executor,
>                 metricFetcher,
>                 highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
>                 fatalErrorHandler); {code}
>  
>  
>  
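
One way to keep tasks from piling up, sketched under assumptions: skip a new fetch while the previous one is still in flight. This is not the code from the commits listed above; the class `GuardedFetcher` and its parameters are hypothetical, and the caller passes in the current time and the fetch action so the sketch stays self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical guard, not Flink's MetricFetcherImpl.
final class GuardedFetcher {
    private final long updateIntervalMillis;
    private long lastUpdateTime = 0L;
    // Starts out completed so that the first update is allowed to run.
    private CompletableFuture<Void> inFlight = CompletableFuture.completedFuture(null);

    GuardedFetcher(long updateIntervalMillis) {
        this.updateIntervalMillis = updateIntervalMillis;
    }

    // Starts a fetch only if the interval has elapsed AND the previous fetch
    // has finished. Returns true if a new fetch was started.
    synchronized boolean update(long nowMillis, Supplier<CompletableFuture<Void>> fetch) {
        if (nowMillis - lastUpdateTime <= updateIntervalMillis || !inFlight.isDone()) {
            return false;
        }
        lastUpdateTime = nowMillis;
        inFlight = fetch.get();
        return true;
    }
}
```

Compared with the `update()` method quoted in the issue, the only addition is the `inFlight.isDone()` check, which bounds the number of outstanding fetches to one regardless of how slowly the shared executor drains.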





[jira] [Commented] (FLINK-29748) Expose the optimize phase in the connector context

2022-10-25 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624149#comment-17624149
 ] 

Shengkai Fang commented on FLINK-29748:
---

If your connector implements `SupportsPartitionPushDown`,  the planner will 
tell you which partition needs to be read.

> Expose the optimize phase in the connector context
> --
>
> Key: FLINK-29748
> URL: https://issues.apache.org/jira/browse/FLINK-29748
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner, Table SQL / Runtime
>Reporter: Aitozi
>Priority: Minor
>
> Currently, the connector cannot know whether the whole optimization has 
> finished.
> Once the optimization has finished, all information is static, e.g. the 
> partitions to read. If I want to validate the final optimized result (such as 
> whether too many partitions, or none, are read), the connector needs to know 
> the current phase. I think {{ScanContext}} is a suitable place to expose this 
> information. 





[GitHub] [flink] xintongsong closed pull request #21132: [FLINK-29134][metrics] Do not repeatedly add useless metric updating tasks to avoid wasting resources

2022-10-25 Thread GitBox


xintongsong closed pull request #21132: [FLINK-29134][metrics] Do not 
repeatedly add useless metric updating tasks to avoid wasting resources
URL: https://github.com/apache/flink/pull/21132





[GitHub] [flink] Shenjiaqi commented on pull request #20860: [FLINK-29347] [runtime] Fix ByteStreamStateHandle#read return -1 when read count is 0

2022-10-25 Thread GitBox


Shenjiaqi commented on PR #20860:
URL: https://github.com/apache/flink/pull/20860#issuecomment-1291417994

   > Thanks for the update, please resolve the broken CI due to compiling 
problem.
   
   Thanks for review. Now code is fixed and CI passed.





[jira] [Closed] (FLINK-29252) Support create table-store table with 'connector'='table-store'

2022-10-25 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-29252.

Fix Version/s: table-store-0.3.0
 Assignee: Jingsong Lee  (was: MOBIN)
   Resolution: Fixed

master: bb411ab9d5ffdf11cc071353e2fbc1e184327085

> Support create table-store table with 'connector'='table-store'
> ---
>
> Key: FLINK-29252
> URL: https://issues.apache.org/jira/browse/FLINK-29252
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Affects Versions: table-store-0.3.0
>Reporter: MOBIN
>Assignee: Jingsong Lee
>Priority: Minor
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>
> Support create table-store table with 'connector'='table-store': 
> sink to table-store:
> {code:java}
> SET 'execution.checkpointing.interval' = '10 s';
> CREATE TEMPORARY TABLE word_table (
> word STRING
> ) WITH (
> 'connector' = 'datagen',
> 'fields.word.length' = '1'
> );
> CREATE TABLE word_count (
> word STRING PRIMARY KEY NOT ENFORCED,
> cnt BIGINT
> ) WITH(
>   'connector' = 'table-store',
>   'catalog-name' = 'test-catalog',
>   'default-database' = 'test-db',  -- should rename 'catalog-database'?
>   'catalog-table' = 'test-tb',
>   'warehouse'='file:/tmp/table_store'
> );
> INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word; 
> {code}
> source from table-store:
> {code:java}
> SET 'execution.checkpointing.interval' = '10 s';
> CREATE TABLE word_count (
> word STRING PRIMARY KEY NOT ENFORCED,
> cnt BIGINT
> ) WITH(
>   'connector' = 'table-store',
>   'catalog-name' = 'test-catalog',
>   'default-database' = 'test-db',
>   'catalog-table' = 'test-tb',
>   'warehouse'='file:/tmp/table_store'
> );
> CREATE TEMPORARY TABLE word_table (
> word STRING
> ) WITH (
> 'connector' = 'print'
> );
> INSERT INTO word_table SELECT word FROM word_count;{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] JingsongLi merged pull request #327: [FLINK-29252] Support create table-store table with 'connector'='table-store'

2022-10-25 Thread GitBox


JingsongLi merged PR #327:
URL: https://github.com/apache/flink-table-store/pull/327


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29294) Introduce a CompactRewriter for full compaction

2022-10-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29294:
---
Labels: pull-request-available  (was: )

> Introduce a CompactRewriter for full compaction
> ---
>
> Key: FLINK-29294
> URL: https://issues.apache.org/jira/browse/FLINK-29294
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>
> We need to introduce a special {{CompactRewriter}} for full compaction to 
> write changelog files.





[GitHub] [flink-table-store] tsreaper opened a new pull request, #332: [FLINK-29294] Introduce a CompactRewriter for full compaction

2022-10-25 Thread GitBox


tsreaper opened a new pull request, #332:
URL: https://github.com/apache/flink-table-store/pull/332

   We need to introduce a special `CompactRewriter` for full compaction to 
write changelog files.





[jira] [Updated] (FLINK-29294) Introduce a CompactRewriter for full compaction

2022-10-25 Thread Caizhi Weng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caizhi Weng updated FLINK-29294:

Description: We need to introduce a special {{CompactRewriter}} for full 
compaction to write changelog files.  (was: We need to introduce a special 
{{CompactTask}} for full compaction to write changelog files.)

> Introduce a CompactRewriter for full compaction
> ---
>
> Key: FLINK-29294
> URL: https://issues.apache.org/jira/browse/FLINK-29294
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
> Fix For: table-store-0.3.0
>
>
> We need to introduce a special {{CompactRewriter}} for full compaction to 
> write changelog files.





[jira] [Updated] (FLINK-29759) Cast type in LEFT JOIN

2022-10-25 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang updated FLINK-29759:
--
Component/s: Table SQL / Planner
 (was: API / Core)

> Cast type in LEFT JOIN
> --
>
> Key: FLINK-29759
> URL: https://issues.apache.org/jira/browse/FLINK-29759
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.5
>Reporter: Alexandre Decuq
>Priority: Critical
>
> Hello,
> I would like to use LEFT JOIN in order to implement a non blocking join 
> without two tables (relationship is optional).
> One peculiarity: the keys on the two sides do not have the same type (STRING vs 
> INT).
> Here is a code snippet:
> Avro input:
>  
> {code:java}
> {
>   "name": "CompanyBankAccountMessage",
>   "type": "record",
>   "namespace": "com.kyriba.dataproduct.core.model.input",
>   "fields": [ 
> {
>   "name": "data",
>   "type": {
> "fields": [
>   {
> "name": "CURRENCY_ID",
> "type": [
>   "null",
>   "string"
> ],
> "default": null,
>   },
>   ...
> ]
>   }
> }
>   ]
> }{code}
>  
> Avro output:
>  
> {code:java}
> {
>   "name": "CurrencyMessage",
>   "type": "record",
>   "namespace": "com.kyriba.dataproduct.core.model.input", 
>   "fields": [
> {
>   "name": "data",
>   "type": {
> "fields": [
>   {
> "name": "CURRENCY_ID",
> "type": "int"
>   },
>   ...
> ]
>   }
> }
>   ]
> }{code}
>  
> Sql query:
>  
> {code:java}
> SELECT ...
> FROM `my.input.COMPANY_BANK_ACCOUNT.v1.avro` as COMPANY_BANK_ACCOUNT
> LEFT JOIN `my.input.CURRENCY.v1.avro` as CURRENCY
> ON CAST(COMPANY_BANK_ACCOUNT.CURRENCY_ID as INT) = CURRENCY.CURRENCY_ID{code}
> I got this exception:
>  
>  
> {code:java}
> Conversion to relational algebra failed to preserve datatypes:
> validated type:
>   RecordType(BIGINT currencyUid, ...)
> converted type: 
>   RecordType(BIGINT currencyUid NOT NULL, ...)
> rel:
>   LogicalProject(currencyUid=[CAST($116.CURRENCY_ID):BIGINT NOT NULL], ...)
> LogicalJoin(condition=[=($11, $117)], joinType=[left])
>       LogicalTableScan(table=[[data-platform, core, 
> kyriba.flink-sql-test.core.cdc.COMPANY_BANK_ACCOUNT.v1.avro]])
>       LogicalProject(the_port_key=[$0], data=[$1], $f2=[$1.CURRENCY_ID])
>         LogicalTableScan(table=[[data-platform, core, 
> kyriba.flink-sql-test.core.cdc.CURRENCY.v1.avro]])
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467){code}
> Did I do something wrong, or is this a bug?
>  
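The nullability difference in the exception above (`BIGINT currencyUid` versus `BIGINT currencyUid NOT NULL`) can be illustrated outside Flink. The following is a minimal plain-Java sketch, not Flink or Calcite code: a column taken from the right side of a LEFT JOIN has to be nullable in the result, because left rows without a match still appear. All class and method names are hypothetical, and `tryCast` is a simplification that maps unparsable input to null; it says nothing about how Flink's CAST behaves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LeftJoinNullabilitySketch {

    /** Simplified cast from a nullable STRING key to an INT key (null on failure). */
    static Integer tryCast(String key) {
        if (key == null) {
            return null;
        }
        try {
            return Integer.valueOf(key.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /**
     * For each left key, returns the right-side value with the matching key,
     * or null when the left row has no match.
     */
    static List<Long> leftJoin(List<String> leftKeys, Map<Integer, Long> rightByKey) {
        List<Long> result = new ArrayList<>();
        for (String leftKey : leftKeys) {
            Integer castKey = tryCast(leftKey);
            // The right-side value is never null in its own table, yet the join
            // output must still allow null for unmatched left rows.
            result.add(castKey == null ? null : rightByKey.get(castKey));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> currencyUidById = new HashMap<>();
        currencyUidById.put(1, 10L);
        List<String> accountCurrencyIds = Arrays.asList("1", null, "7");
        System.out.println(leftJoin(accountCurrencyIds, currencyUidById));
    }
}
```

Under this reading, a planner that marks the projected right-side column as NOT NULL after a left join disagrees with the validated (nullable) type, which matches the shape of the reported error.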





[jira] [Commented] (FLINK-29737) Support DataGen on waveform function

2022-10-25 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624134#comment-17624134
 ] 

Shengkai Fang commented on FLINK-29737:
---

Sure. I have added this to my to-do list. I will take a look when I am free.

> Support DataGen on waveform function
> 
>
> Key: FLINK-29737
> URL: https://issues.apache.org/jira/browse/FLINK-29737
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: chenzihao
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-10-24-16-09-47-386.png, 
> image-2022-10-24-16-09-52-410.png
>
>
> In some scenarios, we need to simulate flow changes in the production 
> environment. The current DATAGEN feature only supports data generation at a 
> constant rate. We try to simulate increments of flow using batch jobs, but 
> the production rate is not smooth, so I suggest that we can support sin-based 
> data generation in order to get smooth changes. 
> 1. add another batch job to simulate increments of flow.
> !image-2022-10-24-16-09-52-410.png!
> 2. sin-based.
> !image-2022-10-24-16-09-47-386.png!
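As a rough illustration of the sin-based idea in the description above, here is a minimal plain-Java sketch. It is not part of the DataGen connector; the class `SineRate` and the parameters `baseRate`, `amplitude` and `periodSeconds` are hypothetical names chosen for this example. The only thing taken from the issue is that the target rate follows a sine wave instead of staying constant.

```java
/** Hypothetical helper: a target emission rate that follows a sine wave. */
final class SineRate {
    private final double baseRate;      // mean rows per second (assumed parameter)
    private final double amplitude;     // peak deviation from the mean (assumed parameter)
    private final double periodSeconds; // duration of one full wave (assumed parameter)

    SineRate(double baseRate, double amplitude, double periodSeconds) {
        if (periodSeconds <= 0 || amplitude < 0 || amplitude > baseRate) {
            throw new IllegalArgumentException(
                    "need periodSeconds > 0 and 0 <= amplitude <= baseRate");
        }
        this.baseRate = baseRate;
        this.amplitude = amplitude;
        this.periodSeconds = periodSeconds;
    }

    /** Rows per second at the given time offset; never negative by construction. */
    double rateAt(double tSeconds) {
        return baseRate + amplitude * Math.sin(2 * Math.PI * tSeconds / periodSeconds);
    }

    public static void main(String[] args) {
        SineRate rate = new SineRate(100, 50, 60);
        for (int t = 0; t <= 60; t += 15) {
            System.out.printf("t=%ds rate=%.1f rows/s%n", t, rate.rateAt(t));
        }
    }
}
```

A generator could sample `rateAt` once per second and emit that many rows, which would give the smooth ramp that the batch-job approach does not.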





[GitHub] [flink] simplejason commented on a diff in pull request #21144: [FLINK-29747] refactor: module-based app to standalone components

2022-10-25 Thread GitBox


simplejason commented on code in PR #21144:
URL: https://github.com/apache/flink/pull/21144#discussion_r1005140544


##
flink-runtime-web/web-dashboard/src/app/components/share.module.ts:
##
@@ -16,14 +16,11 @@
  * limitations under the License.
  */
 
-import { CommonModule } from '@angular/common';
 import { NgModule } from '@angular/core';
 
-import { AutoResizeDirective } from 
'@flink-runtime-web/share/common/editor/auto-resize.directive';
-
 @NgModule({
-  declarations: [AutoResizeDirective],
-  exports: [AutoResizeDirective],
-  imports: [CommonModule]
+  imports: [],
+  declarations: [],
+  exports: []
 })
-export class EditorModule {}
+export class ShareModule {}

Review Comment:
   nit: Needs to be deleted






[jira] [Closed] (FLINK-29271) Change to byte array from bytebuffer to improve performance when reading parquet file

2022-10-25 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-29271.
---
Fix Version/s: 1.17.0
   Resolution: Fixed

Fixed in master: be46408c0241570647bfef58f7dbe5336adfa637

> Change to byte array from bytebuffer to improve performance when reading 
> parquet file
> -
>
> Key: FLINK-29271
> URL: https://issues.apache.org/jira/browse/FLINK-29271
> Project: Flink
>  Issue Type: Improvement
>Reporter: jiangjiguang0719
>Assignee: jiangjiguang0719
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> I ran a test comparing byte array and ByteBuffer: there is a 40% performance 
> improvement when using a byte array to read a Parquet file.
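To make the two access paths concrete, the following plain-Java sketch decodes the same little-endian 32-bit integers once through a `ByteBuffer` and once directly from a `byte[]`. It is an illustration only, not the code from the pull request and not a benchmark; the class and method names are hypothetical. It shows that both paths yield the same values, and makes no claim about their relative speed.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class ByteAccessSketch {

    /** Sums `count` little-endian ints by reading through a ByteBuffer view. */
    static long sumViaByteBuffer(ByteBuffer buffer, int count) {
        // duplicate() leaves the caller's position and byte order untouched.
        ByteBuffer le = buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        long sum = 0;
        for (int i = 0; i < count; i++) {
            sum += le.getInt(i * 4); // absolute read, does not move the position
        }
        return sum;
    }

    /** Sums `count` little-endian ints by assembling them from raw array bytes. */
    static long sumViaByteArray(byte[] bytes, int count) {
        long sum = 0;
        for (int i = 0; i < count; i++) {
            int o = i * 4;
            int value = (bytes[o] & 0xFF)
                    | (bytes[o + 1] & 0xFF) << 8
                    | (bytes[o + 2] & 0xFF) << 16
                    | (bytes[o + 3] & 0xFF) << 24;
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        int[] values = {1, -2, 300};
        ByteBuffer writer = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN);
        for (int v : values) {
            writer.putInt(v);
        }
        byte[] bytes = writer.array();
        System.out.println(sumViaByteBuffer(ByteBuffer.wrap(bytes), values.length));
        System.out.println(sumViaByteArray(bytes, values.length));
    }
}
```

Any speed difference between the two depends on the JVM and the data; the 40% figure is the reporter's measurement, not something this sketch reproduces.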





[jira] [Updated] (FLINK-29271) Change to byte array from bytebuffer to improve performance when reading parquet file

2022-10-25 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-29271:

Component/s: Formats (JSON, Avro, Parquet, ORC, SequenceFile)

> Change to byte array from bytebuffer to improve performance when reading 
> parquet file
> -
>
> Key: FLINK-29271
> URL: https://issues.apache.org/jira/browse/FLINK-29271
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: jiangjiguang0719
>Assignee: jiangjiguang0719
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> I ran a test comparing byte array and ByteBuffer: there is a 40% performance 
> improvement when using a byte array to read a Parquet file.





[GitHub] [flink] saLeox commented on pull request #21149: [FLINK-29527][format] Make unknownFieldsIndices work for single ParquetReader

2022-10-25 Thread GitBox


saLeox commented on PR #21149:
URL: https://github.com/apache/flink/pull/21149#issuecomment-1291388833

   @flinkbot run azure





[GitHub] [flink] wuchong merged pull request #20830: [FLINK-29271]Change to byte array from bytebuffer to improve performance and compatible direct byte buffers

2022-10-25 Thread GitBox


wuchong merged PR #20830:
URL: https://github.com/apache/flink/pull/20830





[jira] [Commented] (FLINK-29759) Cast type in LEFT JOIN

2022-10-25 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624127#comment-17624127
 ] 

luoyuxia commented on FLINK-29759:
--

[~adecuq] Do the keys on both sides (STRING vs INT) have the same nullability? If 
not, you can give them the same nullability and try again.

> Cast type in LEFT JOIN
> --
>
> Key: FLINK-29759
> URL: https://issues.apache.org/jira/browse/FLINK-29759
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.5
>Reporter: Alexandre Decuq
>Priority: Critical
>
> Hello,
> I would like to use LEFT JOIN in order to implement a non blocking join 
> without two tables (relationship is optional).
> One peculiarity: the keys on the two sides do not have the same type (STRING vs 
> INT).
> Here is a code snippet:
> Avro input:
>  
> {code:java}
> {
>   "name": "CompanyBankAccountMessage",
>   "type": "record",
>   "namespace": "com.kyriba.dataproduct.core.model.input",
>   "fields": [ 
> {
>   "name": "data",
>   "type": {
> "fields": [
>   {
> "name": "CURRENCY_ID",
> "type": [
>   "null",
>   "string"
> ],
> "default": null,
>   },
>   ...
> ]
>   }
> }
>   ]
> }{code}
>  
> Avro output:
>  
> {code:java}
> {
>   "name": "CurrencyMessage",
>   "type": "record",
>   "namespace": "com.kyriba.dataproduct.core.model.input", 
>   "fields": [
> {
>   "name": "data",
>   "type": {
> "fields": [
>   {
> "name": "CURRENCY_ID",
> "type": "int"
>   },
>   ...
> ]
>   }
> }
>   ]
> }{code}
>  
> Sql query:
>  
> {code:java}
> SELECT ...
> FROM `my.input.COMPANY_BANK_ACCOUNT.v1.avro` as COMPANY_BANK_ACCOUNT
> LEFT JOIN `my.input.CURRENCY.v1.avro` as CURRENCY
> ON CAST(COMPANY_BANK_ACCOUNT.CURRENCY_ID as INT) = CURRENCY.CURRENCY_ID{code}
> I got this exception:
>  
>  
> {code:java}
> Conversion to relational algebra failed to preserve datatypes:
> validated type:
>   RecordType(BIGINT currencyUid, ...)
> converted type: 
>   RecordType(BIGINT currencyUid NOT NULL, ...)
> rel:
>   LogicalProject(currencyUid=[CAST($116.CURRENCY_ID):BIGINT NOT NULL], ...)
> LogicalJoin(condition=[=($11, $117)], joinType=[left])
>       LogicalTableScan(table=[[data-platform, core, 
> kyriba.flink-sql-test.core.cdc.COMPANY_BANK_ACCOUNT.v1.avro]])
>       LogicalProject(the_port_key=[$0], data=[$1], $f2=[$1.CURRENCY_ID])
>         LogicalTableScan(table=[[data-platform, core, 
> kyriba.flink-sql-test.core.cdc.CURRENCY.v1.avro]])
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467){code}
> Did I do something wrong, or is this a bug?
>  





[jira] [Commented] (FLINK-29757) ContinuousFileSplitEnumerator skip unprocessed splits when the file is splittable

2022-10-25 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624123#comment-17624123
 ] 

luoyuxia commented on FLINK-29757:
--

[~OlegPT] Thanks for reporting it. I think we may need to use   to 
identify a split instead of just a path. What's your idea for fixing it?

> ContinuousFileSplitEnumerator skip unprocessed splits when the file is 
> splittable
> -
>
> Key: FLINK-29757
> URL: https://issues.apache.org/jira/browse/FLINK-29757
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Reporter: Hanley Yang
>Priority: Critical
>
> ContinuousFileSplitEnumerator uses a HashSet to store processed splits. 
> This works fine when a file is processed as a single split, but once the file is 
> splittable, unprocessed splits are skipped.
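The skipping behaviour described above can be reproduced in a few lines of plain Java. This is a simplified sketch, not the Flink enumerator: `Split` is a hypothetical stand-in for a file split, and the path-plus-offset key is an assumption made for this example (the key proposed in the comment is not preserved in this archive).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SplitDedupSketch {

    /** Hypothetical stand-in for a file split: a byte range within one file. */
    static final class Split {
        final String path;
        final long offset;
        final long length;

        Split(String path, long offset, long length) {
            this.path = path;
            this.offset = offset;
            this.length = length;
        }
    }

    /** Deduplicates by path only: later splits of the same file are dropped. */
    static int acceptedWhenKeyedByPath(List<Split> splits) {
        Set<String> seen = new HashSet<>();
        int accepted = 0;
        for (Split s : splits) {
            if (seen.add(s.path)) {
                accepted++;
            }
        }
        return accepted;
    }

    /** Deduplicates by path and offset (assumed key): every distinct split is kept. */
    static int acceptedWhenKeyedByPathAndOffset(List<Split> splits) {
        Set<String> seen = new HashSet<>();
        int accepted = 0;
        for (Split s : splits) {
            if (seen.add(s.path + "@" + s.offset)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // One splittable file cut into two ranges, plus one small file.
        List<Split> splits = Arrays.asList(
                new Split("/data/big.csv", 0, 128),
                new Split("/data/big.csv", 128, 128),
                new Split("/data/small.csv", 0, 10));
        System.out.println("keyed by path: " + acceptedWhenKeyedByPath(splits));
        System.out.println("keyed by path and offset: " + acceptedWhenKeyedByPathAndOffset(splits));
    }
}
```

With the path-only key the second range of `/data/big.csv` is treated as already processed, which is the skipped-split symptom from the report.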





[GitHub] [flink] jiangjiguang commented on pull request #20830: [FLINK-29271]Change to byte array from bytebuffer to improve performance and compatible direct byte buffers

2022-10-25 Thread GitBox


jiangjiguang commented on PR #20830:
URL: https://github.com/apache/flink/pull/20830#issuecomment-1291378801

   @wuchong Could you please help merge this pr, thanks~





[jira] [Closed] (FLINK-29435) securityConfiguration supports dynamic configuration

2022-10-25 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-29435.

Fix Version/s: 1.17.0
   Resolution: Done

master (1.17): a8c72ccb98e6eaa59d878d8182f0d1411bf577c9

> securityConfiguration supports dynamic configuration
> 
>
> Key: FLINK-29435
> URL: https://issues.apache.org/jira/browse/FLINK-29435
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Bo Cui
>Assignee: Bo Cui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> When different tenants submit jobs using the same _flink-conf.yaml_, the same 
> user is displayed on the Yarn page. 
> _SecurityConfiguration_ does not support dynamic configuration. Therefore, 
> the user displayed on the Yarn page is the 
> _security.kerberos.login.principal_ in the _flink-conf.yaml_.
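The effect described above can be sketched with plain Java maps. This is an illustration under stated assumptions, not Flink's `SecurityConfiguration`: the class `DynamicConfigSketch`, the method `effectiveConfig`, and the principal values are hypothetical. Only the key `security.kerberos.login.principal` comes from the issue text. The sketch shows per-submission properties taking precedence over the shared file values.

```java
import java.util.HashMap;
import java.util.Map;

final class DynamicConfigSketch {
    static final String PRINCIPAL_KEY = "security.kerberos.login.principal";

    /**
     * Returns the shared file configuration overlaid with per-submission
     * dynamic properties; dynamic values win when both define a key.
     */
    static Map<String, String> effectiveConfig(
            Map<String, String> fileConfig, Map<String, String> dynamicProperties) {
        Map<String, String> merged = new HashMap<>(fileConfig);
        merged.putAll(dynamicProperties);
        return merged;
    }

    public static void main(String[] args) {
        // Shared flink-conf.yaml content (hypothetical value).
        Map<String, String> fileConfig = new HashMap<>();
        fileConfig.put(PRINCIPAL_KEY, "shared@EXAMPLE.COM");

        // Properties supplied by one tenant at submission time (hypothetical value).
        Map<String, String> tenantProperties = new HashMap<>();
        tenantProperties.put(PRINCIPAL_KEY, "tenant-a@EXAMPLE.COM");

        System.out.println(effectiveConfig(fileConfig, tenantProperties).get(PRINCIPAL_KEY));
        System.out.println(effectiveConfig(fileConfig, new HashMap<>()).get(PRINCIPAL_KEY));
    }
}
```

If the security settings are read only from the file map, every tenant ends up with the shared principal, which is the behaviour the issue reports.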





[GitHub] [flink] xintongsong commented on pull request #21132: [FLINK-29134][metrics] Do not repeatedly add useless metric updating tasks to avoid wasting resources

2022-10-25 Thread GitBox


xintongsong commented on PR #21132:
URL: https://github.com/apache/flink/pull/21132#issuecomment-1291373055

   @flinkbot run azure





[GitHub] [flink] xintongsong closed pull request #20910: [FLINK-29435]SecurityConfiguration supports dynamic configuration

2022-10-25 Thread GitBox


xintongsong closed pull request #20910: [FLINK-29435]SecurityConfiguration 
supports dynamic configuration
URL: https://github.com/apache/flink/pull/20910





[GitHub] [flink] waywtdcc commented on pull request #21078: [FLINK-28985][planner]sql like statement supports views

2022-10-25 Thread GitBox


waywtdcc commented on PR #21078:
URL: https://github.com/apache/flink/pull/21078#issuecomment-1291278156

   @MartijnVisser 





[GitHub] [flink] flinkbot commented on pull request #21154: [hotfix] Add icon for Flink in IntellijIdea and Toolbox

2022-10-25 Thread GitBox


flinkbot commented on PR #21154:
URL: https://github.com/apache/flink/pull/21154#issuecomment-1291254943

   
   ## CI report:
   
   * 7a4b9e4cebdd10a66633bbaa36d05b0e936bf7b5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] snuyanzin commented on pull request #21150: [hotfix] Replace deprecated DescribeTopicsResult#all, KafkaConsumer#poll and ContainerState#getContainerIpAddress

2022-10-25 Thread GitBox


snuyanzin commented on PR #21150:
URL: https://github.com/apache/flink/pull/21150#issuecomment-1291251696

   
   @flinkbot run azure
   
   





[GitHub] [flink] flinkbot commented on pull request #21153: [hotfix][runtime] Remove private methods

2022-10-25 Thread GitBox


flinkbot commented on PR #21153:
URL: https://github.com/apache/flink/pull/21153#issuecomment-1291251114

   
   ## CI report:
   
   * 785993706ae3d4a896b41c4c3fe366da6f9e841a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] snuyanzin opened a new pull request, #21154: [hotfix] Add icon for Flink in IntellijIdea and Toolbox

2022-10-25 Thread GitBox


snuyanzin opened a new pull request, #21154:
URL: https://github.com/apache/flink/pull/21154

   
   ## What is the purpose of the change
   The PR adds an icon (taken from https://flink.apache.org/material.html) 
that will be shown in the IntelliJ IDEA Toolbox or in IntelliJ IDEA's recent projects, like
   ![Screenshot from 2022-10-26 
01-30-48](https://user-images.githubusercontent.com/403174/197901205-5a921684-fb28-4676-b6a1-b646b3b645ae.png)
   and 
   ![Screenshot from 2022-10-26 
01-34-47](https://user-images.githubusercontent.com/403174/197901367-d5013278-4ba8-47c1-ab56-a75186fada3b.png)
   
   
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: ( no)
 - The runtime per-record code paths (performance sensitive): ( no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no )
 - The S3 file system connector: ( no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (not applicable)
   





[GitHub] [flink] snuyanzin opened a new pull request, #21153: [hotfix][runtime] Remove private methods

2022-10-25 Thread GitBox


snuyanzin opened a new pull request, #21153:
URL: https://github.com/apache/flink/pull/21153

   ## What is the purpose of the change
   
   The PR removes a bunch of unused private methods
   
_org.apache.flink.runtime.operators.hash.CompactingHashTable#insertBucketEntryFromSearch_
 which became unused
   after 
https://github.com/apache/flink/commit/925ac1f76bb84986764495407049a77552169d84
   
   _org.apache.flink.runtime.state.VoidNamespace#readResolve_
   was added here 
https://github.com/apache/flink/commit/3b97128f05bacfb80afe4a2a49741c31ff306cd2 
as unused
   
   
_org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerTest#createDispatcherGateway_
 that became unused after 
https://github.com/apache/flink/commit/401f5f3970c6e969f3749ee72a4c57cb3f905af2
   
   
_org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolServiceTest#matchesWithSlotContext_
 which became unused after 
https://github.com/apache/flink/commit/b67f0a70662819e064176a418ce2b892f3fb61b4
   
   _org.apache.flink.runtime.messages.WebMonitorMessagesTest#randomIds_ which 
became unused
   after 
https://github.com/apache/flink/commit/67aad88ee025ce02053ab560f2504762f53b87d9
   
   
_org.apache.flink.runtime.scheduler.adaptive.FinishedTest.MockFinishedContext#assertNoStateTransition_
   becoming unused after 
https://github.com/apache/flink/commit/85879bfb4e6be34481aa94bd0c215db234283ccd
   
   
_org.apache.flink.runtime.scheduler.adaptive.StopWithSavepointTest#createStopWithSavepoint_
   which became unused after 
https://github.com/apache/flink/commit/d13cb056912d9011df96671c3bd60299a59a1117
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (yes)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no)
 - The S3 file system connector: ( no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (not applicable)
   





[GitHub] [flink] flinkbot commented on pull request #21152: [hotfix][connectors][kafka] Remove unused private classes and methods

2022-10-25 Thread GitBox


flinkbot commented on PR #21152:
URL: https://github.com/apache/flink/pull/21152#issuecomment-1291225511

   
   ## CI report:
   
   * 4fe0f4011bf594f05f839b1ad20d80281553eb76 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] snuyanzin opened a new pull request, #21152: [hotfix][connectors][kafka] Remove unused private classes and methods

2022-10-25 Thread GitBox


snuyanzin opened a new pull request, #21152:
URL: https://github.com/apache/flink/pull/21152

   
   ## What is the purpose of the change
   Remove unused private method/classes
   _org.apache.flink.connector.kafka.sink.KafkaSinkITCase#createTestConsumer_ 
became unused after
   
https://github.com/apache/flink/commit/ade011ec139f8258db356abd65e6e83601cd22e1
   
   
_org.apache.flink.connector.kafka.source.enumerator.KafkaEnumeratorTest.BlockingClosingContext_
 has been unused since the beginning
   
   
_org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.BrokerRestartingMapper_
 became unused after 
https://github.com/apache/flink/commit/5f5cab2f9a1175fa396833299c313c2de6d1dae3
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: ( no )
 - The runtime per-record code paths (performance sensitive): ( no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: ( no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (not applicable )
   





[GitHub] [flink] snuyanzin commented on pull request #21150: [hotfix] Replace deprecated DescribeTopicsResult#all, KafkaConsumer#poll and ContainerState#getContainerIpAddress

2022-10-25 Thread GitBox


snuyanzin commented on PR #21150:
URL: https://github.com/apache/flink/pull/21150#issuecomment-1291173829

   
   @flinkbot run azure
   
   





[GitHub] [flink-kubernetes-operator] xccui opened a new pull request, #412: [hotfix] Set the default helm image repo

2022-10-25 Thread GitBox


xccui opened a new pull request, #412:
URL: https://github.com/apache/flink-kubernetes-operator/pull/412

   
   
   ## What is the purpose of the change
   
   Not sure if this is intended, but the old default value 
`flink-kubernetes-operator` wasn't a valid image repository. Users have to 
override it when using the helm in the git repo.
   
   ## Brief change log
   
   Replace the default value with a valid helm image repository 
`apache/flink-kubernetes-operator`
   
   ## Verifying this change
   
   Verified with a local Argo CD setup 
(`apache/flink-kubernetes-operator:latest`).
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
(no)
 - Core observer or reconciler logic that is regularly executed: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[jira] [Commented] (FLINK-29762) Can not create a standalone cluster with reactive mode using the operator

2022-10-25 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624029#comment-17624029
 ] 

Gyula Fora commented on FLINK-29762:


I think you should be able to set up your Beam job as a Flink Application. That 
is a prerequisite for reactive mode. I am not aware of any plans for 
supporting reactive mode for session clusters. The problem is that if there are 
multiple jobs, they would be competing for resources.

> Can not create a standalone cluster with reactive mode using the operator
> -
>
> Key: FLINK-29762
> URL: https://issues.apache.org/jira/browse/FLINK-29762
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
> Environment: Kubernetes Version 1.22 on EKS.
> Flink Operator version 1.2.0
> Flink version 1.15 (errors in 1.14 too)
>Reporter: yuvipanda
>Priority: Major
>
> I'm trying to create a minimal running flink cluster with reactive scaling 
> using the kubernetes operator (running v1.2.0), with the following YAML:
>  
> {{
> kind: FlinkDeployment
> metadata:
>   name: test-flink-cluster
> spec:
>   flinkConfiguration:
>     scheduler-mode: reactive
>   flinkVersion: v1_15
>   image: flink:1.15
>   jobManager:
>     replicas: 1
>     resource:
>       cpu: 0.2
>       memory: 1024m
>   mode: standalone
>   serviceAccount: flink
>   taskManager:
>     replicas: 1
>     resource:
>       cpu: 0.2
>       memory: 1024m}}
>  
> However, this causes the jobmanager to crash with the following:
>  
> {{sed: couldn't open temporary file /opt/flink/conf/sedLX7Jx8: Read-only file 
> system}}
> {{sed: couldn't open temporary file /opt/flink/conf/sed1vva8t: Read-only file 
> system}}
> {{/docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Read-only 
> file system}}
> {{/docker-entrypoint.sh: line 89: /opt/flink/conf/flink-conf.yaml.tmp: 
> Read-only file system}}
> {{Starting Job Manager}}
> {{Starting standalonesession as a console application on host 
> test-flink-cluster-58cd584fdd-xwbtf.}}
> {{2022-10-25 18:32:00,422 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - 
> }}
> {{2022-10-25 18:32:00,510 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  
> Preconfiguration: }}
> {{2022-10-25 18:32:00,512 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - }}
> {{RESOURCE_PARAMS extraction logs:}}
> {{jvm_params: -Xmx469762048 -Xms469762048 -XX:MaxMetaspaceSize=268435456}}
> {{dynamic_configs: -D jobmanager.memory.off-heap.size=134217728b -D 
> jobmanager.memory.jvm-overhead.min=201326592b -D 
> jobmanager.memory.jvm-metaspace.size=268435456b -D 
> jobmanager.memory.heap.size=469762048b -D 
> jobmanager.memory.jvm-overhead.max=201326592b}}
> {{logs: WARNING: sun.reflect.Reflection.getCallerClass is not supported. This 
> will impact performance.}}
> {{INFO  [] - Loading configuration property: blob.server.port, 6124}}
> {{INFO  [] - Loading configuration property: 
> kubernetes.jobmanager.annotations, 
> flinkdeployment.flink.apache.org/generation:1}}
> {{INFO  [] - Loading configuration property: kubernetes.jobmanager.replicas, 
> 1}}
> {{INFO  [] - Loading configuration property: scheduler-mode, reactive}}
> {{INFO  [] - Loading configuration property: 
> "kubernetes.operator.metrics.reporter.prom.port", ""}}
> {{INFO  [] - Loading configuration property: jobmanager.rpc.address, 
> test-flink-cluster.default}}
> {{INFO  [] - Loading configuration property: kubernetes.taskmanager.cpu, 0.2}}
> {{INFO  [] - Loading configuration property: "prometheus.io/port", ""}}
> {{INFO  [] - Loading configuration property: kubernetes.service-account, 
> flink}}
> {{INFO  [] - Loading configuration property: kubernetes.cluster-id, 
> test-flink-cluster}}
> {{INFO  [] - Loading configuration property: kubernetes.container.image, 
> flink:1.15}}
> {{INFO  [] - Loading configuration property: parallelism.default, 2}}
> {{INFO  [] - Loading configuration property: kubernetes.namespace, default}}
> {{INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 
> 2}}
> {{INFO  [] - Loading configuration property: 
> kubernetes.rest-service.exposed.type, ClusterIP}}
> {{INFO  [] - Loading configuration property: "prometheus.io/scrape", "true"}}
> {{INFO  [] - Loading configuration property: taskmanager.memory.process.size, 
> 1024m}}
> {{INFO  [] - Loading configuration property: 
> "kubernetes.operator.metrics.reporter.prom.class", 
> "org.apache.flink.metrics.prometheus.PrometheusReporter"}}
> {{INFO  [] - Loading configuration property: web.cancel.enable, false}}
> {{INFO  [] - Loading configuration property: execution.target, remote}}
> {{INFO  [] - Loading 

[jira] [Commented] (FLINK-29762) Can not create a standalone cluster with reactive mode using the operator

2022-10-25 Thread yuvipanda (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624014#comment-17624014
 ] 

yuvipanda commented on FLINK-29762:
---

So if I understand this correctly, we can't actually use Reactive Mode with 
Apache Beam on Flink until Standalone Session clusters are also supported?


[jira] [Commented] (FLINK-29762) Can not create a standalone cluster with reactive mode using the operator

2022-10-25 Thread yuvipanda (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624012#comment-17624012
 ] 

yuvipanda commented on FLINK-29762:
---

Oh, I see - I only saw the `standalone` part but not the `application` part. 
I'm trying to run an Apache Beam pipeline, and I'm not sure if that'll actually 
work if I had to specify a job during cluster creation.


[jira] [Assigned] (FLINK-29536) Add WATCH_NAMESPACES env var to kubernetes operator

2022-10-25 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-29536:
--

Assignee: Tony Garrard

> Add WATCH_NAMESPACES env var to kubernetes operator
> ---
>
> Key: FLINK-29536
> URL: https://issues.apache.org/jira/browse/FLINK-29536
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.2.0
>Reporter: Tony Garrard
>Assignee: Tony Garrard
>Priority: Major
>
> Provide the ability to set the namespaces watched by the operator using an 
> env var. Whilst the additional config can still be used, the presence of the 
> env var will take priority.
>  
> Reasons for issue
>  # Operator will take effect of the setting immediately as pod will roll 
> (rather than waiting for the config to be refreshed)
>  # If the operator is to be olm bundled we will be able to set the target 
> namespace using the following 
> {{env:}}
>   {{  - name: WATCHED_NAMESPACE}}
>   {{valueFrom:}}
>   {{  fieldRef:}}
>  {{fieldPath: 
> metadata.annotations['olm.targetNamespaces']}}
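
The precedence described in this issue (the env var, when present, wins over the configured value) could be sketched roughly as follows. This is only an illustration under assumptions: the class name, the `resolve` helper, and the comma-separated format are hypothetical and not taken from the operator's code, and the env var name is passed in as a parameter because the issue title and the snippet above spell it differently.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch; not the operator's actual implementation. */
class WatchedNamespacesSketch {

    /**
     * Returns the namespaces to watch. A non-blank env var value takes priority;
     * otherwise the configured value is used. Both are assumed to be comma-separated.
     */
    static Set<String> resolve(Map<String, String> env, String envVarName, String configuredValue) {
        String fromEnv = env.get(envVarName);
        String raw = (fromEnv != null && !fromEnv.trim().isEmpty()) ? fromEnv : configuredValue;
        if (raw == null) {
            return new LinkedHashSet<>();
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    public static void main(String[] args) {
        // "default" stands in for whatever the operator configuration would provide.
        System.out.println(resolve(System.getenv(), "WATCH_NAMESPACES", "default"));
    }
}
```

Because the value is read from the process environment, a change only becomes visible after the pod restarts, which matches the first reason listed above.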



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29762) Can not create a standalone cluster with reactive mode using the operator

2022-10-25 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-29762.
--
Resolution: Not A Problem

> Can not create a standalone cluster with reactive mode using the operator
> -
>
> Key: FLINK-29762
> URL: https://issues.apache.org/jira/browse/FLINK-29762
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
> Environment: Kubernetes Version 1.22 on EKS.
> Flink Operator version 1.2.0
> Flink Version 1.15 (errors in 1.14 too)
>Reporter: yuvipanda
>Priority: Major
>
> I'm trying to create a minimal running flink cluster with reactive scaling 
> using the kubernetes operator (running v1.2.0), with the following YAML:
>  
> {{
> kind: FlinkDeployment
> metadata:
>   name: test-flink-cluster
> spec:
>   flinkConfiguration:
>     scheduler-mode: reactive
>   flinkVersion: v1_15
>   image: flink:1.15
>   jobManager:
>     replicas: 1
>     resource:
>       cpu: 0.2
>       memory: 1024m
>   mode: standalone
>   serviceAccount: flink
>   taskManager:
>     replicas: 1
>     resource:
>       cpu: 0.2
>       memory: 1024m}}
>  
> However, this causes the jobmanager to crash with the following:
>  
> {{sed: couldn't open temporary file /opt/flink/conf/sedLX7Jx8: Read-only file 
> system}}
> {{sed: couldn't open temporary file /opt/flink/conf/sed1vva8t: Read-only file 
> system}}
> {{/docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Read-only 
> file system}}
> {{/docker-entrypoint.sh: line 89: /opt/flink/conf/flink-conf.yaml.tmp: 
> Read-only file system}}
> {{Starting Job Manager}}
> {{Starting standalonesession as a console application on host 
> test-flink-cluster-58cd584fdd-xwbtf.}}
> {{2022-10-25 18:32:00,422 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - 
> }}
> {{2022-10-25 18:32:00,510 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  
> Preconfiguration: }}
> {{2022-10-25 18:32:00,512 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - }}
> {{RESOURCE_PARAMS extraction logs:}}
> {{jvm_params: -Xmx469762048 -Xms469762048 -XX:MaxMetaspaceSize=268435456}}
> {{dynamic_configs: -D jobmanager.memory.off-heap.size=134217728b -D 
> jobmanager.memory.jvm-overhead.min=201326592b -D 
> jobmanager.memory.jvm-metaspace.size=268435456b -D 
> jobmanager.memory.heap.size=469762048b -D 
> jobmanager.memory.jvm-overhead.max=201326592b}}
> {{logs: WARNING: sun.reflect.Reflection.getCallerClass is not supported. This 
> will impact performance.}}
> {{INFO  [] - Loading configuration property: blob.server.port, 6124}}
> {{INFO  [] - Loading configuration property: 
> kubernetes.jobmanager.annotations, 
> flinkdeployment.flink.apache.org/generation:1}}
> {{INFO  [] - Loading configuration property: kubernetes.jobmanager.replicas, 
> 1}}
> {{INFO  [] - Loading configuration property: scheduler-mode, reactive}}
> {{INFO  [] - Loading configuration property: 
> "kubernetes.operator.metrics.reporter.prom.port", ""}}
> {{INFO  [] - Loading configuration property: jobmanager.rpc.address, 
> test-flink-cluster.default}}
> {{INFO  [] - Loading configuration property: kubernetes.taskmanager.cpu, 0.2}}
> {{INFO  [] - Loading configuration property: "prometheus.io/port", ""}}
> {{INFO  [] - Loading configuration property: kubernetes.service-account, 
> flink}}
> {{INFO  [] - Loading configuration property: kubernetes.cluster-id, 
> test-flink-cluster}}
> {{INFO  [] - Loading configuration property: kubernetes.container.image, 
> flink:1.15}}
> {{INFO  [] - Loading configuration property: parallelism.default, 2}}
> {{INFO  [] - Loading configuration property: kubernetes.namespace, default}}
> {{INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 
> 2}}
> {{INFO  [] - Loading configuration property: 
> kubernetes.rest-service.exposed.type, ClusterIP}}
> {{INFO  [] - Loading configuration property: "prometheus.io/scrape", "true"}}
> {{INFO  [] - Loading configuration property: taskmanager.memory.process.size, 
> 1024m}}
> {{INFO  [] - Loading configuration property: 
> "kubernetes.operator.metrics.reporter.prom.class", 
> "org.apache.flink.metrics.prometheus.PrometheusReporter"}}
> {{INFO  [] - Loading configuration property: web.cancel.enable, false}}
> {{INFO  [] - Loading configuration property: execution.target, remote}}
> {{INFO  [] - Loading configuration property: jobmanager.memory.process.size, 
> 1024m}}
> {{INFO  [] - Loading configuration property: taskmanager.rpc.port, 6122}}
> {{INFO  [] - Loading configuration property: 
> kubernetes.internal.cluster-mode, SESSION}}
> {{INFO  [] - Loading configuration property: 

[jira] [Commented] (FLINK-29762) Can not create a standalone cluster with reactive mode using the operator

2022-10-25 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17624011#comment-17624011
 ] 

Gyula Fora commented on FLINK-29762:


As you can see from the error message: "reactive mode is only supported by 
standalone application clusters"

You forgot to specify the job spec in your FlinkDeployment YAML. You need that 
to create an Application cluster; otherwise you get an empty Session cluster.

Only Application clusters support reactive mode.
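
The rule can be sketched as a small check. This is a hypothetical illustration, not the operator's or Flink's actual validation code: the class and method names are made up, and only the `scheduler-mode` key and the error text come from this thread.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of the rule described above; names are illustrative only. */
class ReactiveModeCheck {

    enum ClusterMode { APPLICATION, SESSION }

    /** A deployment with a job spec becomes an Application cluster, otherwise a Session cluster. */
    static ClusterMode clusterMode(boolean hasJobSpec) {
        return hasJobSpec ? ClusterMode.APPLICATION : ClusterMode.SESSION;
    }

    /** Returns an error message for an unsupported combination, or empty if it is fine. */
    static Optional<String> validate(Map<String, String> flinkConfiguration, boolean hasJobSpec) {
        boolean reactive = "reactive".equalsIgnoreCase(flinkConfiguration.get("scheduler-mode"));
        if (reactive && clusterMode(hasJobSpec) == ClusterMode.SESSION) {
            return Optional.of("reactive mode is only supported by standalone application clusters");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of("scheduler-mode", "reactive");
        System.out.println(validate(conf, false)); // no job spec: Session cluster, rejected
        System.out.println(validate(conf, true));  // job spec present: Application cluster, accepted
    }
}
```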


[GitHub] [flink] MartijnVisser merged pull request #21143: [hotfix] Fix some typo in HiveOptions.

2022-10-25 Thread GitBox


MartijnVisser merged PR #21143:
URL: https://github.com/apache/flink/pull/21143





[GitHub] [flink] MartijnVisser commented on pull request #21048: [FLINK-29624][Common][Connector][Filesystem] Upgrade commons-lang3 to 3.12.0

2022-10-25 Thread GitBox


MartijnVisser commented on PR #21048:
URL: https://github.com/apache/flink/pull/21048#issuecomment-1291020321

   @flinkbot run azure





[GitHub] [flink] ferenc-csaky commented on a diff in pull request #21140: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

2022-10-25 Thread GitBox


ferenc-csaky commented on code in PR #21140:
URL: https://github.com/apache/flink/pull/21140#discussion_r1004859910


##
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Delegation token provider implementation for HBase. Basically it would be good to move this to
+ * flink-connector-hbase-base but HBase connection can be made without the connector. All in all I
+ * tend to move this but that would be a breaking change.
+ */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hbaseConf = getHBaseConfiguration(configuration);
+    }
+
+    private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+        org.apache.hadoop.conf.Configuration hbaseConf = null;
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            //
+            // Intended call: HBaseConfiguration.create(conf);
+            // HBaseConfiguration.create has been added to HBase in v0.90.0 so we can eliminate

Review Comment:
   Is it explicitly stated anywhere that Flink supports HBase 0.x? Furthermore, 
there is an [ongoing 
discussion](https://lists.apache.org/thread/x7l2gj8g93r4v6x6953cyt6jrs8c4r1b) 
about dropping the explicit 1.x connector as well.
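
   For context, the "intended call" in the quoted comment is a static factory invoked through reflection. A minimal sketch of that general pattern is below. It is not the code from this PR: the class and helper names are hypothetical, and `Integer.valueOf(String)` is used as a stand-in target so the example runs without HBase on the classpath.

```java
import java.lang.reflect.Method;
import java.util.Optional;

/** Hypothetical sketch of a reflective static call; not the code from this PR. */
class ReflectiveCreateSketch {

    /**
     * Invokes a public static one-argument method by name. Returns empty if the class
     * or method is missing, or if the call itself fails.
     */
    static Optional<Object> invokeStaticFactory(
            String className, String methodName, Class<?> argType, Object arg) {
        try {
            Class<?> clazz = Class.forName(className);
            Method method = clazz.getMethod(methodName, argType);
            // The receiver is null because the target method is static.
            return Optional.ofNullable(method.invoke(null, arg));
        } catch (ReflectiveOperationException e) {
            // Covers ClassNotFoundException, NoSuchMethodException,
            // IllegalAccessException and InvocationTargetException.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a call such as HBaseConfiguration.create(conf).
        System.out.println(invokeStaticFactory("java.lang.Integer", "valueOf", String.class, "42"));
        System.out.println(invokeStaticFactory("no.such.Clazz", "create", String.class, "x"));
    }
}
```

   Reflection of this kind keeps the dependency optional at compile time. Whether a fallback for older versions is still needed is the question raised above.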




[jira] [Commented] (FLINK-29762) Can not create a standalone cluster with reactive mode using the operator

2022-10-25 Thread yuvipanda (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623986#comment-17623986
 ] 

yuvipanda commented on FLINK-29762:
---

Here is the jobmanager deployment created:

 

{{apiVersion: apps/v1}}
{{kind: Deployment}}
{{metadata:}}
{{  annotations:}}
{{    deployment.kubernetes.io/revision: "1"}}
{{    flinkdeployment.flink.apache.org/generation: "1"}}
{{  creationTimestamp: "2022-10-25T18:31:36Z"}}
{{  generation: 1}}
{{  labels:}}
{{    app: test-flink-cluster}}
{{    component: jobmanager}}
{{    type: flink-standalone-kubernetes}}
{{  name: test-flink-cluster}}
{{  namespace: default}}
{{  resourceVersion: "14946930"}}
{{  uid: 766cdb2c-5e2d-4b27-8694-cf8ae58ea084}}
{{spec:}}
{{  progressDeadlineSeconds: 600}}
{{  replicas: 1}}
{{  revisionHistoryLimit: 10}}
{{  selector:}}
{{    matchLabels:}}
{{      app: test-flink-cluster}}
{{      component: jobmanager}}
{{      type: flink-standalone-kubernetes}}
{{  strategy:}}
{{    rollingUpdate:}}
{{      maxSurge: 25%}}
{{      maxUnavailable: 25%}}
{{    type: RollingUpdate}}
{{  template:}}
{{    metadata:}}
{{      annotations:}}
{{        flinkdeployment.flink.apache.org/generation: "1"}}
{{      creationTimestamp: null}}
{{      labels:}}
{{        app: test-flink-cluster}}
{{        component: jobmanager}}
{{        type: flink-standalone-kubernetes}}
{{    spec:}}
{{      containers:}}
{{      - args:}}
{{        - jobmanager}}
{{        command:}}
{{        - /docker-entrypoint.sh}}
{{        env:}}
{{        - name: _POD_IP_ADDRESS}}
{{          valueFrom:}}
{{            fieldRef:}}
{{              apiVersion: v1}}
{{              fieldPath: status.podIP}}
{{        image: flink:1.15}}
{{        imagePullPolicy: IfNotPresent}}
{{        name: flink-main-container}}
{{        ports:}}
{{        - containerPort: 8081}}
{{          name: rest}}
{{          protocol: TCP}}
{{        - containerPort: 6123}}
{{          name: jobmanager-rpc}}
{{          protocol: TCP}}
{{        - containerPort: 6124}}
{{          name: blobserver}}
{{          protocol: TCP}}
{{        resources:}}
{{          limits:}}
{{            cpu: 200m}}
{{            memory: 1Gi}}
{{          requests:}}
{{            cpu: 200m}}
{{            memory: 1Gi}}
{{        terminationMessagePath: /dev/termination-log}}
{{        terminationMessagePolicy: File}}
{{        volumeMounts:}}
{{        - mountPath: /opt/flink/conf}}
{{          name: flink-config-volume}}
{{      dnsPolicy: ClusterFirst}}
{{      restartPolicy: Always}}
{{      schedulerName: default-scheduler}}
{{      securityContext: {}}}
{{      serviceAccount: flink}}
{{      serviceAccountName: flink}}
{{      terminationGracePeriodSeconds: 30}}
{{      volumes:}}
{{      - configMap:}}
{{          defaultMode: 420}}
{{          items:}}
{{          - key: log4j-console.properties}}
{{            path: log4j-console.properties}}
{{          - key: flink-conf.yaml}}
{{            path: flink-conf.yaml}}
{{          name: flink-config-test-flink-cluster}}
{{        name: flink-config-volume}}
{{status:}}
{{  conditions:}}
{{  - lastTransitionTime: "2022-10-25T18:31:36Z"}}
{{    lastUpdateTime: "2022-10-25T18:31:39Z"}}
{{    message: ReplicaSet "test-flink-cluster-58cd584fdd" has successfully progressed.}}
{{    reason: NewReplicaSetAvailable}}
{{    status: "True"}}
{{    type: Progressing}}
{{  - lastTransitionTime: "2022-10-25T18:45:34Z"}}
{{    lastUpdateTime: "2022-10-25T18:45:34Z"}}
{{    message: Deployment does not have minimum availability.}}
{{    reason: MinimumReplicasUnavailable}}
{{    status: "False"}}
{{    type: Available}}
{{  observedGeneration: 1}}
{{  replicas: 1}}
{{  unavailableReplicas: 1}}
{{  updatedReplicas: 1}}

 

and the configmap

 

{{apiVersion: v1}}
{{data:}}
{{  flink-conf.yaml: |}}
{{    blob.server.port: 6124}}
{{    kubernetes.jobmanager.annotations: flinkdeployment.flink.apache.org/generation:1}}
{{    kubernetes.jobmanager.replicas: 1}}
{{    scheduler-mode: reactive}}
{{    "kubernetes.operator.metrics.reporter.prom.port": ""}}
{{    jobmanager.rpc.address: test-flink-cluster.default}}
{{    kubernetes.taskmanager.cpu: 0.2}}
{{    "prometheus.io/port": ""}}
{{    kubernetes.service-account: flink}}
{{    kubernetes.cluster-id: test-flink-cluster}}
{{    kubernetes.container.image: flink:1.15}}
{{    parallelism.default: 2}}
{{    kubernetes.namespace: default}}
{{    taskmanager.numberOfTaskSlots: 2}}
{{    kubernetes.rest-service.exposed.type: ClusterIP}}
{{    "prometheus.io/scrape": "true"}}
{{    taskmanager.memory.process.size: 1024m}}
{{    "kubernetes.operator.metrics.reporter.prom.class": "org.apache.flink.metrics.prometheus.PrometheusReporter"}}
{{    web.cancel.enable: false}}
{{    execution.target: remote}}
{{    jobmanager.memory.process.size: 1024m}}
{{    taskmanager.rpc.port: 6122}}
{{    kubernetes.internal.cluster-mode: SESSION}}
{{    

[jira] [Created] (FLINK-29762) Can not create a standalone cluster with reactive mode using the operator

2022-10-25 Thread yuvipanda (Jira)
yuvipanda created FLINK-29762:
-

 Summary: Can not create a standalone cluster with reactive mode 
using the operator
 Key: FLINK-29762
 URL: https://issues.apache.org/jira/browse/FLINK-29762
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
 Environment: Kubernetes Version 1.22 on EKS.

Flink Operator version 1.2.0

Flink Version 1.15 (errors in 1.14 too)
Reporter: yuvipanda


I'm trying to create a minimal running flink cluster with reactive scaling 
using the kubernetes operator (running v1.2.0), with the following YAML:

 

{{
kind: FlinkDeployment
metadata:
  name: test-flink-cluster
spec:
  flinkConfiguration:
    scheduler-mode: reactive
  flinkVersion: v1_15
  image: flink:1.15
  jobManager:
    replicas: 1
    resource:
      cpu: 0.2
      memory: 1024m
  mode: standalone
  serviceAccount: flink
  taskManager:
    replicas: 1
    resource:
      cpu: 0.2
      memory: 1024m}}

 

However, this causes the jobmanager to crash with the following:

 

{{sed: couldn't open temporary file /opt/flink/conf/sedLX7Jx8: Read-only file system}}
{{sed: couldn't open temporary file /opt/flink/conf/sed1vva8t: Read-only file system}}
{{/docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Read-only file system}}
{{/docker-entrypoint.sh: line 89: /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system}}
{{Starting Job Manager}}
{{Starting standalonesession as a console application on host test-flink-cluster-58cd584fdd-xwbtf.}}
{{2022-10-25 18:32:00,422 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - }}
{{2022-10-25 18:32:00,510 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Preconfiguration: }}
{{2022-10-25 18:32:00,512 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - }}


{{RESOURCE_PARAMS extraction logs:}}
{{jvm_params: -Xmx469762048 -Xms469762048 -XX:MaxMetaspaceSize=268435456}}
{{dynamic_configs: -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=469762048b -D jobmanager.memory.jvm-overhead.max=201326592b}}
{{logs: WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.}}
{{INFO  [] - Loading configuration property: blob.server.port, 6124}}
{{INFO  [] - Loading configuration property: kubernetes.jobmanager.annotations, flinkdeployment.flink.apache.org/generation:1}}
{{INFO  [] - Loading configuration property: kubernetes.jobmanager.replicas, 1}}
{{INFO  [] - Loading configuration property: scheduler-mode, reactive}}
{{INFO  [] - Loading configuration property: "kubernetes.operator.metrics.reporter.prom.port", ""}}
{{INFO  [] - Loading configuration property: jobmanager.rpc.address, test-flink-cluster.default}}
{{INFO  [] - Loading configuration property: kubernetes.taskmanager.cpu, 0.2}}
{{INFO  [] - Loading configuration property: "prometheus.io/port", ""}}
{{INFO  [] - Loading configuration property: kubernetes.service-account, flink}}
{{INFO  [] - Loading configuration property: kubernetes.cluster-id, test-flink-cluster}}
{{INFO  [] - Loading configuration property: kubernetes.container.image, flink:1.15}}
{{INFO  [] - Loading configuration property: parallelism.default, 2}}
{{INFO  [] - Loading configuration property: kubernetes.namespace, default}}
{{INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2}}
{{INFO  [] - Loading configuration property: kubernetes.rest-service.exposed.type, ClusterIP}}
{{INFO  [] - Loading configuration property: "prometheus.io/scrape", "true"}}
{{INFO  [] - Loading configuration property: taskmanager.memory.process.size, 1024m}}
{{INFO  [] - Loading configuration property: "kubernetes.operator.metrics.reporter.prom.class", "org.apache.flink.metrics.prometheus.PrometheusReporter"}}
{{INFO  [] - Loading configuration property: web.cancel.enable, false}}
{{INFO  [] - Loading configuration property: execution.target, remote}}
{{INFO  [] - Loading configuration property: jobmanager.memory.process.size, 1024m}}
{{INFO  [] - Loading configuration property: taskmanager.rpc.port, 6122}}
{{INFO  [] - Loading configuration property: kubernetes.internal.cluster-mode, SESSION}}
{{INFO  [] - Loading configuration property: kubernetes.jobmanager.cpu, 0.2}}
{{INFO  [] - Loading configuration property: $internal.flink.version, v1_15}}
{{INFO  [] - The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead}}
{{INFO  [] - Final Master Memory configuration:}}
{{INFO  [] -   Total Process Memory: 1024.000mb (1073741824 bytes)}}
{{INFO  [] -     Total Flink Memory: 
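
The memory figures in the log above can be cross-checked with a little arithmetic. The following is a minimal sketch, not Flink code: the class and method names are made up for illustration, the overhead fraction of 0.1 is inferred from the 102.4mb figure in the log (10% of 1024mb), the upper bound on the overhead is ignored for simplicity, and the byte values are copied from the `dynamic_configs` line.

```java
public class JobManagerMemoryCheck {

    // Values copied from the log, in bytes.
    static final long TOTAL_PROCESS = 1024L * 1024 * 1024; // jobmanager.memory.process.size: 1024m
    static final long OVERHEAD_MIN = 201326592L;           // "min value 192.000mb (201326592 bytes)"
    static final long METASPACE = 268435456L;              // jobmanager.memory.jvm-metaspace.size
    static final long OFF_HEAP = 134217728L;               // jobmanager.memory.off-heap.size

    /** Derives the JVM overhead from a fraction and raises it to the minimum if needed. */
    static long jvmOverhead(long totalProcess, double fraction, long min) {
        long derived = (long) (totalProcess * fraction);
        return Math.max(derived, min);
    }

    /** Whatever is left after overhead, metaspace and off-heap is the heap. */
    static long heap(long totalProcess, long overhead, long metaspace, long offHeap) {
        return totalProcess - overhead - metaspace - offHeap;
    }

    public static void main(String[] args) {
        // Assumption: fraction 0.1, inferred from the log message rather than stated in it.
        long overhead = jvmOverhead(TOTAL_PROCESS, 0.1, OVERHEAD_MIN);
        long heap = heap(TOTAL_PROCESS, overhead, METASPACE, OFF_HEAP);
        System.out.println("jvm overhead = " + overhead + " bytes");
        System.out.println("heap         = " + heap + " bytes");
    }
}
```

With these inputs the derived overhead falls below the minimum, so the minimum is used, and the remaining heap comes out at 469762048 bytes, the same value as `-Xmx469762048` in `jvm_params`.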

[jira] [Commented] (FLINK-29762) Can not create a standalone cluster with reactive mode using the operator

2022-10-25 Thread yuvipanda (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623980#comment-17623980
 ] 

yuvipanda commented on FLINK-29762:
---

(Not sure if 'Major' is appropriate, but it was the default so I let it be)

> Can not create a standalone cluster with reactive mode using the operator
> -
>
> Key: FLINK-29762
> URL: https://issues.apache.org/jira/browse/FLINK-29762
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
> Environment: Kubernetes Version 1.22 on EKS.
> Flink Operator version 1.2.0
> Flink Version 1.15 (errors in 1.14 too)
>Reporter: yuvipanda
>Priority: Major
>
> I'm trying to create a minimal running flink cluster with reactive scaling 
> using the kubernetes operator (running v1.2.0), with the following YAML:
>  
> {{
> kind: FlinkDeployment
> metadata:
>   name: test-flink-cluster
> spec:
>   flinkConfiguration:
>     scheduler-mode: reactive
>   flinkVersion: v1_15
>   image: flink:1.15
>   jobManager:
>     replicas: 1
>     resource:
>       cpu: 0.2
>       memory: 1024m
>   mode: standalone
>   serviceAccount: flink
>   taskManager:
>     replicas: 1
>     resource:
>       cpu: 0.2
>       memory: 1024m}}
>  
> However, this causes the jobmanager to crash with the following:
>  
> {{sed: couldn't open temporary file /opt/flink/conf/sedLX7Jx8: Read-only file 
> system}}
> {{sed: couldn't open temporary file /opt/flink/conf/sed1vva8t: Read-only file 
> system}}
> {{/docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Read-only 
> file system}}
> {{/docker-entrypoint.sh: line 89: /opt/flink/conf/flink-conf.yaml.tmp: 
> Read-only file system}}
> {{Starting Job Manager}}
> {{Starting standalonesession as a console application on host 
> test-flink-cluster-58cd584fdd-xwbtf.}}
> {{2022-10-25 18:32:00,422 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - 
> }}
> {{2022-10-25 18:32:00,510 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  
> Preconfiguration: }}
> {{2022-10-25 18:32:00,512 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - }}
> {{RESOURCE_PARAMS extraction logs:}}
> {{jvm_params: -Xmx469762048 -Xms469762048 -XX:MaxMetaspaceSize=268435456}}
> {{dynamic_configs: -D jobmanager.memory.off-heap.size=134217728b -D 
> jobmanager.memory.jvm-overhead.min=201326592b -D 
> jobmanager.memory.jvm-metaspace.size=268435456b -D 
> jobmanager.memory.heap.size=469762048b -D 
> jobmanager.memory.jvm-overhead.max=201326592b}}
> {{logs: WARNING: sun.reflect.Reflection.getCallerClass is not supported. This 
> will impact performance.}}
> {{INFO  [] - Loading configuration property: blob.server.port, 6124}}
> {{INFO  [] - Loading configuration property: 
> kubernetes.jobmanager.annotations, 
> flinkdeployment.flink.apache.org/generation:1}}
> {{INFO  [] - Loading configuration property: kubernetes.jobmanager.replicas, 
> 1}}
> {{INFO  [] - Loading configuration property: scheduler-mode, reactive}}
> {{INFO  [] - Loading configuration property: 
> "kubernetes.operator.metrics.reporter.prom.port", ""}}
> {{INFO  [] - Loading configuration property: jobmanager.rpc.address, 
> test-flink-cluster.default}}
> {{INFO  [] - Loading configuration property: kubernetes.taskmanager.cpu, 0.2}}
> {{INFO  [] - Loading configuration property: "prometheus.io/port", ""}}
> {{INFO  [] - Loading configuration property: kubernetes.service-account, 
> flink}}
> {{INFO  [] - Loading configuration property: kubernetes.cluster-id, 
> test-flink-cluster}}
> {{INFO  [] - Loading configuration property: kubernetes.container.image, 
> flink:1.15}}
> {{INFO  [] - Loading configuration property: parallelism.default, 2}}
> {{INFO  [] - Loading configuration property: kubernetes.namespace, default}}
> {{INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 
> 2}}
> {{INFO  [] - Loading configuration property: 
> kubernetes.rest-service.exposed.type, ClusterIP}}
> {{INFO  [] - Loading configuration property: "prometheus.io/scrape", "true"}}
> {{INFO  [] - Loading configuration property: taskmanager.memory.process.size, 
> 1024m}}
> {{INFO  [] - Loading configuration property: 
> "kubernetes.operator.metrics.reporter.prom.class", 
> "org.apache.flink.metrics.prometheus.PrometheusReporter"}}
> {{INFO  [] - Loading configuration property: web.cancel.enable, false}}
> {{INFO  [] - Loading configuration property: execution.target, remote}}
> {{INFO  [] - Loading configuration property: jobmanager.memory.process.size, 
> 1024m}}
> {{INFO  [] - Loading configuration property: taskmanager.rpc.port, 6122}}
> {{INFO  [] - Loading configuration property: 
> 

[jira] [Assigned] (FLINK-29761) Simplify HadoopModule

2022-10-25 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi reassigned FLINK-29761:
-

Assignee: Gabor Somogyi

> Simplify HadoopModule
> -
>
> Key: FLINK-29761
> URL: https://issues.apache.org/jira/browse/FLINK-29761
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common, Deployment / Kubernetes, Deployment 
> / YARN
>Reporter: Gabor Somogyi
>Assignee: Gabor Somogyi
>Priority: Major
>  Labels: security
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29761) Simplify HadoopModule

2022-10-25 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated FLINK-29761:
--
Component/s: Connectors / Common
 Deployment / Kubernetes
 Deployment / YARN

> Simplify HadoopModule
> -
>
> Key: FLINK-29761
> URL: https://issues.apache.org/jira/browse/FLINK-29761
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common, Deployment / Kubernetes, Deployment 
> / YARN
>Reporter: Gabor Somogyi
>Priority: Major
>  Labels: security
>






[jira] [Updated] (FLINK-29761) Simplify HadoopModule

2022-10-25 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated FLINK-29761:
--
Labels: security  (was: )

> Simplify HadoopModule
> -
>
> Key: FLINK-29761
> URL: https://issues.apache.org/jira/browse/FLINK-29761
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Gabor Somogyi
>Priority: Major
>  Labels: security
>






[jira] [Created] (FLINK-29761) Simplify HadoopModule

2022-10-25 Thread Gabor Somogyi (Jira)
Gabor Somogyi created FLINK-29761:
-

 Summary: Simplify HadoopModule
 Key: FLINK-29761
 URL: https://issues.apache.org/jira/browse/FLINK-29761
 Project: Flink
  Issue Type: Sub-task
Reporter: Gabor Somogyi








[jira] [Commented] (FLINK-29536) Add WATCH_NAMESPACES env var to kubernetes operator

2022-10-25 Thread Tony Garrard (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623970#comment-17623970
 ] 

Tony Garrard commented on FLINK-29536:
--

[~gyfora] can you assign this to me? Thanks.

> Add WATCH_NAMESPACES env var to kubernetes operator
> ---
>
> Key: FLINK-29536
> URL: https://issues.apache.org/jira/browse/FLINK-29536
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.2.0
>Reporter: Tony Garrard
>Priority: Major
>
> Provide the ability to set the namespaces watched by the operator using an 
> env var. Whilst the additional config can still be used, the presence of the 
> env var will take priority.
>  
> Reasons for issue
>  # The operator will pick up the setting immediately, as the pod will roll 
> (rather than waiting for the config to be refreshed)
>  # If the operator is to be OLM bundled, we will be able to set the target 
> namespace using the following 
> {{env:}}
> {{  - name: WATCHED_NAMESPACE}}
> {{    valueFrom:}}
> {{      fieldRef:}}
> {{        fieldPath: metadata.annotations['olm.targetNamespaces']}}
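
The precedence rule described above (env var wins over the existing config when present) can be sketched as follows. This is only an illustration under stated assumptions, not the operator's implementation: the class `WatchedNamespacesResolver` is hypothetical, the comma-separated value format is assumed, and the variable name is passed in as a parameter because the issue title says `WATCH_NAMESPACES` while the snippet uses `WATCHED_NAMESPACE`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class WatchedNamespacesResolver {

    /**
     * Returns the namespaces to watch. A non-blank env var takes priority;
     * otherwise the configured value is used. Both are assumed to be
     * comma-separated lists.
     */
    static Set<String> resolve(Map<String, String> env, String envVarName, String configuredValue) {
        String fromEnv = env.get(envVarName);
        String raw = (fromEnv != null && !fromEnv.trim().isEmpty()) ? fromEnv : configuredValue;
        if (raw == null || raw.trim().isEmpty()) {
            return Collections.emptySet();
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    public static void main(String[] args) {
        // "default" stands in for whatever the operator config would provide.
        Set<String> namespaces = resolve(System.getenv(), "WATCH_NAMESPACES", "default");
        System.out.println(namespaces);
    }
}
```

Reading the environment once at startup fits the first reason given in the issue: a changed env var rolls the pod, so the new value is picked up on restart without any config refresh.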





[GitHub] [flink] snuyanzin commented on pull request #21150: [hotfix] Replace deprecated DescribeTopicsResult#all, KafkaConsumer#poll and ContainerState#getContainerIpAddress

2022-10-25 Thread GitBox


snuyanzin commented on PR #21150:
URL: https://github.com/apache/flink/pull/21150#issuecomment-1290884251

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RyanSkraba commented on a diff in pull request #20258: [FLINK-28522][tests][JUnit5 migration] flink-sequence-file

2022-10-25 Thread GitBox


RyanSkraba commented on code in PR #20258:
URL: https://github.com/apache/flink/pull/20258#discussion_r1004723756


##
flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfigurationTest.java:
##
@@ -19,70 +19,54 @@
 package org.apache.flink.formats.sequencefile;
 
 import org.apache.hadoop.conf.Configuration;
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Before;
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.HamcrestCondition.matching;
-
 /** Tests for the {@link SerializableHadoopConfiguration}. */
-public class SerializableHadoopConfigurationTest {
+class SerializableHadoopConfigurationTest {
 
 private static final String TEST_KEY = "test-key";
 
 private static final String TEST_VALUE = "test-value";
 
 private Configuration configuration;
 
-@Before
-public void createConfigWithCustomProperty() {
+@BeforeEach
+void createConfigWithCustomProperty() {
 this.configuration = new Configuration();
 configuration.set(TEST_KEY, TEST_VALUE);
 }
 
 @Test
-public void customPropertiesSurviveSerializationDeserialization()
+void customPropertiesSurviveSerializationDeserialization()
 throws IOException, ClassNotFoundException {
 final SerializableHadoopConfiguration serializableConfigUnderTest =
 new SerializableHadoopConfiguration(configuration);
 final byte[] serializedConfigUnderTest = serializeAndGetBytes(serializableConfigUnderTest);
 final SerializableHadoopConfiguration deserializableConfigUnderTest =
 deserializeAndGetConfiguration(serializedConfigUnderTest);
 
-assertThat(deserializableConfigUnderTest.get())
-.satisfies(matching(hasTheSamePropertiesAs(configuration)));
-}
-
-// Matchers 
 //
-
-private static TypeSafeMatcher hasTheSamePropertiesAs(
-final Configuration expectedConfig) {
-return new TypeSafeMatcher() {
-@Override
-protected boolean matchesSafely(Configuration actualConfig) {
-final String value = actualConfig.get(TEST_KEY);
-return actualConfig != expectedConfig
-&& value != null
-&& expectedConfig.get(TEST_KEY).equals(value);
-}
-
-@Override
-public void describeTo(Description description) {
-description
-.appendText("a Hadoop Configuration with property: key=")
-.appendValue(TEST_KEY)
-.appendText(" and value=")
-.appendValue(TEST_VALUE);
-}
-};
+Assertions.assertThat(deserializableConfigUnderTest.get())
+.matches(
+actualConfig -> {
+final String value = actualConfig.get(TEST_KEY);
+return actualConfig != serializableConfigUnderTest.get()
+&& value != null
+&& serializableConfigUnderTest
+.get()
+.get(TEST_KEY)
+.equals(value);
+})
+.describedAs(

Review Comment:
   Yes, it seems that the `describedAs` needs to come before any asserts!
   
   In this case, I changed the logic quite a bit from the original to improve the descriptive error messages.  The new logic would print the error message:
   
   ```
   [ERROR] Failures: 
   [ERROR]   SerializableHadoopConfigurationTest.customPropertiesSurviveSerializationDeserialization:60 
   [a Hadoop Configuration with property: key=test-key and value=test-value] (1 failure)
   -- failure 1 --
   expected: "test-value"
    but was: "faked-for-error-message"
   ```
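
The ordering point can be illustrated without AssertJ itself. The sketch below uses a toy fluent assertion (the hypothetical class `ToyAssert`, which is not AssertJ code) that throws as soon as the check runs, which is the behaviour the comment above relies on: a description set after the failing check is never reached.

```java
public class DescribedAsOrder {

    /** Toy fluent assertion that evaluates eagerly; a stand-in for illustration only. */
    static final class ToyAssert {
        private final String actual;
        private String description = "";

        ToyAssert(String actual) {
            this.actual = actual;
        }

        ToyAssert describedAs(String text) {
            this.description = "[" + text + "] ";
            return this;
        }

        ToyAssert isEqualTo(String expected) {
            // The check runs immediately, using whatever description is set at this point.
            if (!expected.equals(actual)) {
                throw new AssertionError(
                        description + "expected: \"" + expected + "\" but was: \"" + actual + "\"");
            }
            return this;
        }
    }

    /** Returns the failure message produced with the description before or after the check. */
    static String failureMessage(boolean describeFirst) {
        ToyAssert toyAssert = new ToyAssert("faked-for-error-message");
        String text = "a Hadoop Configuration with property: key=test-key and value=test-value";
        try {
            if (describeFirst) {
                toyAssert.describedAs(text).isEqualTo("test-value");
            } else {
                toyAssert.isEqualTo("test-value").describedAs(text);
            }
            return "";
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureMessage(true));
        System.out.println(failureMessage(false));
    }
}
```

Only the first call carries the bracketed description in its message; in the second, the exception is thrown before `describedAs` is ever invoked.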






[GitHub] [flink] klion26 commented on a diff in pull request #21050: [FLINK-29095][state] Improve logging in SharedStateRegistry

2022-10-25 Thread GitBox


klion26 commented on code in PR #21050:
URL: https://github.com/apache/flink/pull/21050#discussion_r1004688078


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##
@@ -95,60 +95,78 @@ public StreamStateHandle registerReference(
 entry = registeredStates.get(registrationKey);
 
 if (entry == null) {
-// Additional check that should never fail, because only state handles that are not
-// placeholders should
-// ever be inserted to the registry.
 checkState(
-!isPlaceholder(state),
+!isPlaceholder(newHandle),
 "Attempt to reference unknown state: " + registrationKey);
 
-entry = new SharedStateEntry(state, checkpointID);
+LOG.trace(
+"Registered new shared state {} under key {}.", newHandle, registrationKey);
+entry = new SharedStateEntry(newHandle, checkpointID);
 registeredStates.put(registrationKey, entry);
-LOG.trace("Registered new shared state {} under key {}.", entry, registrationKey);
 
-} else {
-// Delete if this is a real duplicate.
-// Note that task (backend) is not required to re-upload state
-// if the confirmation notification was missing.
-// However, it's also not required to use exactly the same 
handle or placeholder
-if (!Objects.equals(state, entry.stateHandle)) {
-if (entry.confirmed || isPlaceholder(state)) {
-scheduledStateDeletion = state;
-} else {
-// Old entry is not in a confirmed checkpoint yet, and 
the new one differs.
-// This might result from (omitted KG range here for 
simplicity):
-// 1. Flink recovers from a failure using a checkpoint 
1
-// 2. State Backend is initialized to UID xyz and a 
set of SST: { 01.sst }
-// 3. JM triggers checkpoint 2
-// 4. TM sends handle: "xyz-002.sst"; JM registers it 
under "xyz-002.sst"
-// 5. TM crashes; everything is repeated from (2)
-// 6. TM recovers from CP 1 again: backend UID "xyz", 
SST { 01.sst }
-// 7. JM triggers checkpoint 3
-// 8. TM sends NEW state "xyz-002.sst"
-// 9. JM discards it as duplicate
-// 10. checkpoint completes, but a wrong SST file is 
used
-// So we use a new entry and discard the old one:
-scheduledStateDeletion = entry.stateHandle;
-entry.stateHandle = state;
-}
-LOG.trace(
-"Identified duplicate state registration under key 
{}. New state {} was determined to "
-+ "be an unnecessary copy of existing 
state {} and will be dropped.",
-registrationKey,
-state,
-entry.stateHandle);
-}
+// no further handling
+return entry.stateHandle;
+
+} else if (entry.stateHandle == newHandle) {
+// might be a bug but state backend is not required to use a place-holder
+LOG.debug(
+"Duplicated registration under key {} with the same object: {}",
+registrationKey,
+newHandle);
+} else if (Objects.equals(entry.stateHandle, newHandle)) {
+// might be a bug but state backend is not required to use a place-holder
+LOG.debug(
+"Duplicated registration under key {} with the new object: {}.",
+registrationKey,
+newHandle);
+} else if (isPlaceholder(newHandle)) {
 LOG.trace(
-"Updating last checkpoint for {} from {} to {}",
+"Duplicated registration under key {} with a placeholder (normal case)",
+registrationKey);
+scheduledStateDeletion = newHandle;
+} else if (entry.confirmed) {
+LOG.info(
+"Duplicated registration under key {} of a new state: {}. "
++ "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. "
++ "Discarding the new state and keeping the old one which is included into a completed checkpoint",
 registrationKey,
-

[GitHub] [flink] flinkbot commented on pull request #21151: [FLINK-29611][tests] Fix flaky test in CoBroadcastWithNonKeyedOperatorTest and DataStreamBatchExecutionITCase

2022-10-25 Thread GitBox


flinkbot commented on PR #21151:
URL: https://github.com/apache/flink/pull/21151#issuecomment-1290812024

   
   ## CI report:
   
   * cb89bacacdb81d265dd3260e4c62f4e58be9024b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Closed] (FLINK-29744) Throw DeploymentFailedException on ImagePullBackOff

2022-10-25 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-29744.
--
Fix Version/s: kubernetes-operator-1.3.0
   Resolution: Fixed

merged to main ddd7c927481b52fc3422ef6534760d8d436428e7

> Throw DeploymentFailedException on ImagePullBackOff
> ---
>
> Key: FLINK-29744
> URL: https://issues.apache.org/jira/browse/FLINK-29744
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.0
>Reporter: Matyas Orhidi
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.3.0
>
>






[GitHub] [flink-kubernetes-operator] gyfora merged pull request #411: [FLINK-29744] Throw DeploymentFailedException on ImagePullBackOff

2022-10-25 Thread GitBox


gyfora merged PR #411:
URL: https://github.com/apache/flink-kubernetes-operator/pull/411





[jira] [Commented] (FLINK-26394) CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires while the checkpointCoordinator task is queuing in the SourceCoordinator executor.

2022-10-25 Thread Fabian Paul (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623913#comment-17623913
 ] 

Fabian Paul commented on FLINK-26394:
-

I am reopening the issue because it has only been fixed in 1.16 and not in 1.15.

> CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires 
> while the checkpointCoordinator task is queuing in the SourceCoordinator 
> executor.
> --
>
> Key: FLINK-26394
> URL: https://issues.apache.org/jira/browse/FLINK-26394
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0, 1.15.2
>Reporter: Gen Luo
>Assignee: Gen Luo
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0
>
>
> We found that a job can no longer trigger checkpoints or savepoints after 
> recovering from a checkpoint timeout failure. After investigation, we found 
> that the `isTriggering` flag in CheckpointCoordinator is true while no 
> checkpoint is actually in progress. The root cause is as follows:
>  
>  # The job uses a source whose coordinator needs to scan a table while 
> requesting splits, which may take more than 10min. The source coordinator 
> executor thread will be occupied by `handleSplitRequest`, and the 
> `checkpointCoordinator` task of the first checkpoint will be queued after it.
>  # 10min later, the checkpoint expires, which removes the pending checkpoint 
> from the coordinator and triggers a global failover. But the 
> `isTriggering` flag is not reset here. It can only be reset after the 
> checkpoint completable future is done, and that future is now held only by the 
> `checkpointCoordinator` task in the queue, along with the PendingCheckpoint.
>  # Then the job fails over, and the RecreateOnResetOperatorCoordinator 
> recreates a new SourceCoordinator and closes the previous coordinator 
> asynchronously. The timeout for closing is fixed at 60s. SourceCoordinator 
> will try to `shutdown` the coordinator executor and then `awaitTermination`. 
> If the tasks finish within 60s, nothing goes wrong.
>  # But if the closing method is stuck for more than 60s (in this case it is 
> actually stuck in `handleSplitRequest`), the async closing thread will be 
> interrupted and SourceCoordinator will call `shutdownNow` on the executor. All 
> queued tasks will be discarded, including the `checkpointCoordinator` task.
>  # Then the checkpoint completable future will never complete and the 
> `isTriggering` flag will never be reset.
>  
> I see that the closing part of SourceCoordinator was recently refactored. But 
> the new implementation also has this issue, and since it calls 
> `shutdownNow` directly, the issue should be easier to encounter.
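
The last two steps of the description hinge on a property of `java.util.concurrent` executors: `shutdownNow()` hands back tasks that never started instead of running them. Below is a minimal, self-contained sketch using only the JDK. It is not Flink code, and the class and variable names are illustrative stand-ins for the roles described above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownNowDropsQueuedTasks {

    /** Returns how many queued tasks shutdownNow() discarded without running them. */
    static int run(CompletableFuture<Void> checkpointFuture) {
        ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        try {
            // Stand-in for the long-running handleSplitRequest call occupying the thread.
            coordinatorExecutor.execute(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();

            // Stand-in for the queued task that is the only holder of the future.
            coordinatorExecutor.execute(() -> checkpointFuture.complete(null));

            // Interrupts the running task and returns the tasks that never started.
            List<Runnable> dropped = coordinatorExecutor.shutdownNow();
            coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS);
            return dropped.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        int dropped = run(future);
        System.out.println("dropped=" + dropped + ", futureDone=" + future.isDone());
    }
}
```

The queued task is returned by `shutdownNow()` rather than executed, so the future it was supposed to complete stays incomplete, which is the situation that leaves `isTriggering` set. One possible remedy in such a design, mentioned here only as a general pattern, is to complete the futures owned by the returned tasks exceptionally.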





[jira] [Reopened] (FLINK-26394) CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires while the checkpointCoordinator task is queuing in the SourceCoordinator executor.

2022-10-25 Thread Fabian Paul (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Paul reopened FLINK-26394:
-

> CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires 
> while the checkpointCoordinator task is queuing in the SourceCoordinator 
> executor.
> --
>
> Key: FLINK-26394
> URL: https://issues.apache.org/jira/browse/FLINK-26394
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0, 1.15.2
>Reporter: Gen Luo
>Assignee: Gen Luo
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0
>
>
> We found that a job can no longer trigger checkpoints or savepoints after 
> recovering from a checkpoint timeout failure. After investigation, we found 
> that the `isTriggering` flag in CheckpointCoordinator is true while no 
> checkpoint is actually in progress. The root cause is as follows:
>  
>  # The job uses a source whose coordinator needs to scan a table while 
> handling split requests, which may take more than 10min. The source coordinator 
> executor thread is occupied by `handleSplitRequest`, and the 
> `checkpointCoordinator` task of the first checkpoint is queued behind it.
>  # 10min later, the checkpoint expires, which removes the pending checkpoint 
> from the coordinator and triggers a global failover. But 
> `isTriggering` is not reset here. It can only be reset after the checkpoint 
> completable future is done, and that future is now held only by the 
> `checkpointCoordinator` task in the queue, along with the PendingCheckpoint.
>  # Then the job fails over, and the RecreateOnResetOperatorCoordinator 
> creates a new SourceCoordinator and closes the previous coordinator 
> asynchronously. The timeout for closing is fixed at 60s. SourceCoordinator 
> tries to `shutdown` the coordinator executor and then `awaitTermination`. If 
> the tasks finish within 60s, nothing goes wrong.
>  # But if the closing method is stuck for more than 60s (in this case 
> it is actually stuck in `handleSplitRequest`), the async closing thread is 
> interrupted and SourceCoordinator calls `shutdownNow` on the executor. All 
> queued tasks are discarded, including the `checkpointCoordinator` task.
>  # The checkpoint completable future then never completes and the 
> `isTriggering` flag is never reset.
>  
> I see that the closing part of SourceCoordinator was recently refactored, but 
> the new implementation has the same issue. And since it calls 
> `shutdownNow` directly, the issue should be easier to hit.
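
The discard step in the sequence above can be reproduced outside Flink. The following is a minimal sketch that uses only `java.util.concurrent` and is not Flink code: the class name, the sleeping task that stands in for `handleSplitRequest`, and the `checkpointFuture` variable are all made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch; none of these names come from Flink.
class ShutdownNowSketch {

    /** Returns true if the queued task was discarded and its future is left incomplete. */
    static boolean queuedTaskIsDiscarded() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch blockerStarted = new CountDownLatch(1);
        CompletableFuture<Void> checkpointFuture = new CompletableFuture<>();
        try {
            // Occupies the only thread, like a long-running handleSplitRequest.
            executor.execute(() -> {
                blockerStarted.countDown();
                try {
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Queued behind the blocker; the only code that would complete the future.
            executor.execute(() -> checkpointFuture.complete(null));

            blockerStarted.await();
            // Interrupts the running task and returns the tasks that never started.
            List<Runnable> discarded = executor.shutdownNow();
            executor.awaitTermination(10, TimeUnit.SECONDS);
            return discarded.size() == 1 && !checkpointFuture.isDone();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("queued task discarded: " + queuedTaskIsDiscarded());
    }
}
```

In a setup like this, the list returned by `shutdownNow` is the only remaining handle on the discarded work, so a future tied to a discarded task stays pending unless something else completes it.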





[jira] [Updated] (FLINK-26394) CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires while the checkpointCoordinator task is queuing in the SourceCoordinator executor.

2022-10-25 Thread Fabian Paul (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Paul updated FLINK-26394:

Affects Version/s: 1.15.2

> CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires 
> while the checkpointCoordinator task is queuing in the SourceCoordinator 
> executor.
> --
>
> Key: FLINK-26394
> URL: https://issues.apache.org/jira/browse/FLINK-26394
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0, 1.15.2
>Reporter: Gen Luo
>Assignee: Gen Luo
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0
>
>
> We found that a job can no longer trigger checkpoints or savepoints after 
> recovering from a checkpoint timeout failure. After investigation, we found 
> that the `isTriggering` flag in CheckpointCoordinator is true while no 
> checkpoint is actually in progress. The root cause is as follows:
>  
>  # The job uses a source whose coordinator needs to scan a table while 
> handling split requests, which may take more than 10min. The source coordinator 
> executor thread is occupied by `handleSplitRequest`, and the 
> `checkpointCoordinator` task of the first checkpoint is queued behind it.
>  # 10min later, the checkpoint expires, which removes the pending checkpoint 
> from the coordinator and triggers a global failover. But 
> `isTriggering` is not reset here. It can only be reset after the checkpoint 
> completable future is done, and that future is now held only by the 
> `checkpointCoordinator` task in the queue, along with the PendingCheckpoint.
>  # Then the job fails over, and the RecreateOnResetOperatorCoordinator 
> creates a new SourceCoordinator and closes the previous coordinator 
> asynchronously. The timeout for closing is fixed at 60s. SourceCoordinator 
> tries to `shutdown` the coordinator executor and then `awaitTermination`. If 
> the tasks finish within 60s, nothing goes wrong.
>  # But if the closing method is stuck for more than 60s (in this case 
> it is actually stuck in `handleSplitRequest`), the async closing thread is 
> interrupted and SourceCoordinator calls `shutdownNow` on the executor. All 
> queued tasks are discarded, including the `checkpointCoordinator` task.
>  # The checkpoint completable future then never completes and the 
> `isTriggering` flag is never reset.
>  
> I see that the closing part of SourceCoordinator was recently refactored, but 
> the new implementation has the same issue. And since it calls 
> `shutdownNow` directly, the issue should be easier to hit.





[jira] [Updated] (FLINK-29611) Fix flaky tests in CoBroadcastWithNonKeyedOperatorTest

2022-10-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29611:
---
Labels: pull-request-available  (was: )

> Fix flaky tests in CoBroadcastWithNonKeyedOperatorTest
> --
>
> Key: FLINK-29611
> URL: https://issues.apache.org/jira/browse/FLINK-29611
> Project: Flink
>  Issue Type: Bug
>Reporter: Sopan Phaltankar
>Priority: Minor
>  Labels: pull-request-available
>
> The test 
> _org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperatorTest.testMultiStateSupport_
>  has the following failure:
> Failures:
> [ERROR]   CoBroadcastWithNonKeyedOperatorTest.testMultiStateSupport:74 
> Wrong Side Output: arrays first differed at element [0]; expected: 15 : 9:key.6->6> but was:5>
> I used the tool [NonDex|https://github.com/TestingResearchIllinois/NonDex] to 
> find this flaky test. 
> Command: mvn edu.illinois:nondex-maven-plugin:1.1.2:nondex -Dtest='Fully 
> Qualified Test Name'
> I analyzed the assertion failure and found that the root cause is that the 
> test method calls ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries(), 
> which calls the entrySet() method of the underlying HashMap. HashMap does not 
> guarantee any iteration order for entrySet(), which makes the test flaky. 
> The fix would be to change _HashMap_ to _LinkedHashMap_ where the Map is 
> initialized.
> On further analysis, it was found that the Map is initialized on line 
> 53 of the org.apache.flink.runtime.state.HeapBroadcastState class.
> After changing from HashMap to LinkedHashMap, the above test passes.
> Edit: Upon making this change and running the CI, it was found that the tests 
> org.apache.flink.api.datastream.DataStreamBatchExecutionITCase.batchKeyedBroadcastExecution
>  and 
> org.apache.flink.api.datastream.DataStreamBatchExecutionITCase.batchBroadcastExecution
>  were failing. Upon further investigation, I found that these tests were also 
> flaky and depended on the earlier made change.
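
The ordering difference behind this fix can be shown with plain JDK collections. This is a minimal sketch, not code from `HeapBroadcastState`: the class name, the helper methods, and the sample keys are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; the keys and helper names are illustrative only.
class IterationOrderSketch {

    /** Inserts three sample entries in a fixed order and returns the same map. */
    static Map<String, Integer> fill(Map<String, Integer> map) {
        map.put("key.6", 6);
        map.put("key.5", 5);
        map.put("key.9", 9);
        return map;
    }

    /** Copies the keys in whatever order the map iterates over them. */
    static List<String> keysInIterationOrder(Map<String, Integer> map) {
        return new ArrayList<>(map.keySet());
    }

    public static void main(String[] args) {
        // LinkedHashMap iterates in insertion order, so this order is stable.
        System.out.println(keysInIterationOrder(fill(new LinkedHashMap<>())));
        // HashMap gives no ordering guarantee; a test must not assert on this order.
        System.out.println(keysInIterationOrder(fill(new HashMap<>())));
    }
}
```

A test that compares iteration results element by element is only stable against the `LinkedHashMap` variant. Against a `HashMap` it would need an order-insensitive comparison instead.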





[GitHub] [flink] sopan98 opened a new pull request, #21151: [FLINK-29611][tests] Fix flaky test in CoBroadcastWithNonKeyedOperatorTest and DataStreamBatchExecutionITCase

2022-10-25 Thread GitBox


sopan98 opened a new pull request, #21151:
URL: https://github.com/apache/flink/pull/21151

   This PR aims to solve the issue presented here: 
https://issues.apache.org/jira/browse/FLINK-29611
   
   ## What is the purpose of the change
   The fix is to change the HashMap in HeapBroadcastState to LinkedHashMap to 
make the tests more stable (less flaky).
   
   ## Brief change log
   - Changing the map implementation of HeapBroadcastState
   - Modifying the tests that depend on this class (only the order of 
insertion changes, not the elements or anything else)
   
   ## Verifying this change
   Existing tests already cover this change, and it can pass them successfully.
   The tests are: 
   1. 
org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperatorTest.testMultiStateSupport
   2. 
org.apache.flink.api.datastream.DataStreamBatchExecutionITCase.batchBroadcastExecution
   3. 
org.apache.flink.api.datastream.DataStreamBatchExecutionITCase.batchKeyedBroadcastExecution
   
   ## Does this pull request potentially affect one of the following parts:
   * Dependencies (does it add or upgrade a dependency): No
   * The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: No
   * The serializers: No
   * The runtime per-record code paths (performance sensitive): No
   * Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: No
   * The S3 file system connector: No
   
   ## Documentation
   Does this pull request introduce a new feature? No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] klion26 commented on a diff in pull request #21053: [FLINK-29157][doc] Clarify the contract between CompletedCheckpointStore and SharedStateRegistry

2022-10-25 Thread GitBox


klion26 commented on code in PR #21053:
URL: https://github.com/apache/flink/pull/21053#discussion_r1004561770


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java:
##
@@ -39,6 +40,16 @@ public interface CompletedCheckpointStore {
  * Only a bounded number of checkpoints is kept. When exceeding the 
maximum number of
  * retained checkpoints, the oldest one will be discarded.
  *
+ * After <a href="https://issues.apache.org/jira/browse/FLINK-24611">FLINK-24611</a>, {@link
+ * SharedStateRegistry#unregisterUnusedState} should be called here to 
subsume unused state.
+ * Note, the {@link 
CompletedCheckpoint} passed to
+ * {@link SharedStateRegistry#registerAllAfterRestored} or {@link
+ * SharedStateRegistryFactory#create} must be the same object as the input 
parameter, otherwise
+ * the state may be deleted by mistake.
+ *
+ * After <a href="https://issues.apache.org/jira/browse/FLINK-24611">FLINK-25872</a>, {@link

Review Comment:
   https://issues.apache.org/jira/browse/FLINK-24611 -> 
https://issues.apache.org/jira/browse/FLINK-25872






[jira] [Commented] (FLINK-29755) PulsarSourceUnorderedE2ECase.testSavepoint failed because of missing TaskManagers

2022-10-25 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17623885#comment-17623885
 ] 

Yufan Sheng commented on FLINK-29755:
-

[~mapohl] I'll try to figure out the cause tomorrow.

> PulsarSourceUnorderedE2ECase.testSavepoint failed because of missing 
> TaskManagers
> -
>
> Key: FLINK-29755
> URL: https://issues.apache.org/jira/browse/FLINK-29755
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
> Attachments: PulsarSourceUnorderedE2ECase.testSavepoint.log
>
>
> [This 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42325&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=160c9ae5-96fd-516e-1c91-deb81f59292a&l=13932]
>  failed (not exclusively) due to a problem with 
> {{PulsarSourceUnorderedE2ECase.testSavepoint}}. It seems like there were no 
> TaskManagers spun up which resulted in the test job failing with a 
> {{NoResourceAvailableException}}.
> {code}
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge [] - 
> Could not acquire the minimum required resources, failing slot requests. 
> Acquired: []. Current slot pool status: Registered TMs: 0, registered slots: 
> 0 free slots: 0
> {code}
> I didn't raise this one to critical because it looks like a test environment 
> issue in which the TaskManagers were missing. I attached the e2e 
> test-specific logs to the Jira issue.





[jira] [Commented] (FLINK-29754) HadoopConfigLoader should consider Hadoop configuration files

2022-10-25 Thread Peter Vary (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623882#comment-17623882
 ] 

Peter Vary commented on FLINK-29754:


[This|https://github.com/apache/flink/blob/0e612856772d5f469c7d4a4fff90a58b6e0f5578/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java#L59]
 is the line which causes the issue.

Instantiating {{HdfsConfiguration}} requires {{hadoop-hdfs}} to be on the 
classpath.

> HadoopConfigLoader should consider Hadoop configuration files
> -
>
> Key: FLINK-29754
> URL: https://issues.apache.org/jira/browse/FLINK-29754
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Reporter: Peter Vary
>Priority: Major
>
> Currently 
> [HadoopConfigLoader|https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopConfigLoader.java]
>  considers Hadoop configurations on the classpath, but does not consider 
> Hadoop configuration files which are set in another way.
> So if the Hadoop configuration is set through the {{HADOOP_CONF_DIR}} 
> environment variable, then the configuration loaded by the HadoopConfigLoader 
> will not contain the values set there.
> This can cause unexpected behaviour when the checkpoint / savepoint dirs are 
> on S3 and the S3-specific settings are defined in the Hadoop configuration 
> files.
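
To illustrate what considering {{HADOOP_CONF_DIR}} could involve, here is a minimal JDK-only sketch that locates the usual site files in a given directory. It is not the HadoopConfigLoader implementation: the class name, the method, and the choice of candidate file names are assumptions for illustration, and actually loading the files would still go through Hadoop's own configuration classes.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of Flink or Hadoop.
class HadoopConfDirSketch {

    // Assumed candidate file names, chosen for this example.
    private static final String[] CANDIDATES = {"core-site.xml", "hdfs-site.xml"};

    /** Returns the candidate config files that exist in the given directory. */
    static List<Path> existingConfigFiles(String hadoopConfDir) {
        List<Path> found = new ArrayList<>();
        if (hadoopConfDir == null || hadoopConfDir.isEmpty()) {
            return found; // variable not set: nothing to add
        }
        for (String name : CANDIDATES) {
            Path candidate = Paths.get(hadoopConfDir, name);
            if (Files.isRegularFile(candidate)) {
                found.add(candidate);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Reads the environment variable named in the issue description.
        System.out.println(existingConfigFiles(System.getenv("HADOOP_CONF_DIR")));
    }
}
```

A loader that merges such files would also have to decide whether values from the directory or values from the classpath take precedence, which the issue description leaves open.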





[jira] [Closed] (FLINK-29753) FileSource throws exception reading file with name that ends with xz

2022-10-25 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-29753.
--
Resolution: Invalid

This works fine for me. The error {{Caused by: java.lang.NoClassDefFoundError: 
org/tukaani/xz/XZInputStream}} makes it more plausible that you're not 
packaging your JAR correctly or, if you run this from IntelliJ, that you 
haven't enabled the option to add dependencies with 'Provided' scope to the 
classpath. 
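
One way to check this kind of packaging problem is to probe for the class from inside the job's JVM. This is a minimal sketch with a made-up class and method name; it only reports whether a class can be found by the class loader that loaded the sketch itself.

```java
// Hypothetical diagnostic helper; not part of Flink.
class ClasspathProbe {

    /** Returns true if the named class is visible to this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only look the class up, do not run static initializers.
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class name is taken from the stack trace in this issue.
        System.out.println(isOnClasspath("org.tukaani.xz.XZInputStream"));
    }
}
```

If the probe reports false when run the same way as the failing job, the dependency is missing from the runtime classpath, whatever the build file declares.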

> FileSource throws exception reading file with name that ends with xz 
> -
>
> Key: FLINK-29753
> URL: https://issues.apache.org/jira/browse/FLINK-29753
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.2
>Reporter: Xuannan Su
>Priority: Major
>
> FileSource throws the following exception when reading a file whose name 
> ends with xz
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
> exception
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>     at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>     at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
>     at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoClassDefFoundError: org/tukaani/xz/XZInputStream
>     at 
> org.apache.flink.api.common.io.compression.XZInputStreamFactory.create(XZInputStreamFactory.java:42)
>     at 
> org.apache.flink.api.common.io.compression.XZInputStreamFactory.create(XZInputStreamFactory.java:31)
>     at 
> org.apache.flink.connector.file.src.impl.StreamFormatAdapter.lambda$openStream$3(StreamFormatAdapter.java:178)
>     at 
> org.apache.flink.connector.file.src.util.Utils.doWithCleanupOnException(Utils.java:45)
>     at 
> org.apache.flink.connector.file.src.impl.StreamFormatAdapter.openStream(StreamFormatAdapter.java:172)
>     at 
> org.apache.flink.connector.file.src.impl.StreamFormatAdapter.createReader(StreamFormatAdapter.java:70)
>     at 
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
>     at 
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> Caused by: java.lang.ClassNotFoundException: org.tukaani.xz.XZInputStream
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>     ... 16 more {code}
> The code to reproduce the error:
> {code:java}
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> final FileSource source =
> FileSource.forRecordStreamFormat(new TextLineInputFormat(), new 
> 
