[jira] [Commented] (FLINK-25420) Port JDBC Source to new Source API (FLIP-27)

2022-06-06 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550811#comment-17550811
 ] 

Jing Ge commented on FLINK-25420:
-

Thanks [~RocMarshal] for driving this! I think it makes sense to write a FLIP 
based on the template: 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP+Connector+Template] and 
discuss it in the ML.

> Port JDBC Source to new Source API (FLIP-27)
> 
>
> Key: FLINK-25420
> URL: https://issues.apache.org/jira/browse/FLINK-25420
> Project: Flink
>  Issue Type: Improvement
>Reporter: Martijn Visser
>Priority: Major
>
> The current JDBC connector is using the old SourceFunction interface, which 
> is going to be deprecated. We should port/refactor the JDBC Source to use the 
> new Source API, based on FLIP-27 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27928) External Resource Framework: 'external-resources' config delimiter is not working as expected

2022-06-06 Thread James Cho (Jira)
James Cho created FLINK-27928:
-

 Summary: External Resource Framework: 'external-resources' config 
delimiter is not working as expected
 Key: FLINK-27928
 URL: https://issues.apache.org/jira/browse/FLINK-27928
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.15.0
Reporter: James Cho


For Flink version 1.15.0, the `external-resources` configuration value is not split on the ";" delimiter between external resource names as expected.

The bug can be reproduced by setting up a Flink cluster in Docker with two external resources named `abc` and `efg`:

docker-compose.yml:

{code:java}
services:
  jobmanager:
    image: flink:1.15.0-scala_2.12
    ports:
      - "8081:8081"
      - "6123:6123"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.15.0-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 8
        external-resources: abc;efg
        external-resource.abc.amount: 1
        external-resource.efg.amount: 1
{code}

Related task manager logs:

{code:java}
root@2873c5ae72c2:/opt/flink# cat log/flink* | grep ExternalResourceUtils
2022-06-07 05:42:34,232 WARN  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - The amount of the abc;efg should be configured. Will ignore that resource.
2022-06-07 05:42:34,232 INFO  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
2022-06-07 05:42:34,236 WARN  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Could not find driver class name for abc;efg. Please make sure external-resource.abc;efg.driver-factory.class is configured.
2022-06-07 05:42:34,240 WARN  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - The amount of the abc;efg should be configured. Will ignore that resource.
{code}
Notice that Flink recognizes `abc;efg` as a single external resource rather than as `abc` and `efg`.
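
For illustration, here is a minimal, self-contained sketch (not Flink source code; the class name is made up) of the splitting semantics the `external-resources` option is documented to have:

{code:java}
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: the configured value of `external-resources`
// should be split on ';' into individual external resource names.
public class ExternalResourcesDelimiterCheck {
    public static void main(String[] args) {
        String configured = "abc;efg";
        List<String> resourceNames = Arrays.asList(configured.split(";"));
        System.out.println(resourceNames);
        // Expected (and observed on 1.14.4): [abc, efg]
        // Observed on 1.15.0: the whole string "abc;efg" is kept as one name
    }
}
{code}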


The same issue does not exist in Flink version 1.14.4.

docker-compose.yml:
{code:java}
services:
  jobmanager:
    image: flink:1.14.4-scala_2.12
    ports:
      - "8081:8081"
      - "6123:6123"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.14.4-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 8
        external-resources: abc;efg
        external-resource.abc.amount: 1
        external-resource.efg.amount: 1
{code}
Related task manager logs:
{code:java}
docker logs ... | grep ExternalResourceUtils
2022-06-07 05:48:13,531 INFO  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: [abc, efg]
2022-06-07 05:48:13,534 WARN  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Could not find driver class name for abc. Please make sure external-resource.abc.driver-factory.class is configured.
2022-06-07 05:48:13,534 WARN  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Could not find driver class name for efg. Please make sure external-resource.efg.driver-factory.class is configured.
{code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-ml] lindong28 commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance

2022-06-06 Thread GitBox


lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890784808


##
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/kmeans/KMeans.java:
##
@@ -254,58 +272,150 @@ public Tuple3 
map(Tuple2 value
 DenseVector, DenseVector[], Tuple2>,
 IterationListener> {
 private final DistanceMeasure distanceMeasure;
-private ListState points;
-private ListState centroids;
+private ListState centroidsState;
+private DenseVector[] centroids;
+
+private Path basePath;
+private OperatorID operatorID;
+private MemorySegmentPool segmentPool;
+private DataCacheWriter dataCacheWriter;
 
 public SelectNearestCentroidOperator(DistanceMeasure distanceMeasure) {
+super();
 this.distanceMeasure = distanceMeasure;
 }
 
+@Override
+public void setup(
+StreamTask containingTask,
+StreamConfig config,
+Output>> output) {
+super.setup(containingTask, config, output);
+
+operatorID = config.getOperatorID();
+
+MemoryManager memoryManager = 
getContainingTask().getEnvironment().getMemoryManager();

Review Comment:
   Could you refactor the code so that algorithm developers won't need to 
handle so many details?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-27927) Adjust table store connector common apis

2022-06-06 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-27927:


 Summary: Adjust table store connector common apis
 Key: FLINK-27927
 URL: https://issues.apache.org/jira/browse/FLINK-27927
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: table-store-0.2.0


We currently have the initial FileStoreTable-related interface, but something 
is missing to satisfy our four approaches:
1. Type conversion
2. Data structure conversion
3. Filter conversion
4. Scan and Read

In this jira, more easy-to-use features will be added.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27927) Improve table store connector common apis

2022-06-06 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-27927:
-
Summary: Improve table store connector common apis  (was: Adjust table 
store connector common apis)

> Improve table store connector common apis
> -
>
> Key: FLINK-27927
> URL: https://issues.apache.org/jira/browse/FLINK-27927
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
> Fix For: table-store-0.2.0
>
>
> We currently have the initial FileStoreTable-related interface, but something 
> is missing to satisfy our four approaches:
> 1. Type conversion
> 2. Data structure conversion
> 3. Filter conversion
> 4. Scan and Read
> In this jira, more easy-to-use features will be added.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27927) Improve table store connector common interfaces

2022-06-06 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-27927:
-
Summary: Improve table store connector common interfaces  (was: Improve 
table store connector common apis)

> Improve table store connector common interfaces
> ---
>
> Key: FLINK-27927
> URL: https://issues.apache.org/jira/browse/FLINK-27927
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
> Fix For: table-store-0.2.0
>
>
> We currently have the initial FileStoreTable-related interface, but something 
> is missing to satisfy our four approaches:
> 1. Type conversion
> 2. Data structure conversion
> 3. Filter conversion
> 4. Scan and Read
> In this jira, more easy-to-use features will be added.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] Sxnan commented on pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results

2022-06-06 Thread GitBox


Sxnan commented on PR #19653:
URL: https://github.com/apache/flink/pull/19653#issuecomment-1148202674

   @zhuzhurk Thanks for your comments. Please take another look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Sxnan commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results

2022-06-06 Thread GitBox


Sxnan commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r890770525


##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest.java:
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import 
org.apache.flink.runtime.scheduler.CachedIntermediateDataSetCorruptedException;
+import org.apache.flink.types.IntValue;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Integration tests for reusing persisted intermediate dataset */
+public class JobIntermediateDatasetReuseTest {
+
+private static final Logger LOG =
+LoggerFactory.getLogger(JobIntermediateDatasetReuseTest.class);
+
+@Test
+public void testClusterPartitionReuse() throws Exception {
+internalTestClusterPartitionReuse(1);
+}
+
+@Test
+public void testClusterPartitionReuseMultipleParallelism() throws 
Exception {
+internalTestClusterPartitionReuse(64);
+}
+
+private void internalTestClusterPartitionReuse(int parallelism) throws 
Exception {
+final TestingMiniClusterConfiguration miniClusterConfiguration =
+TestingMiniClusterConfiguration.newBuilder().build();
+
+try (TestingMiniCluster miniCluster =
+
TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
+miniCluster.start();
+
+IntermediateDataSetID intermediateDataSetID = new 
IntermediateDataSetID();
+final JobGraph firstJobGraph = createFirstJobGraph(parallelism, 
intermediateDataSetID);
+miniCluster.submitJob(firstJobGraph).get();
+CompletableFuture jobResultFuture =
+miniCluster.requestJobResult(firstJobGraph.getJobID());
+JobResult jobResult = jobResultFuture.get();
+assertTrue(jobResult.isSuccess());
+
+final JobGraph secondJobGraph =
+createSecondJobGraph(parallelism, intermediateDataSetID);
+miniCluster.submitJob(secondJobGraph).get();
+jobResultFuture = 
miniCluster.requestJobResult(secondJobGraph.getJobID());
+jobResult = jobResultFuture.get();
+assertTrue(jobResult.isSuccess());
+}
+}
+
+@Test
+public void 
testClusterPartitionReuseWithMoreConsumerParallelismThrowException()
+throws Exception {
+final TestingMiniClusterConfiguration miniClusterConfiguration =
+TestingMiniClusterConfiguration.newBuilder().build();
+
+try (TestingMiniCluster miniCluster =
+
TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
+miniCluster.start();
+
+IntermediateDataSetID intermediateDataSetID = new 
IntermediateDataSetID();
+final JobGraph firstJobGraph = createFirstJobGraph(1, 
intermediateDataSetID);
+miniCluster.submitJob(firstJobGraph).get();

[GitHub] [flink] Sxnan commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results

2022-06-06 Thread GitBox


Sxnan commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r890770323


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java:
##
@@ -120,6 +129,31 @@ public Collection 
getAllTrackedPartitions()
 return 
partitionInfos.values().stream().map(PartitionInfo::getMetaInfo).collect(toList());
 }
 
+@Override
+public void connectToResourceManager(ResourceManagerGateway 
resourceManagerGateway) {
+this.resourceManagerGateway = resourceManagerGateway;
+}
+
+@Override
+public List getClusterPartitionShuffleDescriptors(
+IntermediateDataSetID intermediateDataSetID) {
+return clusterPartitionShuffleDescriptors.computeIfAbsent(
+intermediateDataSetID, 
this::requestShuffleDescriptorsFromResourceManager);
+}
+
+private List 
requestShuffleDescriptorsFromResourceManager(
+IntermediateDataSetID intermediateDataSetID) {
+Preconditions.checkNotNull(
+resourceManagerGateway, "JobMaster is not connected to 
ResourceManager");
+try {
+return this.resourceManagerGateway
+
.getClusterPartitionsShuffleDescriptors(intermediateDataSetID)
+.get();
+} catch (InterruptedException | ExecutionException e) {
+throw new RuntimeException(e);

Review Comment:
   The message is added. And it catches all throwable. 
[62bb33d](https://github.com/apache/flink/pull/19653/commits/62bb33d10fb8ac51450ca96e336c3c312259e12a)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Sxnan commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results

2022-06-06 Thread GitBox


Sxnan commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r890770010


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java:
##
@@ -115,6 +122,29 @@ private FailureHandlingResult handleFailure(
 final Set verticesToRestart,
 final boolean globalFailure) {
 
+if (!globalFailure) {

Review Comment:
   You are right. It is much clearer to do the translation in the 
DefaultScheduler. The change is made in 
[976d73b](https://github.com/apache/flink/pull/19653/commits/976d73b7f33d1d5ee8b788906b083419d6342748).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27497) Track terminal job states in the observer

2022-06-06 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550799#comment-17550799
 ] 

Aitozi commented on FLINK-27497:


I will continue this work now.

> Track terminal job states in the observer
> -
>
> Key: FLINK-27497
> URL: https://issues.apache.org/jira/browse/FLINK-27497
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.0.0
>Reporter: Gyula Fora
>Assignee: Aitozi
>Priority: Critical
>
> With the improvements in FLINK-27468 Flink 1.15 app clusters will not be shut 
> down in case of terminal job states (failed, finished) etc.
> It is important to properly handle these states and let the user know about 
> it.
> We should always trigger events, and for terminally failed jobs record the 
> error information in the FlinkDeployment status.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-25420) Port JDBC Source to new Source API (FLIP-27)

2022-06-06 Thread RocMarshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550796#comment-17550796
 ] 

RocMarshal commented on FLINK-25420:


[~martijnvisser] Sorry for the late response. 
I have studied the JDBC connector source against the new connector API. It is clear that the new source interface and the old source interface differ fundamentally in how source splits are distributed. However, interfaces such as LookupSource and the Dialects in the Table source can still be reused and improved based on the old implementation.

Let's start with the API source first.
h1. JdbcSourceSplit

The preliminary definition of source split is as follows:

 
{code:java}
public class JdbcSourceSplit implements SourceSplit, Serializable {

    private final String id;

    private final String sqlTemplate;

    private final @Nullable Serializable[] parameters;

    // The default value is 0. Valid values range within [1, Integer.MAX_VALUE].
    private final int offset;

    // code placeholder...
}
{code}
 
h1. Bounded or Unbounded for JdbcSourceEnumerator
h2. Unbounded case:
 * Continuous JdbcSourceSplit generation.
 ** If there is no JdbcSourceSplit at present, the JdbcSourceSplitEnumerator will not respond to JdbcSourceSplit requests.

h2. Bounded case:
 * Static JdbcSourceSplit set. (See the sketch after this list.)
 ** If there is no JdbcSourceSplit at present, the JdbcSourceSplitEnumerator will signal that there are no more JdbcSourceSplits in response to split requests.
 * Items to be discussed:
 ** Do we need to support the following special bounded-scenario abstraction?
 *** The number of JdbcSourceSplits is fixed, but the time at which all JdbcSourceSplits have been completely generated is not fixed in the user-defined implementation. Once the condition ending the JdbcSourceSplit generation process is met, no further JdbcSourceSplits are generated. After all JdbcSourceSplits have been processed, the reader is notified by the JdbcSourceSplitEnumerator that there are no more JdbcSourceSplits.
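
For illustration, here is a hedged sketch of how the enumerator could serve split requests in the two cases above. The field and helper names are assumptions; only the FLIP-27 {{SplitEnumerator}}/{{SplitEnumeratorContext}} calls are existing API:

{code:java}
// Assumed fields: remainingSplits (a Deque<JdbcSourceSplit>), bounded and
// generationFinished (flags), context (a SplitEnumeratorContext<JdbcSourceSplit>).
@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    JdbcSourceSplit next = remainingSplits.poll();
    if (next != null) {
        context.assignSplit(next, subtaskId);
    } else if (bounded && generationFinished) {
        // Bounded case: no more JdbcSourceSplits will ever be generated.
        context.signalNoMoreSplits(subtaskId);
    }
    // Unbounded case: neither assign nor signal; the request stays pending
    // until a new JdbcSourceSplit is generated.
}
{code}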

h1. Semantic guarantee for JdbcSourceSplitReader
h2. Stream execution mode
 * JdbcSourceSplit-based 'At least once'
 ** If any exception is encountered, just reprocess the JdbcSourceSplit from the default offset value 0.

 * ResultSet-offset-based 'Exactly once':
 ** If any exception is encountered, reprocess the JdbcSourceSplit based on the offset value from the JdbcSourceSplitState.
 *** If the offset value is greater than the number of result-set messages, skip the current JdbcSourceSplit.
 *** If the offset is less than or equal to the number of result-set messages, continue processing from the offset position.
 ** *Disadvantages:* Exactly once only makes sense if the ResultSet corresponding to this SQL (JdbcSourceSplit) remains unchanged over the whole lifecycle of JdbcSourceSplit processing. _Unfortunately_, this condition is not met in most databases and data scenarios.

 * JdbcSourceSplit-based 'At most once'
 ** If the offset value of the current JdbcSourceSplit is not 0, it indicates that the JdbcSourceSplit has already been processed, so just skip it. In short, once processing of the current JdbcSourceSplit fails, it is ignored and processing of the next JdbcSourceSplit begins.

h2. Batch execution mode

If any exception is encountered, just reprocess the JdbcSourceSplit from the default offset value 0.
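
To make the offset-based semantics above concrete, here is a minimal sketch of a mutable split state. The accessor and copy methods on JdbcSourceSplit are assumptions, not part of the proposal:

{code:java}
public class JdbcSourceSplitState implements Serializable {

    private final JdbcSourceSplit split;
    private int offset; // rows of the split's ResultSet already emitted

    public JdbcSourceSplitState(JdbcSourceSplit split) {
        this.split = split;
        this.offset = split.getOffset(); // assumed accessor
    }

    /** Called by the reader after each emitted record. */
    public void recordEmitted() {
        offset++;
    }

    /** Snapshots the progress back into an immutable split on checkpoint. */
    public JdbcSourceSplit toJdbcSourceSplit() {
        return split.withOffset(offset); // assumed copy method
    }
}
{code}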

Please let me know your opinion. Thank you~

> Port JDBC Source to new Source API (FLIP-27)
> 
>
> Key: FLINK-25420
> URL: https://issues.apache.org/jira/browse/FLINK-25420
> Project: Flink
>  Issue Type: Improvement
>Reporter: Martijn Visser
>Priority: Major
>
> The current JDBC connector is using the old SourceFunction interface, which 
> is going to be deprecated. We should port/refactor the JDBC Source to use the 
> new Source API, based on FLIP-27 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-ml] lindong28 commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance

2022-06-06 Thread GitBox


lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890757304


##
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/kmeans/KMeans.java:
##
@@ -254,58 +272,150 @@ public Tuple3 
map(Tuple2 value
 DenseVector, DenseVector[], Tuple2>,
 IterationListener> {
 private final DistanceMeasure distanceMeasure;
-private ListState points;
-private ListState centroids;
+private ListState centroidsState;
+private DenseVector[] centroids;

Review Comment:
   Does this improve performance by using `centroids`? If not, it seems simpler 
not to add this variable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance

2022-06-06 Thread GitBox


lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890739744


##
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##
@@ -59,87 +80,100 @@ public DataCacheWriter(
 SupplierWithException pathGenerator,
 List priorFinishedSegments)
 throws IOException {
-this.serializer = serializer;
+this(serializer, fileSystem, pathGenerator, null, 
priorFinishedSegments);
+}
+
+public DataCacheWriter(
+TypeSerializer serializer,
+FileSystem fileSystem,
+SupplierWithException pathGenerator,
+@Nullable MemorySegmentPool segmentPool,
+List priorFinishedSegments)
+throws IOException {
 this.fileSystem = fileSystem;
 this.pathGenerator = pathGenerator;
-
-this.finishSegments = new ArrayList<>(priorFinishedSegments);
-
-this.currentSegment = new SegmentWriter(pathGenerator.get());
+this.segmentPool = segmentPool;
+this.serializer = serializer;
+this.finishedSegments = new ArrayList<>(priorFinishedSegments);
+this.currentSegmentWriter = createSegmentWriter();
 }
 
 public void addRecord(T record) throws IOException {
-currentSegment.addRecord(record);
-}
-
-public void finishCurrentSegment() throws IOException {
-finishCurrentSegment(true);
+if (!currentSegmentWriter.addRecord(record)) {
+currentSegmentWriter.finish().ifPresent(finishedSegments::add);
+currentSegmentWriter = new FileSegmentWriter<>(serializer, 
pathGenerator.get());
+Preconditions.checkState(currentSegmentWriter.addRecord(record));
+}
 }
 
+/** Finishes adding records and closes resources occupied for adding 
records. */
 public List finish() throws IOException {
-finishCurrentSegment(false);
-return finishSegments;
-}
+if (currentSegmentWriter == null) {
+return finishedSegments;
+}
 
-public FileSystem getFileSystem() {
-return fileSystem;
+currentSegmentWriter.finish().ifPresent(finishedSegments::add);
+currentSegmentWriter = null;
+return finishedSegments;
 }
 
-public List getFinishSegments() {
-return finishSegments;
+/**
+ * Flushes all added records to segments and returns a list of segments 
containing all cached
+ * records.
+ */
+public List getSegments() throws IOException {
+finishCurrentSegmentIfExists();
+return finishedSegments;
 }
 
-private void finishCurrentSegment(boolean newSegment) throws IOException {
-if (currentSegment != null) {
-currentSegment.finish().ifPresent(finishSegments::add);
-currentSegment = null;
+private void finishCurrentSegmentIfExists() throws IOException {
+if (currentSegmentWriter == null) {
+return;
 }
 
-if (newSegment) {
-currentSegment = new SegmentWriter(pathGenerator.get());
-}
+currentSegmentWriter.finish().ifPresent(finishedSegments::add);
+currentSegmentWriter = createSegmentWriter();
 }
 
-private class SegmentWriter {
-
-private final Path path;
-
-private final FSDataOutputStream outputStream;
-
-private final DataOutputView outputView;
-
-private int currentSegmentCount;
-
-public SegmentWriter(Path path) throws IOException {
-this.path = path;
-this.outputStream = fileSystem.create(path, 
FileSystem.WriteMode.NO_OVERWRITE);
-this.outputView = new DataOutputViewStreamWrapper(outputStream);
+/** Cleans up all previously added records. */
+public void cleanup() throws IOException {

Review Comment:
   Would it be more consistent with `State::clear()` to rename this method as 
`clear()`?



##
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/kmeans/KMeans.java:
##
@@ -254,58 +272,150 @@ public Tuple3 
map(Tuple2 value
 DenseVector, DenseVector[], Tuple2>,
 IterationListener> {
 private final DistanceMeasure distanceMeasure;
-private ListState points;
-private ListState centroids;
+private ListState centroidsState;
+private DenseVector[] centroids;

Review Comment:
   Does this improve performance by using `centroids`? If not, it seems simpler 
not to add this variable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27926) Translate ```upgrading.md``` into chinese

2022-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27926:
---
Labels: pull-request-available  (was: )

> Translate ```upgrading.md``` into chinese
> -
>
> Key: FLINK-27926
> URL: https://issues.apache.org/jira/browse/FLINK-27926
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation
>Reporter: Zixuan Rao
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> A pull request is submitted as this issue is created. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] flinkbot commented on pull request #19890: Translate `upgrading.md` into chinese FLINK-27926

2022-06-06 Thread GitBox


flinkbot commented on PR #19890:
URL: https://github.com/apache/flink/pull/19890#issuecomment-1148175441

   
   ## CI report:
   
   * 59948edef8632d168533185876bc44815aea4a3f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27926) Translate ```upgrading.md``` into chinese

2022-06-06 Thread Zixuan Rao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550790#comment-17550790
 ] 

Zixuan Rao commented on FLINK-27926:


Pull request #19890 has been submitted. [~dianfu] Could you help merge it? 
Thanks!

> Translate ```upgrading.md``` into chinese
> -
>
> Key: FLINK-27926
> URL: https://issues.apache.org/jira/browse/FLINK-27926
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation
>Reporter: Zixuan Rao
>Priority: Major
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> A pull request is submitted as this issue is created. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] billyrrr opened a new pull request, #19890: Translate `upgrading.md` into chinese #FLINK-17285

2022-06-06 Thread GitBox


billyrrr opened a new pull request, #19890:
URL: https://github.com/apache/flink/pull/19890

   ## What is the purpose of the change
   
   Translate `upgrading.md` into Chinese #FLINK-17285. The documentation is 
about upgrading an existing Flink cluster. 
   
   
   ## Brief change log
   
 - *Duplicate upgrading.md from current master to content.zh*
 - *Translate the documentation with automatic translation*
 - *Manually inspect translation and calibrate with translation table*
   
   
   ## Verifying this change
   
   This change can be verified as follows:
   
 - Rebuild doc and visually inspect the new page
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #19889: [FLINK-27910][connector/filesystem] Register the timer to enforce rolling policy when file sink starts from scratch.

2022-06-06 Thread GitBox


flinkbot commented on PR #19889:
URL: https://github.com/apache/flink/pull/19889#issuecomment-1148172903

   
   ## CI report:
   
   * a92e364e9bc5981dbe493762de04fb3a57b4dfb4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] PatrickRen commented on pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

2022-06-06 Thread GitBox


PatrickRen commented on PR #19366:
URL: https://github.com/apache/flink/pull/19366#issuecomment-1148172752

   I think this PR is ready to merge once we resolve the minor pending comments 
above. @mas-chen would you like to make the updates and rebase onto the latest 
master?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-27926) Translate ```upgrading.md``` into chinese

2022-06-06 Thread Zixuan Rao (Jira)
Zixuan Rao created FLINK-27926:
--

 Summary: Translate ```upgrading.md``` into chinese
 Key: FLINK-27926
 URL: https://issues.apache.org/jira/browse/FLINK-27926
 Project: Flink
  Issue Type: Improvement
  Components: chinese-translation
Reporter: Zixuan Rao


A pull request is submitted as this issue is created. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-27872) Allow KafkaBuilder to set arbitrary subscribers

2022-06-06 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren closed FLINK-27872.
-
Resolution: Duplicate

> Allow KafkaBuilder to set arbitrary subscribers
> ---
>
> Key: FLINK-27872
> URL: https://issues.apache.org/jira/browse/FLINK-27872
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Reporter: Salva
>Priority: Major
>
> Currently, 
> [KafkaSourceBuilder|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java]
>  has two setters:
>  * 
> [setTopics|https://github.com/apache/flink/blob/586715f23ef49939ab74e4736c58d71c643a64ba/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L157]
>  * 
> [setTopicPattern|https://github.com/apache/flink/blob/586715f23ef49939ab74e4736c58d71c643a64ba/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L168]
> which under the hood instantiate the corresponding (concrete) subscribers. 
> This covers the most common needs, I agree, but it might fall short in some 
> cases. Why not add a more generic setter:
>  * {{setSubscriber (???)}}
> Otherwise, how can users read from kafka in combination with custom 
> subscribing logic? Without looking much into it, it seems that they basically 
> cannot, at least without having to replicate some parts of the connector, 
> which seems rather inconvenient.
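
For illustration, a hedged sketch of what such a generic setter could look like on the builder. The method and guard names are assumptions (the API actually added under FLINK-24660 may differ); {{KafkaSubscriber}} is the interface the existing setters instantiate internally:

{code:java}
public KafkaSourceBuilder<OUT> setKafkaSubscriber(KafkaSubscriber subscriber) {
    // Assumed guard mirroring the existing setters, which forbid mixing
    // subscription modes on a single builder instance.
    ensureSubscriberIsNull("custom subscriber");
    this.subscriber = checkNotNull(subscriber);
    return this;
}
{code}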



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27910) FileSink not registered the timer to enforce rolling policy if started from scratch

2022-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27910:
---
Labels: pull-request-available  (was: )

> FileSink not registered the timer to enforce rolling policy if started from 
> scratch
> ---
>
> Key: FLINK-27910
> URL: https://issues.apache.org/jira/browse/FLINK-27910
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yun Gao
>Priority: Critical
>  Labels: pull-request-available
>
> The current FileWriter only registers the timer in initializeState, which is now only called on restore. Thus, if the job is started from scratch, the timer is never registered and the rolling policy does not take effect.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27872) Allow KafkaBuilder to set arbitrary subscribers

2022-06-06 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550788#comment-17550788
 ] 

Qingsheng Ren commented on FLINK-27872:
---

I think this duplicates FLINK-24660.

> Allow KafkaBuilder to set arbitrary subscribers
> ---
>
> Key: FLINK-27872
> URL: https://issues.apache.org/jira/browse/FLINK-27872
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Reporter: Salva
>Priority: Major
>
> Currently, 
> [KafkaSourceBuilder|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java]
>  has two setters:
>  * 
> [setTopics|https://github.com/apache/flink/blob/586715f23ef49939ab74e4736c58d71c643a64ba/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L157]
>  * 
> [setTopicPattern|https://github.com/apache/flink/blob/586715f23ef49939ab74e4736c58d71c643a64ba/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L168]
> which under the hood instantiate the corresponding (concrete) subscribers. 
> This covers the most common needs, I agree, but it might fall short in some 
> cases. Why not add a more generic setter:
>  * {{setSubscriber (???)}}
> Otherwise, how can users read from kafka in combination with custom 
> subscribing logic? Without looking much into it, it seems that they basically 
> cannot, at least without having to replicate some parts of the connector, 
> which seems rather inconvenient.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] pltbkd opened a new pull request, #19889: [FLINK-27910][connector/filesystem] Register the timer to enforce rolling policy when file sink starts from scratch.

2022-06-06 Thread GitBox


pltbkd opened a new pull request, #19889:
URL: https://github.com/apache/flink/pull/19889

   
   
   ## What is the purpose of the change
   
   This pull request fixes the bug that FileSink does not register the timer to 
enforce the rolling policy if started from scratch.
   
   
   ## Brief change log
   
 - FileWriter#initializeState is called with an empty state collection when 
creating the writer for a FileSink that starts from scratch, as sketched below.
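
   For illustration, a hedged sketch (simplified, assumed shape; not the exact diff) of the idea in the change log above:

{code:java}
// Hedged sketch: on a fresh start there is no restored state, but
// initializeState() is still invoked with an empty collection, so the
// processing-time timer that enforces the rolling policy is registered on
// this code path as well as on restore. (FileWriterBucketState is the
// writer's state type; the surrounding wiring is assumed.)
List<FileWriterBucketState> restored = Collections.emptyList();
writer.initializeState(restored); // registers the rolling-policy timer
{code}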
   
   
   ## Verifying this change
   
   This change added a new unit test verifying this change.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lsyldliu commented on a diff in pull request #19482: [FLINK-27244][hive] Support read sub-directories in partition directory with Hive tables

2022-06-06 Thread GitBox


lsyldliu commented on code in PR #19482:
URL: https://github.com/apache/flink/pull/19482#discussion_r890744876


##
docs/content.zh/docs/connectors/table/hive/hive_read_write.md:
##
@@ -170,6 +170,13 @@ following parameters in `TableConfig` (note that these 
parameters affect all sou
 
 Multi-thread is used to split hive's partitions. You can use 
`table.exec.hive.load-partition-splits.thread-num` to configure the thread 
number. The default value is 3 and the configured value should be bigger than 0.
 
+###
+
In some cases, you may create an external table referring to another table, but 
the partition columns are a subset of the referred table's.
Then when reading the external table, there will be sub-directories in the 
partition directory of the external table.
You can use `table.exec.hive.read-partition-with-subdirectory.enabled` to 
configure whether Flink reads the sub-directories or skips them directly.

Review Comment:
   You can configure `table.exec.hive.read-partition-with-subdirectory.enabled` 
to allow Flink to read ... ?
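
   For illustration, a hedged sketch of enabling the option from Java; the option key comes from the doc change above, while the environment setup is assumed:

{code:java}
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.getConfig().getConfiguration()
        .setBoolean("table.exec.hive.read-partition-with-subdirectory.enabled", true);
{code}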



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] PatrickRen commented on a diff in pull request #19827: [FLINK-27806][table] Support binary & varbinary types in datagen connector

2022-06-06 Thread GitBox


PatrickRen commented on code in PR #19827:
URL: https://github.com/apache/flink/pull/19827#discussion_r890743854


##
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##
@@ -176,6 +195,7 @@ void testSource() throws Exception {
 assertThat(row.getLong(1)).isBetween(10L, 100L);
 assertThat(row.getLong(2)).isEqualTo(i + 50);
 assertThat(row.getTimestamp(3, 
3).getMillisecond()).isBetween(begin - 5000, end);
+assertThat(row.getBinary(4).length).isEqualTo(2);

Review Comment:
   Could we also have an assertion here for field `f5`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] deadwind4 commented on a diff in pull request #19732: [FLINK-18887][python][connector/elasticsearch] Add Elasticsearch DataStream API

2022-06-06 Thread GitBox


deadwind4 commented on code in PR #19732:
URL: https://github.com/apache/flink/pull/19732#discussion_r890739338


##
flink-python/pyflink/datastream/connectors/elasticsearch.py:
##
@@ -0,0 +1,255 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+from enum import Enum
+from typing import List
+
+from pyflink.datastream.connectors import Sink, DeliveryGuarantee
+from pyflink.java_gateway import get_gateway
+from pyflink.util.java_utils import to_jarray, load_java_class
+
+
+class FlushBackoffType(Enum):
+"""
+Used to control whether the sink should retry failed requests at all or 
with which kind back off
+strategy.
+
+:data: `CONSTANT`:
+
+After every failure, it waits a configured time until the retries are 
exhausted.
+
+:data: `EXPONENTIAL`:
+
+After every failure, it waits initially the configured time and increases 
the waiting time
+exponentially until the retries are exhausted.
+
+:data: `NONE`:
+
+The failure is not retried.
+"""
+
+CONSTANT = 0,
+EXPONENTIAL = 1,
+NONE = 2,
+
+def _to_j_flush_backoff_type(self):
+JFlushBackoffType = get_gateway().jvm \
+.org.apache.flink.connector.elasticsearch.sink.FlushBackoffType
+return getattr(JFlushBackoffType, self.name)
+
+
+class ElasticsearchSinkBuilderBase(abc.ABC):
+"""
+Base builder to construct a ElasticsearchSink.
+"""
+
+@abc.abstractmethod
+def __init__(self):
+self._j_elasticsearch_sink_builder = None
+
+def set_emitter(self, emitter_class_name: str):

Review Comment:
   I implemented a `PythonSimpleElasticsearchEmitter` and have tested that 
insert and update work successfully.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-25705) Translate "Metric Reporters" page of "Deployment" in to Chinese

2022-06-06 Thread Yun Gao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao closed FLINK-25705.
---
Fix Version/s: 1.16.0
   Resolution: Fixed

> Translate "Metric Reporters" page of "Deployment" in to Chinese
> ---
>
> Key: FLINK-25705
> URL: https://issues.apache.org/jira/browse/FLINK-25705
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Chengkai Yang
>Assignee: Chengkai Yang
>Priority: Minor
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.16.0
>
>
> The page url is 
> [https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/metric_reporters]
> The markdown file is located in 
> flink/docs/content.zh/docs/deployment/metric_reporters.md
> This issue should be merged after Flink-25830 and 
> [FLINK-26222|https://issues.apache.org/jira/browse/FLINK-26222]is merged or 
> solved.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-25705) Translate "Metric Reporters" page of "Deployment" in to Chinese

2022-06-06 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550785#comment-17550785
 ] 

Yun Gao commented on FLINK-25705:
-

Merged on master via 5176669bc0ee678f21934f239d2b34a59102b1b8

> Translate "Metric Reporters" page of "Deployment" in to Chinese
> ---
>
> Key: FLINK-25705
> URL: https://issues.apache.org/jira/browse/FLINK-25705
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Chengkai Yang
>Assignee: Chengkai Yang
>Priority: Minor
>  Labels: auto-unassigned, pull-request-available
>
> The page url is 
> [https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/metric_reporters]
> The markdown file is located in 
> flink/docs/content.zh/docs/deployment/metric_reporters.md
> This issue should be merged after Flink-25830 and 
> [FLINK-26222|https://issues.apache.org/jira/browse/FLINK-26222]is merged or 
> solved.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] deadwind4 commented on a diff in pull request #19732: [FLINK-18887][python][connector/elasticsearch] Add Elasticsearch DataStream API

2022-06-06 Thread GitBox


deadwind4 commented on code in PR #19732:
URL: https://github.com/apache/flink/pull/19732#discussion_r890738562


##
flink-python/pyflink/datastream/connectors/__init__.py:
##
@@ -48,6 +50,10 @@
 'JdbcExecutionOptions',
 'NumberSequenceSource',
 'OutputFileConfig',
+'ElasticsearchSink',
+'Elasticsearch6SinkBuilder',
+'Elasticsearch7SinkBuilder',
+'FlushBackoffType',

Review Comment:
   > All the classes added in the future are placed in connector-specific files
   
   This makes sense to me. It keeps each connector's classes unified.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] ChengkaiYang2022 opened a new pull request, #19496: [FLINK-25705][docs]Translate "Metric Reporters" page of "Deployment"

2022-06-06 Thread GitBox


ChengkaiYang2022 opened a new pull request, #19496:
URL: https://github.com/apache/flink/pull/19496

   
   
   ## What is the purpose of the change
   
   Translate the "Metric Reporters" page of "Deployment".This pr is based on 
previous work of https://github.com/apache/flink/pull/19023.
   
   
   ## Brief change log
   
 - Translate some tables on this page.
   
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) no
 - The serializers: (yes / no / don't know) no 
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) 
no
 - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] gaoyunhaii closed pull request #19496: [FLINK-25705][docs]Translate "Metric Reporters" page of "Deployment"

2022-06-06 Thread GitBox


gaoyunhaii closed pull request #19496: [FLINK-25705][docs]Translate "Metric 
Reporters" page of "Deployment" 
URL: https://github.com/apache/flink/pull/19496


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27710) Improve logs to better display Execution

2022-06-06 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-27710:

Description: 
Currently, an execution is usually represented in logs as "{{job vertex name}} ({{subtaskIndex+1}}/{{vertex parallelism}}) ({{attemptId}})". With the change of FLINK-17295, this representation of an Execution becomes redundant, e.g. the subtask index is displayed twice.

Therefore, I'm proposing to change the format to "{{<job vertex name>}} ({{<subtaskIndex+1>}}/{{<vertex parallelism>}}) ({{#<attemptNumber>, vertexId: <JobVertexID>}})" and to avoid directly displaying the {{ExecutionAttemptID}}. This increases log readability. For illustration, a line would then read like {{Map (2/10) (#0, vertexId: <JobVertexID>)}}.

Besides that, the displayed {{JobVertexID}} can also help to distinguish job vertices of the same name, which is common in DataStream jobs (e.g. multiple {{Map}} vertices).

  was:
Currently, an execution is usually represented in logs as "{{job vertex name}} ({{subtaskIndex+1}}/{{vertex parallelism}}) ({{attemptId}})". With the change of FLINK-17295, this representation of an Execution becomes redundant, e.g. the subtask index is displayed twice.

Therefore, I'm proposing to change the format to "{{job vertex name}} ({{short ExecutionGraphID}}:{{JobVertexID}}) ({{subtaskIndex+1}}/{{vertex parallelism}}) ({{#attemptNumber}})" and to avoid directly displaying the {{ExecutionAttemptID}}. This increases log readability.

Besides that, the displayed {{JobVertexID}} can also help to distinguish job vertices of the same name, which is common in DataStream jobs (e.g. multiple {{Map}} vertices).


> Improve logs to better display Execution
> 
>
> Key: FLINK-27710
> URL: https://issues.apache.org/jira/browse/FLINK-27710
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Task
>Affects Versions: 1.16.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.16.0
>
>
> Currently, an execution is usually represented in logs as "{{job vertex name}} ({{subtaskIndex+1}}/{{vertex parallelism}}) ({{attemptId}})". With the change of FLINK-17295, this representation of an Execution becomes redundant, e.g. the subtask index is displayed twice.
> Therefore, I'm proposing to change the format to "{{<job vertex name>}} ({{<subtaskIndex+1>}}/{{<vertex parallelism>}}) ({{#<attemptNumber>, vertexId: <JobVertexID>}})" and to avoid directly displaying the {{ExecutionAttemptID}}. This increases log readability. For illustration, a line would then read like {{Map (2/10) (#0, vertexId: <JobVertexID>)}}.
> Besides that, the displayed {{JobVertexID}} can also help to distinguish job vertices of the same name, which is common in DataStream jobs (e.g. multiple {{Map}} vertices).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] paul8263 commented on a diff in pull request #19772: [FLINK-27579][client] The param client.timeout can not be set by dyna…

2022-06-06 Thread GitBox


paul8263 commented on code in PR #19772:
URL: https://github.com/apache/flink/pull/19772#discussion_r890730623


##
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java:
##
@@ -182,4 +196,45 @@ protected void executeProgram(Configuration configuration, 
PackagedProgram progr
 program.getUserCodeClassLoader().getClass().getName());
 }
 }
+
+public static void verifyCliFrontendWithDynamicProperties(
+Configuration configuration,
+String[] parameters,
+GenericCLI cliUnderTest,
+Duration expectedClientTimeout,
+int expectedDefaultParallelism)
+throws Exception {
+TestingCliFrontendWithDynamicProperties testFrontend =
+new TestingCliFrontendWithDynamicProperties(
+configuration,
+cliUnderTest,
+expectedClientTimeout,
+expectedDefaultParallelism);
+testFrontend.run(parameters); // verifies the expected values (see 
below)
+}
+
+private static final class TestingCliFrontendWithDynamicProperties extends 
CliFrontend {

Review Comment:
   @wangyang0918 ,
   Thank you very much for reviewing. I will update the PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
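
As context for the helper above, a minimal sketch (the config option is real, 
the values are hypothetical) of the precedence being verified: a 
`-Dclient.timeout=...` dynamic property should override the value coming from 
the configuration file.

```java
import java.time.Duration;

import org.apache.flink.client.ClientOptions;
import org.apache.flink.configuration.Configuration;

public class DynamicPropertyPrecedence {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // value as it would come from flink-conf.yaml
        conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofSeconds(60));
        // a later set(), as a -Dclient.timeout=10min dynamic property would do, wins
        conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));
        System.out.println(conf.get(ClientOptions.CLIENT_TIMEOUT)); // PT10M
    }
}
```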



[jira] [Commented] (FLINK-27791) SlotCountExceedingParallelismTest tests failed with NoResourceAvailableException

2022-06-06 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550769#comment-17550769
 ] 

Zhu Zhu commented on FLINK-27791:
-

[~wangyang0918] do you have any idea of this problem?

Flink is trying to get the login user via 
{{org.apache.hadoop.security.UserGroupInformation.getCurrentUser}}, but it 
failed due to an {{IllegalArgumentException}}.
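
For reference, a minimal sketch (illustration only, not Flink's actual error 
handling) of the call path in question:

{code:java}
// getCurrentUser() is declared to throw IOException; Hadoop's internal
// Kerberos/principal handling can additionally surface IllegalArgumentException.
import org.apache.hadoop.security.UserGroupInformation;

public class LoginUserProbe {
    public static void main(String[] args) {
        try {
            UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
            System.out.println("login user: " + ugi.getUserName());
        } catch (IllegalArgumentException | java.io.IOException e) {
            System.err.println("could not determine the login user: " + e);
        }
    }
}
{code}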

> SlotCountExceedingParallelismTest tests failed with 
> NoResourceAvailableException
> 
>
> Key: FLINK-27791
> URL: https://issues.apache.org/jira/browse/FLINK-27791
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: test-stability
> Attachments: error.log
>
>
> {code:java}
> 2022-05-25T12:16:09.2562348Z May 25 12:16:09 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2022-05-25T12:16:09.2563741Z May 25 12:16:09  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2022-05-25T12:16:09.2565457Z May 25 12:16:09  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:982)
> 2022-05-25T12:16:09.2567245Z May 25 12:16:09  at 
> org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.submitJobGraphAndWait(SlotCountExceedingParallelismTest.java:101)
> 2022-05-25T12:16:09.2569329Z May 25 12:16:09  at 
> org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.testNoSlotSharingAndBlockingResultBoth(SlotCountExceedingParallelismTest.java:94)
> 2022-05-25T12:16:09.2571889Z May 25 12:16:09  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-05-25T12:16:09.2573109Z May 25 12:16:09  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-05-25T12:16:09.2574528Z May 25 12:16:09  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-05-25T12:16:09.2575657Z May 25 12:16:09  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-05-25T12:16:09.2581380Z May 25 12:16:09  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-05-25T12:16:09.2582747Z May 25 12:16:09  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-05-25T12:16:09.2583600Z May 25 12:16:09  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-05-25T12:16:09.2584455Z May 25 12:16:09  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-05-25T12:16:09.2585172Z May 25 12:16:09  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-05-25T12:16:09.2585792Z May 25 12:16:09  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-05-25T12:16:09.2586376Z May 25 12:16:09  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-05-25T12:16:09.2587035Z May 25 12:16:09  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2022-05-25T12:16:09.2587682Z May 25 12:16:09  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2022-05-25T12:16:09.2588589Z May 25 12:16:09  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2022-05-25T12:16:09.2589623Z May 25 12:16:09  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2022-05-25T12:16:09.2590262Z May 25 12:16:09  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2022-05-25T12:16:09.2590856Z May 25 12:16:09  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2022-05-25T12:16:09.2591453Z May 25 12:16:09  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2022-05-25T12:16:09.2592063Z May 25 12:16:09  at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2022-05-25T12:16:09.2592673Z May 25 12:16:09  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 2022-05-25T12:16:09.2593288Z May 25 12:16:09  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-05-25T12:16:09.2595864Z May 25 12:16:09  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2022-05-25T12:16:09.2596521Z May 25 12:16:09  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-05-25T12:16:09.2597144Z May 25 12:16:09  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> 2022-05-25T12:16:09.2597703Z May 25 12:16:09  at 
> org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> 2022-05-25T12:16:09.2598247Z May 25 12:16:09  at 
> 

[GitHub] [flink] Aitozi commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…

2022-06-06 Thread GitBox


Aitozi commented on code in PR #19886:
URL: https://github.com/apache/flink/pull/19886#discussion_r890728118


##
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java:
##
@@ -1424,6 +1428,37 @@ public void 
testReclaimInactiveSlotsOnClearRequirements() throws Exception {
 }
 }
 
+@Test
+public void testProcessResourceRequirementsWithDelay() throws Exception {

Review Comment:
   Yes, updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuyongvs commented on pull request #19881: [FLINK-27899][quickstarts] fix deactivate the shade plugin doesn't ta…

2022-06-06 Thread GitBox


liuyongvs commented on PR #19881:
URL: https://github.com/apache/flink/pull/19881#issuecomment-1148144676

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…

2022-06-06 Thread GitBox


KarmaGYZ commented on code in PR #19886:
URL: https://github.com/apache/flink/pull/19886#discussion_r890726474


##
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java:
##
@@ -1424,6 +1428,37 @@ public void 
testReclaimInactiveSlotsOnClearRequirements() throws Exception {
 }
 }
 
+@Test
+public void testProcessResourceRequirementsWithDelay() throws Exception {

Review Comment:
   I think we can process another resource requirement before manually 
triggering the scheduled task. Then the number of pending tasks should still 
be equal to 1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Aitozi commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…

2022-06-06 Thread GitBox


Aitozi commented on code in PR #19886:
URL: https://github.com/apache/flink/pull/19886#discussion_r890724533


##
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java:
##
@@ -1424,6 +1428,37 @@ public void 
testReclaimInactiveSlotsOnClearRequirements() throws Exception {
 }
 }
 
+@Test
+public void testProcessResourceRequirementsWithDelay() throws Exception {

Review Comment:
   Verified by the newly added
   
   ```
   Assertions.assertEquals(1, allocatedResourceCounter.get());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Aitozi commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…

2022-06-06 Thread GitBox


Aitozi commented on code in PR #19886:
URL: https://github.com/apache/flink/pull/19886#discussion_r890723542


##
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##
@@ -142,6 +142,13 @@ public class ResourceManagerOptions {
 + START_WORKER_MAX_FAILURE_RATE.key()
 + "') is reached.");
 
+@Documentation.ExcludeFromDocumentation

Review Comment:
   Thanks for your hint.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…

2022-06-06 Thread GitBox


KarmaGYZ commented on code in PR #19886:
URL: https://github.com/apache/flink/pull/19886#discussion_r890722048


##
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##
@@ -142,6 +142,13 @@ public class ResourceManagerOptions {
 + START_WORKER_MAX_FAILURE_RATE.key()
 + "') is reached.");
 
+@Documentation.ExcludeFromDocumentation

Review Comment:
   ```suggestion
   @Documentation.ExcludeFromDocumentation(
   "This is an expert option, that we do not want to expose in the 
documentation")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Vancior commented on a diff in pull request #19883: [FLINK-27901][python] support TableEnvironment.create(configuration)

2022-06-06 Thread GitBox


Vancior commented on code in PR #19883:
URL: https://github.com/apache/flink/pull/19883#discussion_r890721959


##
flink-python/pyflink/table/table_environment.py:
##
@@ -98,18 +99,25 @@ def __init__(self, j_tenv, serializer=PickleSerializer()):
 self._open()
 
 @staticmethod
-def create(environment_settings: EnvironmentSettings) -> 
'TableEnvironment':
+def create(conf_or_settings: Union[EnvironmentSettings, Configuration]) -> 
'TableEnvironment':
 """
 Creates a table environment that is the entry point and central 
context for creating Table
 and SQL API programs.
 
-:param environment_settings: The environment settings used to 
instantiate the
+:param conf_or_settings: The configuration or environment settings 
used to instantiate the
  :class:`~pyflink.table.TableEnvironment`.
 :return: The :class:`~pyflink.table.TableEnvironment`.
 """
 gateway = get_gateway()
-j_tenv = gateway.jvm.TableEnvironment.create(
-environment_settings._j_environment_settings)
+if isinstance(conf_or_settings, EnvironmentSettings):
+environment_settings = conf_or_settings
+elif isinstance(conf_or_settings, Configuration):
+environment_settings = EnvironmentSettings.new_instance() \

Review Comment:
   `from_configuration` is deprecated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Aitozi commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…

2022-06-06 Thread GitBox


Aitozi commented on code in PR #19886:
URL: https://github.com/apache/flink/pull/19886#discussion_r890718047


##
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java:
##
@@ -1347,7 +1351,7 @@ public void 
testAllocationUpdatesIgnoredIfSlotMarkedAsPendingForOtherJob() throw
 .setSlotTracker(slotTracker)
 .buildAndStart(
 ResourceManagerId.generate(),
-
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+Executors.directExecutor(),

Review Comment:
   It's an unused change now, reverted



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Aitozi commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…

2022-06-06 Thread GitBox


Aitozi commented on code in PR #19886:
URL: https://github.com/apache/flink/pull/19886#discussion_r890717053


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##
@@ -407,13 +420,39 @@ public void freeSlot(SlotID slotId, AllocationID 
allocationId) {
 LOG.debug("Freeing slot {}.", slotId);
 
 slotTracker.notifyFree(slotId);
-checkResourceRequirements();
+checkResourceRequirementsWithDelay();
 }
 
 // 
-
 // Requirement matching
 // 
-
 
+/**
+ * Depending on the implementation of {@link ResourceAllocationStrategy}, 
checking resource
+ * requirements and potentially making a re-allocation can be heavy. In 
order to cover more
+ * changes with each check, and thus reduce the frequency of unnecessary 
re-allocations, the checks
+ * are performed with a slight delay.
+ */
+private void checkResourceRequirementsWithDelay() {
+if (requirementsCheckDelay.toMillis() <= 0) {

Review Comment:
   Yes, will fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
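
To make the mechanism under review concrete, here is a self-contained sketch 
(class and method names are hypothetical; the real logic lives in 
`DeclarativeSlotManager`) of how multiple triggers within one delay window 
coalesce into a single resource check:

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class DelayedResourceCheck {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean checkPending = new AtomicBoolean(false);
    private final Duration delay;
    private final Runnable check;

    DelayedResourceCheck(Duration delay, Runnable check) {
        this.delay = delay;
        this.check = check;
    }

    void trigger() {
        if (delay.toMillis() <= 0) {
            check.run(); // no delay configured: check immediately
        } else if (checkPending.compareAndSet(false, true)) {
            // the first trigger in a window schedules the check ...
            executor.schedule(
                    () -> {
                        checkPending.set(false);
                        check.run();
                    },
                    delay.toMillis(),
                    TimeUnit.MILLISECONDS);
        }
        // ... later triggers within the window coalesce into the pending check
    }
}
```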



[GitHub] [flink] Aitozi commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…

2022-06-06 Thread GitBox


Aitozi commented on code in PR #19886:
URL: https://github.com/apache/flink/pull/19886#discussion_r890716617


##
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##
@@ -142,6 +142,13 @@ public class ResourceManagerOptions {
 + START_WORKER_MAX_FAILURE_RATE.key()
 + "') is reached.");
 
+@Documentation.ExcludeFromDocumentation

Review Comment:
   It is inspired by the comments in the `ResourceManagerRuntimeServices` to 
make this configurable. But I think there is no need to change the default 
value now, so I can remove the annotation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…

2022-06-06 Thread GitBox


KarmaGYZ commented on code in PR #19886:
URL: https://github.com/apache/flink/pull/19886#discussion_r890703027


##
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java:
##
@@ -142,6 +142,13 @@ public class ResourceManagerOptions {
 + START_WORKER_MAX_FAILURE_RATE.key()
 + "') is reached.");
 
+@Documentation.ExcludeFromDocumentation

Review Comment:
   Why do you introduce this config option? Do you mean to allow users to 
configure it? If so, why do you exclude it from the documentation?



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java:
##
@@ -407,13 +420,39 @@ public void freeSlot(SlotID slotId, AllocationID 
allocationId) {
 LOG.debug("Freeing slot {}.", slotId);
 
 slotTracker.notifyFree(slotId);
-checkResourceRequirements();
+checkResourceRequirementsWithDelay();
 }
 
 // 
-
 // Requirement matching
 // 
-
 
+/**
+ * Depending on the implementation of {@link ResourceAllocationStrategy}, 
checking resource
+ * requirements and potentially making a re-allocation can be heavy. In 
order to cover more
+ * changes with each check, and thus reduce the frequency of unnecessary 
re-allocations, the checks
+ * are performed with a slight delay.
+ */
+private void checkResourceRequirementsWithDelay() {
+if (requirementsCheckDelay.toMillis() <= 0) {

Review Comment:
   If we allow users to configure the delay period, we should also check it in 
`FineGrainedSlotManager`



##
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java:
##
@@ -1347,7 +1351,7 @@ public void 
testAllocationUpdatesIgnoredIfSlotMarkedAsPendingForOtherJob() throw
 .setSlotTracker(slotTracker)
 .buildAndStart(
 ResourceManagerId.generate(),
-
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+Executors.directExecutor(),

Review Comment:
   Could you help me to understand why we need this change?



##
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java:
##
@@ -1424,6 +1428,37 @@ public void 
testReclaimInactiveSlotsOnClearRequirements() throws Exception {
 }
 }
 
+@Test
+public void testProcessResourceRequirementsWithDelay() throws Exception {

Review Comment:
   We'd better also test that the check method will be triggered only once in 
one delay period.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #19888: [FLINK-27920][doc] Documented enums constant support ExcludeFromDocumentation annotation

2022-06-06 Thread GitBox


flinkbot commented on PR #19888:
URL: https://github.com/apache/flink/pull/19888#issuecomment-1148128870

   
   ## CI report:
   
   * 7d6308a037148b6340eab31bc37c963fc0465a65 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27920) Documented enums constant support ExcludeFromDocumentation annotation

2022-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27920:
---
Labels: pull-request-available  (was: )

> Documented enums constant support ExcludeFromDocumentation annotation
> -
>
> Key: FLINK-27920
> URL: https://issues.apache.org/jira/browse/FLINK-27920
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
>
> If a config option has the @ExcludeFromDocumentation annotation, it will not 
> appear in the documentation. But for an enumeration type, sometimes we only 
> want some of its constant values not to appear in the documentation; this 
> ticket solves this problem.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] reswqa opened a new pull request, #19888: [FLINK-27920][doc] Documented enums constant support ExcludeFromDocumentation annotation

2022-06-06 Thread GitBox


reswqa opened a new pull request, #19888:
URL: https://github.com/apache/flink/pull/19888

   ## What is the purpose of the change
   
   If a config option has the @ExcludeFromDocumentation annotation, it will 
not appear in the documentation. But for an enumeration type, sometimes we 
only want some of its constant values not to appear in the documentation; 
this pull request fulfills that requirement.
   
   ## Brief change log
   
 - *let ConfigOptionsDocGenerator take the ExcludeFromDocumentation 
annotation into account in getEnumOptionsDescription*
   
   ## Verifying this change
   
   This change added tests and can be verified by 
org.apache.flink.docs.configuration.ConfigOptionsDocGeneratorTest#TestConfigGroupWithEnumConstantExclusion
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector:  no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
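
To illustrate what this change enables, a hedged sketch (the enum and its 
values are made up) of an enum whose internal constant would be skipped by the 
docs generator:

```java
import org.apache.flink.annotation.docs.Documentation;

/** Hypothetical enum used as the value type of some documented config option. */
public enum HypotheticalMode {
    STANDARD,
    ADVANCED,

    // with this change, the generator would omit this constant from the docs
    @Documentation.ExcludeFromDocumentation("internal testing value")
    INTERNAL_TEST_ONLY
}
```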



[GitHub] [flink] flinkbot commented on pull request #19887: [FLINK-27924][python][docs] Add pulsar comments in the datastream package

2022-06-06 Thread GitBox


flinkbot commented on PR #19887:
URL: https://github.com/apache/flink/pull/19887#issuecomment-1148121181

   
   ## CI report:
   
   * b8d1a745298de2069566aa871db98424b57dc78e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27920) Documented enums constant support ExcludeFromDocumentation annotation

2022-06-06 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-27920:
---
Issue Type: Improvement  (was: Bug)

> Documented enums constant support ExcludeFromDocumentation annotation
> -
>
> Key: FLINK-27920
> URL: https://issues.apache.org/jira/browse/FLINK-27920
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Weijie Guo
>Priority: Major
>
> If a config option has the @ExcludeFromDocumentation annotation, it will not 
> appear in the documentation. But for an enumeration type, sometimes we only 
> want some of its constant values not to appear in the documentation; this 
> ticket solves this problem.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27828) FlinkKafkaProducer VS KafkaSink

2022-06-06 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550752#comment-17550752
 ] 

Qingsheng Ren commented on FLINK-27828:
---

[~Jiangfei Liu] Could you share some context of your benchmark, like the 
topology of the job, parallelism, checkpoint configuration and so forth?
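
For anyone reproducing this, a hedged sketch (broker and topic names are 
placeholders) of a KafkaSink setup; note that the configured DeliveryGuarantee 
matters a lot here, since EXACTLY_ONCE ties transactional commits to 
checkpoints and is not comparable to a non-transactional FlinkKafkaProducer 
run:

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class KafkaSinkExample {
    public static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")          // placeholder address
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")    // placeholder topic
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // AT_LEAST_ONCE avoids transactional commits tied to checkpoints
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}
{code}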

> FlinkKafkaProducer VS KafkaSink
> ---
>
> Key: FLINK-27828
> URL: https://issues.apache.org/jira/browse/FLINK-27828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.14.3
>Reporter: Jiangfei Liu
>Priority: Major
> Attachments: Snipaste_2022-05-25_19-52-11.png
>
>
> Sorry, my English is bad.
> In Flink 1.14.3, writing 1 data to Kafka:
> when using FlinkKafkaProducer, it completed in 7s;
> when using KafkaSink, it completed in 1m40s.
> Why is KafkaSink so slow?



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance

2022-06-06 Thread GitBox


yunfengzhou-hub commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890707072


##
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##
@@ -18,38 +18,73 @@
 
 package org.apache.flink.iteration.datacache.nonkeyed;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.MemorySegment;
 
-import java.io.Serializable;
+import java.util.List;
 import java.util.Objects;
 
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/** A segment contains the information about a cache unit. */
+@Internal
+public class Segment {
+
+/** The path to the file containing persisted records. */
 private final Path path;
 
-/** The count of the records in the file. */
+/**
+ * The count of records in the file at the path if the file size is not 
zero, otherwise the
+ * count of records in the cache.
+ */
 private final int count;
 
-/** The total length of file. */
-private final long size;
+/** The total length of file containing persisted records. */
+private long fsSize = -1L;
+
+/** The memory segments containing cached records. */

Review Comment:
   Right now the actual behavior is that the cache list is null, rather than 
empty, when the segment is not cached. I think that better matches Java's 
default values. How do you think we should treat null and an empty list 
accordingly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27921) Introduce the checkResourceRequirementsWithDelay in DeclarativeSlotManager

2022-06-06 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-27921:
---
Description: 
As discussed in 
[https://github.com/apache/flink/pull/19840#discussion_r884242067], this 
ticket is meant to introduce the same mechanism as in 
{{FineGrainedSlotManager}}: waiting for a slight delay before processing the 
resource check. It will reduce the frequency of unnecessary re-allocations.

  was:
As discussed in 
https://github.com/apache/flink/pull/19840#discussion_r884242067 .This 
 ticket is meant to introduce the same mechanism to wait for a slight delay 
before process the resource check with {{FineGrainedSlotManager}}. It will the 
frequency of unnecessary re-allocations


> Introduce the checkResourceRequirementsWithDelay in DeclarativeSlotManager
> --
>
> Key: FLINK-27921
> URL: https://issues.apache.org/jira/browse/FLINK-27921
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Aitozi
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> As discussed in 
> [https://github.com/apache/flink/pull/19840#discussion_r884242067], this 
> ticket is meant to introduce the same mechanism as in 
> {{FineGrainedSlotManager}}: waiting for a slight delay before processing 
> the resource check. It will reduce the frequency of unnecessary 
> re-allocations.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27924) Add pulsar comments in the datastream package

2022-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27924:
---
Labels: pull-request-available  (was: )

> Add pulsar comments in the datastream package
> -
>
> Key: FLINK-27924
> URL: https://issues.apache.org/jira/browse/FLINK-27924
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Documentation
>Affects Versions: 1.15.0
>Reporter: LuNng Wang
>Priority: Major
>  Labels: pull-request-available
>
> https://github.com/apache/flink/pull/19732#discussion_r883356468



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] deadwind4 opened a new pull request, #19887: [FLINK-27924][python][docs] Add pulsar comments in the datastream package

2022-06-06 Thread GitBox


deadwind4 opened a new pull request, #19887:
URL: https://github.com/apache/flink/pull/19887

   ## What is the purpose of the change
   
   Add pulsar comments in the datastream package
   
   ## Brief change log
   
 - *Add pulsar comments in __init__.py of the datastream package*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (PyDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27925) Avoid to create watcher without the resourceVersion

2022-06-06 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550749#comment-17550749
 ] 

Aitozi commented on FLINK-27925:


cc [~wangyang0918] 

> Avoid to create watcher without the resourceVersion
> ---
>
> Key: FLINK-27925
> URL: https://issues.apache.org/jira/browse/FLINK-27925
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Reporter: Aitozi
>Priority: Major
>
> Currently, we create the watcher in KubernetesResourceManager without 
> passing the resourceVersion parameter, so every watch triggers a request to 
> etcd. This puts a burden on etcd in large-scale clusters (which we have 
> seen in our internal k8s cluster). More detail can be found 
> [here|https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter].
>  
> I think we could use an informer to improve this (it will spawn a 
> list-watch and maintain the resourceVersion internally).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27924) Add pulsar comments in the datastream package

2022-06-06 Thread LuNng Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LuNng Wang updated FLINK-27924:
---
Component/s: Documentation

> Add pulsar comments in the datastream package
> -
>
> Key: FLINK-27924
> URL: https://issues.apache.org/jira/browse/FLINK-27924
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Documentation
>Affects Versions: 1.15.0
>Reporter: LuNng Wang
>Priority: Major
>
> https://github.com/apache/flink/pull/19732#discussion_r883356468



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #258: [chore] Change the session cluster not found log level to error

2022-06-06 Thread GitBox


gyfora merged PR #258:
URL: https://github.com/apache/flink-kubernetes-operator/pull/258


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27925) Avoid to create watcher without the resourceVersion

2022-06-06 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-27925:
---
Description: 
Currently, we create the watcher in KubernetesResourceManager without passing 
the resourceVersion parameter, so every watch triggers a request to etcd. 
This puts a burden on etcd in large-scale clusters (which we have seen in our 
internal k8s cluster). More detail can be found 
[here|https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter].

I think we could use an informer to improve this (it will spawn a list-watch 
and maintain the resourceVersion internally).

  was:
Currently, we create the watcher in KubernetesResourceManager. But it do not 
pass the resourceVersion parameter, it will read from the etcd. It will bring 
the burden to the etcd in large scale cluster (which have been seen in our 
internal k8s cluster). More detail can be found 
[here|https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter]
 

I think we could use the informer to improve it (which will spawn a list-watch 
and maintain the resourceVersion internally)


> Avoid to create watcher without the resourceVersion
> ---
>
> Key: FLINK-27925
> URL: https://issues.apache.org/jira/browse/FLINK-27925
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Reporter: Aitozi
>Priority: Major
>
> Currently, we create the watcher in KubernetesResourceManager without 
> passing the resourceVersion parameter, so every watch triggers a request to 
> etcd. This puts a burden on etcd in large-scale clusters (which we have 
> seen in our internal k8s cluster). More detail can be found 
> [here|https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter].
>  
> I think we could use an informer to improve this (it will spawn a 
> list-watch and maintain the resourceVersion internally).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27925) Avoid to create watcher without the resourceVersion

2022-06-06 Thread Aitozi (Jira)
Aitozi created FLINK-27925:
--

 Summary: Avoid to create watcher without the resourceVersion
 Key: FLINK-27925
 URL: https://issues.apache.org/jira/browse/FLINK-27925
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Reporter: Aitozi


Currently, we create the watcher in KubernetesResourceManager without passing 
the resourceVersion parameter, so the watch reads from etcd. This puts a 
burden on etcd in large-scale clusters (which we have seen in our internal 
k8s cluster). More detail can be found 
[here|https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter].

I think we could use an informer to improve this (it will spawn a list-watch 
and maintain the resourceVersion internally).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
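
To sketch the proposed direction (assuming a recent fabric8 kubernetes-client, 
which Flink's Kubernetes integration builds on; the namespace and labels are 
placeholders): an informer performs a single list-watch and tracks the 
resourceVersion internally instead of watching from "now".

{code:java}
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;

class PodInformerSketch {
    static SharedIndexInformer<Pod> watchTaskManagerPods(KubernetesClient client) {
        return client.pods()
                .inNamespace("flink")                         // placeholder
                .withLabel("type", "flink-native-kubernetes") // placeholder
                .inform(
                        new ResourceEventHandler<Pod>() {
                            @Override
                            public void onAdd(Pod pod) {
                                // a new TaskManager pod appeared
                            }

                            @Override
                            public void onUpdate(Pod oldPod, Pod newPod) {
                                // pod status changed
                            }

                            @Override
                            public void onDelete(Pod pod, boolean unknownFinalState) {
                                // pod removed; clean up bookkeeping
                            }
                        },
                        0L); // no periodic resync
    }
}
{code}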


[jira] [Commented] (FLINK-26942) Support SELECT clause in CREATE TABLE(CTAS)

2022-06-06 Thread waywtdcc (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550743#comment-17550743
 ] 

waywtdcc commented on FLINK-26942:
--

CREATE TABLE [ IF NOT EXISTS ] table_name 
[ WITH ( table_properties ) ]
[ AS query_expression ] 
[ WITH [ NO ] DATA ]




I think [ WITH [ NO ] DATA ] should be added, to indicate whether to only 
create the table without writing data.

> Support SELECT clause in CREATE TABLE(CTAS)
> ---
>
> Key: FLINK-26942
> URL: https://issues.apache.org/jira/browse/FLINK-26942
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: tartarus
>Priority: Major
> Fix For: 1.16.0
>
>
> Support CTAS(CREATE TABLE AS SELECT) syntax
> {code:java}
> CREATE TABLE [ IF NOT EXISTS ] table_name 
> [ WITH ( table_properties ) ]
> [ AS query_expression ] {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27924) Add pulsar comments in the datastream package

2022-06-06 Thread LuNng Wang (Jira)
LuNng Wang created FLINK-27924:
--

 Summary: Add pulsar comments in the datastream package
 Key: FLINK-27924
 URL: https://issues.apache.org/jira/browse/FLINK-27924
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.15.0
Reporter: LuNng Wang


https://github.com/apache/flink/pull/19732#discussion_r883356468



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] pengmide commented on pull request #19869: [FLINK-21996][python] Support Kinesis connector in Python DataStream API

2022-06-06 Thread GitBox


pengmide commented on PR #19869:
URL: https://github.com/apache/flink/pull/19869#issuecomment-1148113249

   @dianfu I have fixed the problem, please help to review again~ thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-27921) Introduce the checkResourceRequirementsWithDelay in DeclarativeSlotManager

2022-06-06 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo reassigned FLINK-27921:
--

Assignee: Aitozi

> Introduce the checkResourceRequirementsWithDelay in DeclarativeSlotManager
> --
>
> Key: FLINK-27921
> URL: https://issues.apache.org/jira/browse/FLINK-27921
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Aitozi
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> As discussed in 
> https://github.com/apache/flink/pull/19840#discussion_r884242067, this 
> ticket is meant to introduce the same mechanism as in 
> {{FineGrainedSlotManager}}: waiting for a slight delay before processing 
> the resource check. It will reduce the frequency of unnecessary 
> re-allocations.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance

2022-06-06 Thread GitBox


yunfengzhou-hub commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890701884


##
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/FileSegmentWriter.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/** A class that writes cache data to a target file in given file system. */
+@Internal
+class FileSegmentWriter<T> implements SegmentWriter<T> {
+
+/** The tool to serialize received records into bytes. */
+private final TypeSerializer<T> serializer;
+
+/** The path to the target file. */
+private final Path path;
+
+/** The output stream that writes to the target file. */
+private final FSDataOutputStream outputStream;
+
+/** A buffer that wraps the output stream to optimize performance. */
+private final BufferedOutputStream bufferedOutputStream;

Review Comment:
   Since we cannot remove the `bufferedOutputStream.flush()` in `finish()` 
according to the comment above, we cannot remove this variable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance

2022-06-06 Thread GitBox


yunfengzhou-hub commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890701650


##
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/FileSegmentWriter.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/** A class that writes cache data to a target file in given file system. */
+@Internal
+class FileSegmentWriter<T> implements SegmentWriter<T> {
+
+/** The tool to serialize received records into bytes. */
+private final TypeSerializer<T> serializer;
+
+/** The path to the target file. */
+private final Path path;
+
+/** The output stream that writes to the target file. */
+private final FSDataOutputStream outputStream;
+
+/** A buffer that wraps the output stream to optimize performance. */
+private final BufferedOutputStream bufferedOutputStream;
+
+/** The wrapper view of output stream to be used with TypeSerializer API. 
*/
+private final DataOutputView outputView;
+
+/** The number of records added so far. */
+private int count;
+
+FileSegmentWriter(TypeSerializer<T> serializer, Path path) throws 
IOException {
+this.serializer = serializer;
+this.path = path;
+this.outputStream = path.getFileSystem().create(path, 
FileSystem.WriteMode.NO_OVERWRITE);
+this.bufferedOutputStream = new BufferedOutputStream(outputStream);
+this.outputView = new 
DataOutputViewStreamWrapper(bufferedOutputStream);
+}
+
+@Override
+public boolean addRecord(T record) throws IOException {
+if (outputStream.getPos() >= DataCacheWriter.MAX_SEGMENT_SIZE) {
+return false;
+}
+serializer.serialize(record, outputView);
+count++;
+return true;
+}
+
+@Override
+public Optional<Segment> finish() throws IOException {
+bufferedOutputStream.flush();

Review Comment:
   `bufferedOutputStream` wraps around `outputStream`, and its buffered bytes 
are invisible to `outputStream`. When `outputStream.flush()` is invoked, there 
might still be bytes left in `bufferedOutputStream`. When I tried to remove 
this line, most data cache tests failed, so this line cannot be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
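
The flush-ordering point above can be reproduced with plain `java.io` streams, 
independent of the Flink-ML classes (a small demonstration, not project code):

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class FlushOrderDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream inner = new ByteArrayOutputStream();
        BufferedOutputStream buffered = new BufferedOutputStream(inner, 8192);

        buffered.write(new byte[100]); // bytes sit in the wrapper's buffer
        inner.flush();                 // does not drain the wrapper
        System.out.println(inner.size()); // 0: nothing reached `inner` yet

        buffered.flush();              // drains the wrapper into `inner`
        System.out.println(inner.size()); // 100
    }
}
```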



[GitHub] [flink-ml] lindong28 commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance

2022-06-06 Thread GitBox


lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890693618


##
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##
@@ -18,38 +18,73 @@
 
 package org.apache.flink.iteration.datacache.nonkeyed;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.MemorySegment;
 
-import java.io.Serializable;
+import java.util.List;
 import java.util.Objects;
 
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/** A segment contains the information about a cache unit. */
+@Internal
+public class Segment {
+
+/** The path to the file containing persisted records. */
 private final Path path;
 
-/** The count of the records in the file. */
+/**
+ * The count of records in the file at the path if the file size is not 
zero, otherwise the
+ * count of records in the cache.
+ */
 private final int count;
 
-/** The total length of file. */
-private final long size;
+/** The total length of file containing persisted records. */
+private long fsSize = -1L;
+
+/** The memory segments containing cached records. */

Review Comment:
   Could you update the code comment to explain that `the cache list is empty 
iff the segment has not been cached in memory`?



##
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##
@@ -18,38 +18,73 @@
 
 package org.apache.flink.iteration.datacache.nonkeyed;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.MemorySegment;
 
-import java.io.Serializable;
+import java.util.List;
 import java.util.Objects;
 
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/** A segment contains the information about a cache unit. */
+@Internal
+public class Segment {
+
+/** The path to the file containing persisted records. */
 private final Path path;
 
-/** The count of the records in the file. */
+/**
+ * The count of records in the file at the path if the file size is not 
zero, otherwise the
+ * count of records in the cache.
+ */
 private final int count;
 
-/** The total length of file. */
-private final long size;
+/** The total length of file containing persisted records. */
+private long fsSize = -1L;
+
+/** The memory segments containing cached records. */
+private List<MemorySegment> cache;
+
+Segment(Path path, int count, long fsSize) {
+this.path = checkNotNull(path);
+checkArgument(count > 0);
+this.count = count;
+checkArgument(fsSize > 0);
+this.fsSize = fsSize;
+}
 
-public Segment(Path path, int count, long size) {
-this.path = path;
+Segment(Path path, int count, List<MemorySegment> cache) {
+this.path = checkNotNull(path);
+checkArgument(count > 0);
 this.count = count;
-this.size = size;
+this.cache = checkNotNull(cache);
 }
 
-public Path getPath() {
+void setCache(List<MemorySegment> cache) {
+this.cache = checkNotNull(cache);
+}
+
+void setFsSize(long fsSize) {
+checkArgument(fsSize > 0);
+this.fsSize = fsSize;
+}
+
+Path getPath() {
 return path;
 }
 
-public int getCount() {
+int getCount() {
 return count;
 }
 
-public long getSize() {
-return size;
+long getFsSize() {
+return fsSize;
+}
+
+List<MemorySegment> getCache() {
+return cache;

Review Comment:
   Instead of indicating `the segment has been read into memory` by using 
`cache == null`, would it be simpler to use `cache.isEmpty()`?
   
   This would allow us to handle fewer special cases (e.g. null). After all, 
if the segment has been read into memory, it must have a non-empty memory 
segment list.



##
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##
@@ -18,38 +18,73 @@
 
 package org.apache.flink.iteration.datacache.nonkeyed;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.MemorySegment;
 
-import java.io.Serializable;
+import java.util.List;
 import java.util.Objects;
 
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/** A segment contains the information about 

[GitHub] [flink-table-store] JingsongLi commented on pull request #119: POC for ALTER TABLE ... COMPACT

2022-06-06 Thread GitBox


JingsongLi commented on PR #119:
URL: 
https://github.com/apache/flink-table-store/pull/119#issuecomment-1148105554

   Hi @LadyForest  I think we can have a new formal PR to discuss.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] czy006 commented on pull request #256: [FLINK-27646] Flink Operator RoadMap Page

2022-06-06 Thread GitBox


czy006 commented on PR #256:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/256#issuecomment-1148103093

   @gyfora PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on pull request #19884: [FLINK-26764][table-planner] Support RESPECT NULLS for function FIRST_VALUE/LAST_VALUE

2022-06-06 Thread GitBox


luoyuxia commented on PR #19884:
URL: https://github.com/apache/flink/pull/19884#issuecomment-1148102979

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on pull request #19876: [FLINK-27895][build] Enable the CI test for Hive 3.1.2

2022-06-06 Thread GitBox


luoyuxia commented on PR #19876:
URL: https://github.com/apache/flink/pull/19876#issuecomment-1148102825

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dianfu commented on a diff in pull request #19883: [FLINK-27901][python] support TableEnvironment.create(configuration)

2022-06-06 Thread GitBox


dianfu commented on code in PR #19883:
URL: https://github.com/apache/flink/pull/19883#discussion_r890690352


##
flink-python/pyflink/table/table_environment.py:
##
@@ -98,18 +99,25 @@ def __init__(self, j_tenv, serializer=PickleSerializer()):
 self._open()
 
 @staticmethod
-def create(environment_settings: EnvironmentSettings) -> 
'TableEnvironment':

Review Comment:
   Also update the documentation in 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/python_config/.



##
flink-python/pyflink/table/table_environment.py:
##
@@ -98,18 +99,25 @@ def __init__(self, j_tenv, serializer=PickleSerializer()):
 self._open()
 
 @staticmethod
-def create(environment_settings: EnvironmentSettings) -> 
'TableEnvironment':
+def create(conf_or_settings: Union[EnvironmentSettings, Configuration]) -> 
'TableEnvironment':

Review Comment:
   This will break backward compatibility. Users may write code as following 
`TableEnvironment.create(environment_settings=env_settings)`. What about keep 
the name not changed?



##
flink-python/pyflink/table/table_environment.py:
##
@@ -98,18 +99,25 @@ def __init__(self, j_tenv, serializer=PickleSerializer()):
 self._open()
 
 @staticmethod
-def create(environment_settings: EnvironmentSettings) -> 
'TableEnvironment':
+def create(conf_or_settings: Union[EnvironmentSettings, Configuration]) -> 
'TableEnvironment':
 """
 Creates a table environment that is the entry point and central 
context for creating Table
 and SQL API programs.
 
-:param environment_settings: The environment settings used to 
instantiate the
+:param conf_or_settings: The configuration or environment settings 
used to instantiate the
  :class:`~pyflink.table.TableEnvironment`.
 :return: The :class:`~pyflink.table.TableEnvironment`.
 """
 gateway = get_gateway()
-j_tenv = gateway.jvm.TableEnvironment.create(
-environment_settings._j_environment_settings)
+if isinstance(conf_or_settings, EnvironmentSettings):
+environment_settings = conf_or_settings
+elif isinstance(conf_or_settings, Configuration):
+environment_settings = EnvironmentSettings.new_instance() \

Review Comment:
   EnvironmentSettings.from_configuration(conf_or_settings)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
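
For comparison, the Java-side API that this PyFlink change mirrors (a sketch; 
since Flink 1.15, `TableEnvironment.create` also accepts a `Configuration` 
directly):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;

public class CreateFromConfiguration {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("execution.runtime-mode", "batch"); // example setting
        TableEnvironment tEnv = TableEnvironment.create(conf);
        tEnv.executeSql("SELECT 1 AS v").print();
    }
}
```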



[GitHub] [flink-ml] lindong28 commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance

2022-06-06 Thread GitBox


lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890665195


##
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##
@@ -19,127 +19,162 @@
 package org.apache.flink.iteration.datacache.nonkeyed;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 /** Records the data received and replays them when required. */
 public class DataCacheWriter<T> {
 
+/** A soft limit on the max allowed size of a single segment. */
+static final long MAX_SEGMENT_SIZE = 1L << 30; // 1GB
+
+/** The tool to serialize received records into bytes. */
 private final TypeSerializer<T> serializer;
 
+/** The file system that contains the cache files. */
 private final FileSystem fileSystem;
 
+/** A generator to generate paths of cache files. */
 private final SupplierWithException<Path, IOException> pathGenerator;
 
-private final List<Segment> finishSegments;
+/** An optional pool that provides memory segments to hold cached records in memory. */
+@Nullable private final MemorySegmentPool segmentPool;
+
+/** The segments that contain previously added records. */
+private final List<Segment> finishedSegments;
 
-private SegmentWriter<T> currentSegment;
+/** The current writer for new records. */
+@Nullable private SegmentWriter<T> currentSegmentWriter;
 
 public DataCacheWriter(
 TypeSerializer<T> serializer,
 FileSystem fileSystem,
 SupplierWithException<Path, IOException> pathGenerator)
 throws IOException {
-this(serializer, fileSystem, pathGenerator, Collections.emptyList());
+this(serializer, fileSystem, pathGenerator, null, Collections.emptyList());
 }
 
 public DataCacheWriter(
 TypeSerializer<T> serializer,
 FileSystem fileSystem,
 SupplierWithException<Path, IOException> pathGenerator,
-List<Segment> priorFinishedSegments)
+MemorySegmentPool segmentPool)
 throws IOException {
-this.serializer = serializer;
-this.fileSystem = fileSystem;
-this.pathGenerator = pathGenerator;
-
-this.finishSegments = new ArrayList<>(priorFinishedSegments);
+this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList());
+}
 
-this.currentSegment = new SegmentWriter(pathGenerator.get());
+public DataCacheWriter(
+TypeSerializer<T> serializer,
+FileSystem fileSystem,
+SupplierWithException<Path, IOException> pathGenerator,
+List<Segment> finishedSegments)
+throws IOException {
+this(serializer, fileSystem, pathGenerator, null, finishedSegments);
 }
 
-public void addRecord(T record) throws IOException {
-currentSegment.addRecord(record);
+public DataCacheWriter(
+TypeSerializer<T> serializer,
+FileSystem fileSystem,
+SupplierWithException<Path, IOException> pathGenerator,
+@Nullable MemorySegmentPool segmentPool,
+List<Segment> finishedSegments)
+throws IOException {
+this.fileSystem = fileSystem;
+this.pathGenerator = pathGenerator;
+this.segmentPool = segmentPool;
+this.serializer = serializer;
+this.finishedSegments = new ArrayList<>();
+this.finishedSegments.addAll(finishedSegments);

Review Comment:
   nits: would it be simpler to use `this.finishedSegments = new 
ArrayList<>(priorFinishedSegments)`, similar to the approach before this PR?



##
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/MemorySegmentReader.java:
##
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT 

[jira] [Updated] (FLINK-27901) Support TableEnvironment.create(configuration) in PyFlink

2022-06-06 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-27901:

Summary: Support TableEnvironment.create(configuration) in PyFlink  (was: 
support TableEnvironment.create(configuration) in PyFlink)

> Support TableEnvironment.create(configuration) in PyFlink
> -
>
> Key: FLINK-27901
> URL: https://issues.apache.org/jira/browse/FLINK-27901
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.15.0
>Reporter: Juntao Hu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Align TableEnvironment.create(configuration) API with Java.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-24999) flink-python doesn't work on Java 17

2022-06-06 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-24999:
---

Assignee: LuNng Wang

> flink-python doesn't work on Java 17
> 
>
> Key: FLINK-24999
> URL: https://issues.apache.org/jira/browse/FLINK-24999
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Chesnay Schepler
>Assignee: LuNng Wang
>Priority: Major
>
> {code:java}
> java.lang.UnsupportedOperationException: sun.misc.Unsafe or 
> java.nio.DirectByteBuffer.<init>(long, int) not available
>  at 
> io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
>  at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:257)
>  at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:247)
>  at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:248)
>  at 
> org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:228)
>  at 
> org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:242)
>  at 
> org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:132)
>  at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
>  at 
> org.apache.flink.table.runtime.arrow.ArrowUtilsTest.testReadArrowBatches(ArrowUtilsTest.java:389)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-24999) flink-python doesn't work on Java 17

2022-06-06 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550735#comment-17550735
 ] 

Dian Fu commented on FLINK-24999:
-

[~ana4] Thanks for taking this. Have assigned it to you~

> flink-python doesn't work on Java 17
> 
>
> Key: FLINK-24999
> URL: https://issues.apache.org/jira/browse/FLINK-24999
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Chesnay Schepler
>Assignee: LuNng Wang
>Priority: Major
>
> {code:java}
> java.lang.UnsupportedOperationException: sun.misc.Unsafe or 
> java.nio.DirectByteBuffer.<init>(long, int) not available
>  at 
> io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
>  at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:257)
>  at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:247)
>  at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:248)
>  at 
> org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:228)
>  at 
> org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:242)
>  at 
> org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:132)
>  at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
>  at 
> org.apache.flink.table.runtime.arrow.ArrowUtilsTest.testReadArrowBatches(ArrowUtilsTest.java:389)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-26070) Update dependency numpy to >=1.21.0,<1.22

2022-06-06 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-26070.
---
Resolution: Duplicate

> Update dependency numpy to >=1.21.0,<1.22
> -
>
> Key: FLINK-26070
> URL: https://issues.apache.org/jira/browse/FLINK-26070
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Python
>Reporter: Martijn Visser
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> PyFlink currently sets the required numpy version as >=1.14.3,<1.2
> We should update this to >=1.21.0,<1.22
> Updating numpy will also resolve being marked as vulnerable for 
> https://nvd.nist.gov/vuln/detail/CVE-2021-33430



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Reopened] (FLINK-26070) Update dependency numpy to >=1.21.0,<1.22

2022-06-06 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reopened FLINK-26070:
-

> Update dependency numpy to >=1.21.0,<1.22
> -
>
> Key: FLINK-26070
> URL: https://issues.apache.org/jira/browse/FLINK-26070
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Python
>Reporter: Martijn Visser
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> PyFlink currently sets the required numpy version as >=1.14.3,<1.2
> We should update this to >=1.21.0,<1.22
> Updating numpy will also resolve being marked as vulnerable for 
> https://nvd.nist.gov/vuln/detail/CVE-2021-33430



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-26070) Update dependency numpy to >=1.21.0,<1.22

2022-06-06 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-26070.
---
Resolution: Done

> Update dependency numpy to >=1.21.0,<1.22
> -
>
> Key: FLINK-26070
> URL: https://issues.apache.org/jira/browse/FLINK-26070
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Python
>Reporter: Martijn Visser
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> PyFlink currently sets the required numpy version as >=1.14.3,<1.2
> We should update this to >=1.21.0,<1.22
> Updating numpy will also resolve being marked as vulnerable for 
> https://nvd.nist.gov/vuln/detail/CVE-2021-33430



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-26070) Update dependency numpy to >=1.21.0,<1.22

2022-06-06 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550734#comment-17550734
 ] 

Dian Fu commented on FLINK-26070:
-

[~ana4] Thanks for the work in https://github.com/apache/flink/pull/19802. I'm 
closing this ticket~

> Update dependency numpy to >=1.21.0,<1.22
> -
>
> Key: FLINK-26070
> URL: https://issues.apache.org/jira/browse/FLINK-26070
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Python
>Reporter: Martijn Visser
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> PyFlink currently sets the required numpy version as >=1.14.3,<1.2
> We should update this to >=1.21.0,<1.22
> Updating numpy will also resolve being marked as vulnerable for 
> https://nvd.nist.gov/vuln/detail/CVE-2021-33430



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27868) Harden running job check before triggering savepoints or savepoint upgrades

2022-06-06 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550716#comment-17550716
 ] 

Aitozi commented on FLINK-27868:


I think this will be a good improvement to savepointing, together with the work of 
[https://github.com/apache/flink-kubernetes-operator/pull/255]

One question here: how will the FINISHED status be handled? IMO, finished tasks 
will not affect the job's RUNNING status (they do not have to be reset to 
CREATED).

> Harden running job check before triggering savepoints or savepoint upgrades
> ---
>
> Key: FLINK-27868
> URL: https://issues.apache.org/jira/browse/FLINK-27868
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Matyas Orhidi
>Priority: Major
> Fix For: kubernetes-operator-1.1.0
>
>
> Even if the job is in RUNNING state, often not all subtasks are yet running 
> which leads to savepoint upgrade / savepoint trigger failures. We should 
> harden the isRunning check we use to include subtask states as well.
> This suggestion is described in more detail by [~matyas] here: 
> https://github.com/apache/flink-kubernetes-operator/pull/237#issuecomment-1137054088
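
For illustration, a hardened check along these lines could look roughly like the 
following sketch against the job details returned by the REST API (the helper name 
is made up, and the JobDetailsInfo accessors are assumptions to be checked against 
flink-runtime):

{code:java}
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;

/** Hypothetical helper, not the operator's actual code. */
final class RunningCheck {

    // Job-level RUNNING is not enough: require every vertex to be RUNNING too,
    // so savepoints are only triggered once all subtasks are actually up.
    static boolean allSubtasksRunning(JobDetailsInfo details) {
        if (details.getJobStatus() != JobStatus.RUNNING) {
            return false;
        }
        return details.getJobVertexInfos().stream()
                .allMatch(v -> v.getExecutionState() == ExecutionState.RUNNING);
    }
}
{code}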



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #258: [chore] Change the session cluster not found log level to error

2022-06-06 Thread GitBox


Aitozi commented on code in PR #258:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/258#discussion_r890651487


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobHelper.java:
##
@@ -54,7 +54,7 @@ public boolean sessionClusterReady(Optional 
flinkDeploymentOpt)
 return true;
 }
 } else {
-logger.info("Session cluster deployment is not found");
+logger.error("Session cluster deployment is not found");

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period

2022-06-06 Thread GitBox


Aitozi commented on code in PR #255:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890648242


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
 .map(Savepoint::getLocation)
 .orElse(null);
 
-observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-.ifPresent(
-err ->
-EventUtils.createOrUpdateEvent(
-flinkService.getKubernetesClient(),
-resource,
-EventUtils.Type.Warning,
-"SavepointError",
-SavepointUtils.createSavepointError(
-savepointInfo,
-resource.getSpec()
-.getJob()
-.getSavepointTriggerNonce()),
-EventUtils.Component.Operator));
-
-// We only need to observe latest checkpoint/savepoint for terminal jobs
-if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+// If any manual or periodic savepoint is in progress, observe it
+if (SavepointUtils.savepointInProgress(jobStatus)) {
+observeTriggeredSavepoint(resource, jobId, deployedConfig);
 }
 
-var currentLastSpPath =
-Optional.ofNullable(savepointInfo.getLastSavepoint())
-.map(Savepoint::getLocation)
-.orElse(null);
-
-// If the last savepoint information changes we need to patch the status
-// to avoid losing this in case of an operator failure after the cluster was shut down
-if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-LOG.info(
-"Updating resource status after observing new last savepoint {}",
-currentLastSpPath);
-statusHelper.patchAndCacheStatus(resource);
+// If job is in globally terminal state, observe last savepoint
+if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
 }
+
+patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);
 }
 
 /**
  * Observe the savepoint result based on the current savepoint info.
  *
- * @param currentSavepointInfo the current savepoint info.
+ * @param resource the resource being observed
  * @param jobID the jobID of the observed job.
  * @param deployedConfig Deployed job config.
  * @return The observed error, if no error observed, {@code Optional.empty()} will be returned.
  */
-private Optional<String> observeTriggeredSavepointProgress(
-SavepointInfo currentSavepointInfo, String jobID, Configuration deployedConfig) {
-if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) {
-LOG.debug("Savepoint not in progress");
-return Optional.empty();
-}
+private void observeTriggeredSavepoint(
+AbstractFlinkResource<?, ?> resource, String jobID, Configuration deployedConfig) {
+
+var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo();
+
 LOG.info("Observing savepoint status.");
-SavepointFetchResult savepointFetchResult =
+var savepointFetchResult =
 flinkService.fetchSavepointInfo(
-currentSavepointInfo.getTriggerId(), jobID, deployedConfig);
+savepointInfo.getTriggerId(), jobID, deployedConfig);
 
 if (savepointFetchResult.isPending()) {

Review Comment:
   Got your point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-27923) Typo fix for release-1.0.0 quick-start.md

2022-06-06 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-27923.
--
Resolution: Fixed

> Typo fix for release-1.0.0 quick-start.md
> -
>
> Key: FLINK-27923
> URL: https://issues.apache.org/jira/browse/FLINK-27923
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.0.0
>Reporter: James Busche
>Assignee: James Busche
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.0.0
>
>
> Noticed a typo while deploying the example.
> Currently:
> kubectl create -f 
> https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-0.1/examples/basic.yaml
> Where it should be:
> kubectl create -f 
> https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0.0/examples/basic.yaml



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #259: [FLINK-27923] Update quick-start.md

2022-06-06 Thread GitBox


gyfora merged PR #259:
URL: https://github.com/apache/flink-kubernetes-operator/pull/259


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] jbusche commented on a diff in pull request #259: [FLINK-27923] Update quick-start.md

2022-06-06 Thread GitBox


jbusche commented on code in PR #259:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/259#discussion_r890392876


##
docs/content/docs/try-flink-kubernetes-operator/quick-start.md:
##
@@ -99,7 +99,7 @@ flink-kubernetes-operator default 1 2022-03-09 17:39:55.461359
 
 Once the operator is running as seen in the previous step you are ready to 
submit a Flink job:
 ```bash
-kubectl create -f 
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-0.1/examples/basic.yaml
+kubectl create -f 
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0.0/examples/basic.yaml

Review Comment:
   Hi @gyfora, sure that looks like it works as well, I'll make that change.  
Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-27923) Typo fix for release-1.0.0 quick-start.md

2022-06-06 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-27923:
--

Assignee: James Busche

> Typo fix for release-1.0.0 quick-start.md
> -
>
> Key: FLINK-27923
> URL: https://issues.apache.org/jira/browse/FLINK-27923
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.0.0
>Reporter: James Busche
>Assignee: James Busche
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.0.0
>
>
> Noticed a typo while deploying the example.
> Currently:
> kubectl create -f 
> https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-0.1/examples/basic.yaml
> Where it should be:
> kubectl create -f 
> https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0.0/examples/basic.yaml



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #259: [FLINK-27923] Update quick-start.md

2022-06-06 Thread GitBox


gyfora commented on code in PR #259:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/259#discussion_r890361897


##
docs/content/docs/try-flink-kubernetes-operator/quick-start.md:
##
@@ -99,7 +99,7 @@ flink-kubernetes-operator default 1 2022-03-09 17:39:55.461359
 
 Once the operator is running as seen in the previous step you are ready to 
submit a Flink job:
 ```bash
-kubectl create -f 
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-0.1/examples/basic.yaml
+kubectl create -f 
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0.0/examples/basic.yaml

Review Comment:
   Maybe it would be better to just have `release-1.0/...`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27923) Typo fix for release-1.0.0 quick-start.md

2022-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27923:
---
Labels: pull-request-available  (was: )

> Typo fix for release-1.0.0 quick-start.md
> -
>
> Key: FLINK-27923
> URL: https://issues.apache.org/jira/browse/FLINK-27923
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.0.0
>Reporter: James Busche
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.0.0
>
>
> Noticed a typo while deploying the example.
> Currently:
> kubectl create -f 
> https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-0.1/examples/basic.yaml
> Where it should be:
> kubectl create -f 
> https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0.0/examples/basic.yaml



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] jbusche opened a new pull request, #259: [FLINK-27923] Update quick-start.md

2022-06-06 Thread GitBox


jbusche opened a new pull request, #259:
URL: https://github.com/apache/flink-kubernetes-operator/pull/259

   Fixing the example from release-0.1 to release-1.0.0


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-27923) Typo fix for release-1.0.0 quick-start.md

2022-06-06 Thread James Busche (Jira)
James Busche created FLINK-27923:


 Summary: Typo fix for release-1.0.0 quick-start.md
 Key: FLINK-27923
 URL: https://issues.apache.org/jira/browse/FLINK-27923
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.0.0
Reporter: James Busche
 Fix For: kubernetes-operator-1.0.0


Noticed a typo while deploying the example.

Currently:
kubectl create -f 
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-0.1/examples/basic.yaml


Where it should be:

kubectl create -f 
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0.0/examples/basic.yaml



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27910) FileSink not registered the timer to enforce rolling policy if started from scratch

2022-06-06 Thread David Anderson (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550583#comment-17550583
 ] 

David Anderson commented on FLINK-27910:


This is super confusing, and there's no reasonable workaround – so I've bumped 
up the priority to Critical.

> FileSink not registered the timer to enforce rolling policy if started from 
> scratch
> ---
>
> Key: FLINK-27910
> URL: https://issues.apache.org/jira/browse/FLINK-27910
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yun Gao
>Priority: Critical
>
> The current FileWriter only registers the timer in initializeState, which is 
> now only called on restore. Thus, if the job is started from scratch, the 
> timer is never registered and the rolling policy does not take effect. 
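
For illustration, the fix direction implied by the description, as a 
self-contained sketch (all names here are illustrative, not FileWriter's actual 
API): arm the periodic rolling check on every start, whether fresh or restored.

{code:java}
/** Self-contained sketch; these types stand in for Flink's timer service. */
interface TimerService {
    long currentTimeMillis();

    void registerTimer(long triggerTime, Runnable callback);
}

final class RollingPolicyTimer {
    private final TimerService timers;
    private final long checkIntervalMs;
    private final Runnable enforceRollingPolicy;

    RollingPolicyTimer(TimerService timers, long checkIntervalMs, Runnable enforceRollingPolicy) {
        this.timers = timers;
        this.checkIntervalMs = checkIntervalMs;
        this.enforceRollingPolicy = enforceRollingPolicy;
    }

    // The ticket's point: call this from BOTH the fresh-start path and the
    // restore path, not only from initializeState, so the rolling policy is
    // always enforced.
    void arm() {
        timers.registerTimer(
                timers.currentTimeMillis() + checkIntervalMs,
                () -> {
                    enforceRollingPolicy.run();
                    arm(); // re-register the next periodic check
                });
    }
}
{code}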



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27910) FileSink not registered the timer to enforce rolling policy if started from scratch

2022-06-06 Thread David Anderson (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Anderson updated FLINK-27910:
---
Priority: Critical  (was: Major)

> FileSink not registered the timer to enforce rolling policy if started from 
> scratch
> ---
>
> Key: FLINK-27910
> URL: https://issues.apache.org/jira/browse/FLINK-27910
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yun Gao
>Priority: Critical
>
> The current FileWriter only registers the timer in initializeState, which is 
> now only called on restore. Thus, if the job is started from scratch, the 
> timer is never registered and the rolling policy does not take effect. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #258: [chore] Change the session cluster not found log level to error

2022-06-06 Thread GitBox


gyfora commented on code in PR #258:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/258#discussion_r890326994


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobHelper.java:
##
@@ -54,7 +54,7 @@ public boolean sessionClusterReady(Optional 
flinkDeploymentOpt)
 return true;
 }
 } else {
-logger.info("Session cluster deployment is not found");
+logger.error("Session cluster deployment is not found");

Review Comment:
   I think INFO/WARN is more appropriate as this is completely expected to 
happen and it's not really an error. @Aitozi what do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lsyldliu commented on a diff in pull request #19561: [FLINK-26414][hive] Hive dialect supports macro

2022-06-06 Thread GitBox


lsyldliu commented on code in PR #19561:
URL: https://github.com/apache/flink/pull/19561#discussion_r890126279


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/planner/delegation/hive/HiveASTParserTest.java:
##
@@ -139,6 +139,13 @@ public void testFunction() throws Exception {
 assertDDLType(HiveASTParser.TOK_SHOWFUNCTIONS, "show functions");
 }
 
+@Test
+public void testMacro() throws Exception {

Review Comment:
   It would be better if we added a test that creates a macro with a qualified name.
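
   Something like the following sketch, for example (the AssertJ 
`assertThatThrownBy` import and the `runDdl` entry point are assumptions 
standing in for however the suite executes DDL; the expected message is the one 
thrown by `convertCreateMacro`):

```java
@Test
public void testMacroWithQualifiedName() {
    // Negative test: a qualified macro name must be rejected.
    assertThatThrownBy(
                    () -> runDdl("create temporary macro db.sigmoid(x double) 1.0 / (1.0 + exp(-x))"))
            .hasMessageContaining("Temporary macro cannot be created with a qualified name");
}
```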



##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java:
##
@@ -506,6 +532,106 @@ private Operation convertCreateFunction(HiveParserASTNode 
ast) {
 }
 }
 
+private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException {
+String macroName = ast.getChild(0).getText();
+if (FunctionUtils.isQualifiedFunctionName(macroName)) {
+throw new SemanticException("Temporary macro cannot be created with a qualified name.");

Review Comment:
   Judging from the method `FunctionUtils.isQualifiedFunctionName`, this error 
message may not be clear to users, so I think we can improve it.



##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java:
##
@@ -36,23 +38,48 @@
 public static final long serialVersionUID = 393313529306818205L;
 
 private final String className;
+// A field to hold the serialized string for the UDF. We sometimes need to keep it
+// because some serializable UDFs carry additional information, such as Hive's
+// GenericUDFMacro; if we construct the UDF directly via getUDFClass#newInstance,
+// that information would be lost.
+private String udfSerializedString;
 
 private transient UDFType instance = null;
 
 public HiveFunctionWrapper(String className) {
 this.className = className;
 }
 
+/**
+ * Create a HiveFunctionWrapper with a UDF instance. In this constructor, the
+ * instance will be serialized to a string and held in the HiveFunctionWrapper.
+ */
+public HiveFunctionWrapper(String className, UDFType serializableInstance) {
+this.className = className;

Review Comment:
   We can directly call `this(className);` here.
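
   i.e. roughly the following sketch (assuming a `serializeObject` counterpart 
to the `deserializeObject` call used elsewhere in this class):

```java
public HiveFunctionWrapper(String className, UDFType serializableInstance) {
    this(className);
    // Serialize eagerly so instance-specific state (e.g. GenericUDFMacro's body)
    // survives re-instantiation; serializeObject is assumed to mirror deserializeObject.
    this.udfSerializedString = SerializationUtilities.serializeObject(serializableInstance);
}
```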



##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java:
##
@@ -506,6 +532,106 @@ private Operation convertCreateFunction(HiveParserASTNode 
ast) {
 }
 }
 
+private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException {
+String macroName = ast.getChild(0).getText();
+if (FunctionUtils.isQualifiedFunctionName(macroName)) {
+throw new SemanticException("Temporary macro cannot be created with a qualified name.");
+}
+
+List<FieldSchema> arguments = getColumns((HiveParserASTNode) ast.getChild(1), true);

Review Comment:
   The readability of `arguments` is not good. IMO, it refers to all the columns 
of the table? In other words, we should add a comment explaining it.



##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java:
##
@@ -88,4 +115,20 @@ public String getClassName() {
 public Class<UDFType> getUDFClass() throws ClassNotFoundException {
 return (Class<UDFType>) Thread.currentThread().getContextClassLoader().loadClass(className);
 }
+
+/**
+ * Deserialize the UDF using the held udfSerializedString.
+ *
+ * @return the deserialized UDF
+ */
+private UDFType deserializeUDF() {
+try {
+return (UDFType)
+SerializationUtilities.deserializeObject(
+udfSerializedString, (Class) 
getUDFClass());
+} catch (ClassNotFoundException e) {
+throw new FlinkHiveUDFException(
+String.format("Failed to deserialize function %s", 
className), e);
+}

Review Comment:
   "Failed to deserialize function %s."



##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java:
##
@@ -506,6 +532,106 @@ private Operation convertCreateFunction(HiveParserASTNode 
ast) {
 }
 }
 
+private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException {
+String macroName = ast.getChild(0).getText();
+if (FunctionUtils.isQualifiedFunctionName(macroName)) {
+throw new SemanticException("Temporary macro cannot be created with a qualified name.");
+}
+
+List<FieldSchema> arguments = getColumns((HiveParserASTNode) ast.getChild(1), true);
+Set<String> actualColumnNames = 

[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #255: [FLINK-27257] Retry failed savepoints within grace period

2022-06-06 Thread GitBox


gyfora commented on PR #255:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/255#issuecomment-1147637009

   @Aitozi updated the PR according to your comments :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period

2022-06-06 Thread GitBox


gyfora commented on code in PR #255:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890310614


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
 .map(Savepoint::getLocation)
 .orElse(null);
 
-observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-.ifPresent(
-err ->
-EventUtils.createOrUpdateEvent(
-flinkService.getKubernetesClient(),
-resource,
-EventUtils.Type.Warning,
-"SavepointError",
-SavepointUtils.createSavepointError(
-savepointInfo,
-resource.getSpec()
-.getJob()
-.getSavepointTriggerNonce()),
-EventUtils.Component.Operator));
-
-// We only need to observe latest checkpoint/savepoint for terminal jobs
-if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+// If any manual or periodic savepoint is in progress, observe it
+if (SavepointUtils.savepointInProgress(jobStatus)) {
+observeTriggeredSavepoint(resource, jobId, deployedConfig);
 }
 
-var currentLastSpPath =
-Optional.ofNullable(savepointInfo.getLastSavepoint())
-.map(Savepoint::getLocation)
-.orElse(null);
-
-// If the last savepoint information changes we need to patch the status
-// to avoid losing this in case of an operator failure after the cluster was shut down
-if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-LOG.info(
-"Updating resource status after observing new last savepoint {}",
-currentLastSpPath);
-statusHelper.patchAndCacheStatus(resource);
+// If job is in globally terminal state, observe last savepoint
+if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
 }
+
+patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);
 }
 
 /**
  * Observe the savepoint result based on the current savepoint info.
  *
- * @param currentSavepointInfo the current savepoint info.
+ * @param resource the resource being observed
  * @param jobID the jobID of the observed job.
  * @param deployedConfig Deployed job config.
  * @return The observed error, if no error observed, {@code Optional.empty()} will be returned.
  */
-private Optional<String> observeTriggeredSavepointProgress(
-SavepointInfo currentSavepointInfo, String jobID, Configuration deployedConfig) {
-if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) {
-LOG.debug("Savepoint not in progress");
-return Optional.empty();
-}
+private void observeTriggeredSavepoint(
+AbstractFlinkResource<?, ?> resource, String jobID, Configuration deployedConfig) {
+
+var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo();
+
 LOG.info("Observing savepoint status.");
-SavepointFetchResult savepointFetchResult =
+var savepointFetchResult =
 flinkService.fetchSavepointInfo(
-currentSavepointInfo.getTriggerId(), jobID, deployedConfig);
+savepointInfo.getTriggerId(), jobID, deployedConfig);
 
 if (savepointFetchResult.isPending()) {
-if (SavepointUtils.gracePeriodEnded(
-configManager.getOperatorConfiguration(), currentSavepointInfo)) {
-String errorMsg =
-"Savepoint operation timed out after "
-+ configManager
-.getOperatorConfiguration()
-.getSavepointTriggerGracePeriod();
-currentSavepointInfo.resetTrigger();
-LOG.error(errorMsg);
-return Optional.of(errorMsg);
-} else {
-LOG.info("Savepoint operation not finished yet, waiting within grace period...");
-return Optional.empty();
-}
+LOG.info("Savepoint operation not finished yet...");
+  

[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period

2022-06-06 Thread GitBox


gyfora commented on code in PR #255:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890307980


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##
@@ -58,9 +58,11 @@ public class KubernetesOperatorConfigOptions {
 "The interval for observing status for in-progress 
operations such as deployment and savepoints.");
 
 public static final ConfigOption 
OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD =

Review Comment:
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


