[jira] [Commented] (FLINK-25420) Port JDBC Source to new Source API (FLIP-27)
[ https://issues.apache.org/jira/browse/FLINK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550811#comment-17550811 ]

Jing Ge commented on FLINK-25420:

Thanks [~RocMarshal] for driving this! I think it makes sense to write a FLIP based on the template [https://cwiki.apache.org/confluence/display/FLINK/FLIP+Connector+Template] and discuss it on the mailing list (ML).

> Port JDBC Source to new Source API (FLIP-27)
>
> Key: FLINK-25420
> URL: https://issues.apache.org/jira/browse/FLINK-25420
> Project: Flink
> Issue Type: Improvement
> Reporter: Martijn Visser
> Priority: Major
>
> The current JDBC connector is using the old SourceFunction interface, which
> is going to be deprecated. We should port/refactor the JDBC Source to use the
> new Source API, based on FLIP-27
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
[jira] [Created] (FLINK-27928) External Resource Framework: 'external-resources' config delimiter is not working as expected
James Cho created FLINK-27928:

Summary: External Resource Framework: 'external-resources' config delimiter is not working as expected
Key: FLINK-27928
URL: https://issues.apache.org/jira/browse/FLINK-27928
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.15.0
Reporter: James Cho


In Flink 1.15.0, the `;` delimiter that separates external resource names in the `external-resources` option does not work as expected.

To reproduce the bug, set up a Flink cluster with Docker and two external resources named `abc` and `efg`:

docker-compose.yml:
{code:java}
services:
  jobmanager:
    image: flink:1.15.0-scala_2.12
    ports:
      - "8081:8081"
      - "6123:6123"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.15.0-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 8
        external-resources: abc;efg
        external-resource.abc.amount: 1
        external-resource.efg.amount: 1
{code}

Related TaskManager logs:
{code:java}
root@2873c5ae72c2:/opt/flink# cat log/flink* | grep ExternalResourceUtils
2022-06-07 05:42:34,232 WARN org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - The amount of the abc;efg should be configured. Will ignore that resource.
2022-06-07 05:42:34,232 INFO org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
2022-06-07 05:42:34,236 WARN org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Could not find driver class name for abc;efg. Please make sure external-resource.abc;efg.driver-factory.class is configured.
2022-06-07 05:42:34,240 WARN org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - The amount of the abc;efg should be configured. Will ignore that resource.
{code}

Notice that Flink treats `abc;efg` as a single external resource rather than as two resources, `abc` and `efg`.

The same issue does not occur in Flink 1.14.4:

docker-compose.yml:
{code:java}
services:
  jobmanager:
    image: flink:1.14.4-scala_2.12
    ports:
      - "8081:8081"
      - "6123:6123"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.14.4-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 8
        external-resources: abc;efg
        external-resource.abc.amount: 1
        external-resource.efg.amount: 1
{code}

Related TaskManager logs:
{code:java}
docker logs ... | grep ExternalResourceUtils
2022-06-07 05:48:13,531 INFO org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: [abc, efg]
2022-06-07 05:48:13,534 WARN org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Could not find driver class name for abc. Please make sure external-resource.abc.driver-factory.class is configured.
2022-06-07 05:48:13,534 WARN org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Could not find driver class name for efg. Please make sure external-resource.efg.driver-factory.class is configured.
{code}
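For comparison, the 1.14.4 logs show the intended semantics: the value of `external-resources` is split on `;`, and each resulting name is used to look up `external-resource.<name>.amount`. The 1.15.0 logs are consistent with that split being skipped. A minimal sketch of the expected behavior, assuming the configuration is a plain string map; `ExternalResourceNames`, `parse` and `amounts` are hypothetical names, and this is not Flink's `ExternalResourceUtils` code:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the expected semantics of 'external-resources'.
// This is NOT Flink's ExternalResourceUtils implementation.
final class ExternalResourceNames {

    private ExternalResourceNames() {}

    /** Splits a value such as "abc;efg" into ["abc", "efg"], dropping blank entries. */
    static List<String> parse(String value) {
        List<String> names = new ArrayList<>();
        for (String part : value.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    /** Looks up "external-resource.<name>.amount" for each name; names without an amount are skipped. */
    static Map<String, Long> amounts(String value, Map<String, String> config) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String name : parse(value)) {
            String amount = config.get("external-resource." + name + ".amount");
            if (amount == null) {
                // Corresponds to the WARN "The amount of the <name> should be configured."
                continue;
            }
            result.put(name, Long.parseLong(amount.trim()));
        }
        return result;
    }
}
```

Treating the whole string `abc;efg` as one name reproduces the 1.15.0 symptom: the lookup key becomes `external-resource.abc;efg.amount`, which is not configured, so the resource is ignored.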
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance
lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890784808


##
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/kmeans/KMeans.java:
##
@@ -254,58 +272,150 @@ public Tuple3 map(Tuple2 value
 DenseVector, DenseVector[], Tuple2>, IterationListener> {
     private final DistanceMeasure distanceMeasure;
-    private ListState points;
-    private ListState centroids;
+    private ListState centroidsState;
+    private DenseVector[] centroids;
+
+    private Path basePath;
+    private OperatorID operatorID;
+    private MemorySegmentPool segmentPool;
+    private DataCacheWriter dataCacheWriter;
 
     public SelectNearestCentroidOperator(DistanceMeasure distanceMeasure) {
+        super();
         this.distanceMeasure = distanceMeasure;
     }
 
+    @Override
+    public void setup(
+            StreamTask containingTask,
+            StreamConfig config,
+            Output>> output) {
+        super.setup(containingTask, config, output);
+
+        operatorID = config.getOperatorID();
+
+        MemoryManager memoryManager = getContainingTask().getEnvironment().getMemoryManager();

Review Comment:
Could you refactor the code so that algorithm developers won't need to handle so many details?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Created] (FLINK-27927) Adjust table store connector common apis
Jingsong Lee created FLINK-27927:

Summary: Adjust table store connector common apis
Key: FLINK-27927
URL: https://issues.apache.org/jira/browse/FLINK-27927
Project: Flink
Issue Type: Improvement
Components: Table Store
Reporter: Jingsong Lee
Assignee: Jingsong Lee
Fix For: table-store-0.2.0


We currently have the initial FileStoreTable-related interfaces, but some functionality is still missing to cover the following four areas:
1. Type conversion
2. Data structure conversion
3. Filter conversion
4. Scan and Read

This Jira issue adds more easy-to-use features for them.
[jira] [Updated] (FLINK-27927) Improve table store connector common apis
[ https://issues.apache.org/jira/browse/FLINK-27927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee updated FLINK-27927:
Summary: Improve table store connector common apis (was: Adjust table store connector common apis)

> Improve table store connector common apis
>
> Key: FLINK-27927
> URL: https://issues.apache.org/jira/browse/FLINK-27927
> Project: Flink
> Issue Type: Improvement
> Components: Table Store
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
> Priority: Major
> Fix For: table-store-0.2.0
>
> We currently have the initial FileStoreTable-related interface, but something
> is missing to satisfy our four approaches:
> 1. Type conversion
> 2. Data structure conversion
> 3. Filter conversion
> 4. Scan and Read
> In this jira, more easy-to-use features will be added.
[jira] [Updated] (FLINK-27927) Improve table store connector common interfaces
[ https://issues.apache.org/jira/browse/FLINK-27927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee updated FLINK-27927:
Summary: Improve table store connector common interfaces (was: Improve table store connector common apis)
[GitHub] [flink] Sxnan commented on pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results
Sxnan commented on PR #19653:
URL: https://github.com/apache/flink/pull/19653#issuecomment-1148202674

@zhuzhurk Thanks for your comments. Please take another look.
[GitHub] [flink] Sxnan commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results
Sxnan commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r890770525


##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest.java:
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.scheduler.CachedIntermediateDataSetCorruptedException;
+import org.apache.flink.types.IntValue;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Integration tests for reusing persisted intermediate dataset */
+public class JobIntermediateDatasetReuseTest {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(JobIntermediateDatasetReuseTest.class);
+
+    @Test
+    public void testClusterPartitionReuse() throws Exception {
+        internalTestClusterPartitionReuse(1);
+    }
+
+    @Test
+    public void testClusterPartitionReuseMultipleParallelism() throws Exception {
+        internalTestClusterPartitionReuse(64);
+    }
+
+    private void internalTestClusterPartitionReuse(int parallelism) throws Exception {
+        final TestingMiniClusterConfiguration miniClusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder().build();
+
+        try (TestingMiniCluster miniCluster =
+                TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
+            miniCluster.start();
+
+            IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
+            final JobGraph firstJobGraph = createFirstJobGraph(parallelism, intermediateDataSetID);
+            miniCluster.submitJob(firstJobGraph).get();
+            CompletableFuture jobResultFuture =
+                    miniCluster.requestJobResult(firstJobGraph.getJobID());
+            JobResult jobResult = jobResultFuture.get();
+            assertTrue(jobResult.isSuccess());
+
+            final JobGraph secondJobGraph =
+                    createSecondJobGraph(parallelism, intermediateDataSetID);
+            miniCluster.submitJob(secondJobGraph).get();
+            jobResultFuture = miniCluster.requestJobResult(secondJobGraph.getJobID());
+            jobResult = jobResultFuture.get();
+            assertTrue(jobResult.isSuccess());
+        }
+    }
+
+    @Test
+    public void testClusterPartitionReuseWithMoreConsumerParallelismThrowException()
+            throws Exception {
+        final TestingMiniClusterConfiguration miniClusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder().build();
+
+        try (TestingMiniCluster miniCluster =
+                TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
+            miniCluster.start();
+
+            IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
+            final JobGraph firstJobGraph = createFirstJobGraph(1, intermediateDataSetID);
+            miniCluster.submitJob(firstJobGraph).get();
[GitHub] [flink] Sxnan commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results
Sxnan commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r890770323


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java:
##
@@ -120,6 +129,31 @@ public Collection getAllTrackedPartitions()
         return partitionInfos.values().stream().map(PartitionInfo::getMetaInfo).collect(toList());
     }
 
+    @Override
+    public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
+        this.resourceManagerGateway = resourceManagerGateway;
+    }
+
+    @Override
+    public List getClusterPartitionShuffleDescriptors(
+            IntermediateDataSetID intermediateDataSetID) {
+        return clusterPartitionShuffleDescriptors.computeIfAbsent(
+                intermediateDataSetID, this::requestShuffleDescriptorsFromResourceManager);
+    }
+
+    private List requestShuffleDescriptorsFromResourceManager(
+            IntermediateDataSetID intermediateDataSetID) {
+        Preconditions.checkNotNull(
+                resourceManagerGateway, "JobMaster is not connected to ResourceManager");
+        try {
+            return this.resourceManagerGateway
+                    .getClusterPartitionsShuffleDescriptors(intermediateDataSetID)
+                    .get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);

Review Comment:
The message has been added, and it now catches all Throwables. [62bb33d](https://github.com/apache/flink/pull/19653/commits/62bb33d10fb8ac51450ca96e336c3c312259e12a)
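The method under review caches the ResourceManager's answer per `IntermediateDataSetID` with `computeIfAbsent`, blocks on the returned future, and, according to the reply, now rethrows any failure with a descriptive message. A self-contained sketch of that caching pattern, using hypothetical stand-ins: `DescriptorCache` replaces the tracker, a generic key replaces `IntermediateDataSetID`, and a `Function` returning a `CompletableFuture` replaces the `ResourceManagerGateway` RPC. It is not the code from commit 62bb33d:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for the lookup in JobMasterPartitionTrackerImpl:
// results are fetched once per key and cached; failures are rethrown with context.
final class DescriptorCache<K, V> {

    private final Map<K, List<V>> cache = new HashMap<>();
    private final Function<K, CompletableFuture<List<V>>> fetcher;

    DescriptorCache(Function<K, CompletableFuture<List<V>>> fetcher) {
        this.fetcher = fetcher;
    }

    /** Returns the cached result for the key, fetching it (blocking) on first access. */
    List<V> get(K key) {
        return cache.computeIfAbsent(key, this::fetchBlocking);
    }

    private List<V> fetchBlocking(K key) {
        try {
            return fetcher.apply(key).get();
        } catch (InterruptedException e) {
            // Preserve the interrupt flag before rethrowing.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while requesting descriptors for " + key, e);
        } catch (Throwable t) {
            // Catch everything (including ExecutionException) and attach the key as context.
            throw new RuntimeException("Failed to request descriptors for " + key, t);
        }
    }
}
```

Because `HashMap.computeIfAbsent` records no mapping when its function throws, a failed request is retried on the next call instead of being cached. Catching `Throwable` mirrors the review reply; a narrower catch would let JVM `Error`s propagate unchanged.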
[GitHub] [flink] Sxnan commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results
Sxnan commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r890770010


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java:
##
@@ -115,6 +122,29 @@ private FailureHandlingResult handleFailure(
             final Set verticesToRestart,
             final boolean globalFailure) {
 
+        if (!globalFailure) {

Review Comment:
You are right. It is much clearer to do the translation in the DefaultScheduler. The change is made in [976d73b](https://github.com/apache/flink/pull/19653/commits/976d73b7f33d1d5ee8b788906b083419d6342748).
[jira] [Commented] (FLINK-27497) Track terminal job states in the observer
[ https://issues.apache.org/jira/browse/FLINK-27497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550799#comment-17550799 ]

Aitozi commented on FLINK-27497:

I am continuing this work now.

> Track terminal job states in the observer
>
> Key: FLINK-27497
> URL: https://issues.apache.org/jira/browse/FLINK-27497
> Project: Flink
> Issue Type: New Feature
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.0.0
> Reporter: Gyula Fora
> Assignee: Aitozi
> Priority: Critical
>
> With the improvements in FLINK-27468 Flink 1.15 app clusters will not be shut
> down in case of terminal job states (failed, finished) etc.
> It is important to properly handle these states and let the user know about
> it.
> We should always trigger events, and for terminally failed jobs record the
> error information in the FlinkDeployment status.
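The requested behavior (emit an event when a job reaches a terminal state and, for terminally failed jobs, also record the error in the `FlinkDeployment` status) can be sketched as a single observer step. This is a hypothetical illustration, not the Kubernetes operator's code: the `JobState` enum only mirrors a few Flink job-state names, and `DeploymentStatus`, `TerminalStateObserver` and `observe` are invented for the example:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not the Flink Kubernetes Operator's actual observer.
enum JobState {
    RUNNING, FAILING, FAILED, CANCELED, FINISHED;

    /** Terminal states after which the job makes no further progress. */
    boolean isTerminal() {
        return this == FAILED || this == CANCELED || this == FINISHED;
    }
}

/** Hypothetical stand-in for the job-related part of a FlinkDeployment status. */
final class DeploymentStatus {
    JobState jobState = JobState.RUNNING;
    String error; // populated only for terminally failed jobs
}

final class TerminalStateObserver {

    /** Events emitted so far (stand-in for Kubernetes events). */
    final List<String> events = new ArrayList<>();

    /** Records the observed state and reports each transition into a terminal state once. */
    void observe(DeploymentStatus status, JobState observed, String failureCause) {
        JobState previous = status.jobState;
        status.jobState = observed;
        if (!observed.isTerminal() || previous == observed) {
            return; // nothing new to report
        }
        events.add("Job reached terminal state " + observed);
        if (observed == JobState.FAILED) {
            status.error = failureCause;
        }
    }
}
```

The sketch reports only the transition into a terminal state, so repeated observations of the same finished job do not produce duplicate events; whether the operator should instead re-emit on every reconcile loop is left open.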
[jira] [Commented] (FLINK-25420) Port JDBC Source to new Source API (FLIP-27)
[ https://issues.apache.org/jira/browse/FLINK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550796#comment-17550796 ]

RocMarshal commented on FLINK-25420:

[~martijnvisser] Sorry for the late response. I have studied how to build the JDBC connector source on the new Source API. Clearly, the way the old source interface distributes source splits does not carry over to the new source interface. However, interfaces such as LookupSource and the dialects in the Table source can still be reused and improved on the basis of the existing code. Let's start with the API source first.

h1. JdbcSourceSplit

The preliminary definition of the source split is as follows:
{code:java}
public class JdbcSourceSplit implements SourceSplit, Serializable {

    private final String id;

    private final String sqlTemplate;

    private final @Nullable Serializable[] parameters;

    // The default value is 0. Valid values are in the range [1, Integer.MAX_VALUE].
    private final int offset;

    // code placeholder...
}
{code}

h1. Bounded or unbounded JdbcSourceEnumerator

h2. Unbounded case:
* Continuous JdbcSourceSplits
** If there is currently no JdbcSourceSplit available, the JdbcSourceSplitEnumerator does not answer JdbcSourceSplit requests.

h2. Bounded case:
* A static set of JdbcSourceSplits
** If there is currently no JdbcSourceSplit left, the JdbcSourceSplitEnumerator answers JdbcSourceSplit requests by signalling that there are no more splits.
* Open question:
** Do we need to support the following special bounded scenario?
*** The number of JdbcSourceSplits is finite, but in a user-defined implementation it is not known when all of them will have been generated. Once the user-defined end condition for split generation is met, no further JdbcSourceSplits are generated. After all JdbcSourceSplits have been processed, the JdbcSourceSplitEnumerator notifies the readers that there are no more JdbcSourceSplits.

h1. Semantic guarantees for JdbcSourceSplitReader

h2. Stream execution mode
* 'At least once', based on JdbcSourceSplit:
** If any exception occurs, reprocess the JdbcSourceSplit from the default offset 0.
* 'Exactly once', based on the ResultSet offset:
** If any exception occurs, reprocess the JdbcSourceSplit starting from the offset stored in JdbcSourceSplitState:
*** If the offset is greater than the number of rows in the result set, skip the current JdbcSourceSplit.
*** If the offset is less than or equal to the number of rows in the result set, continue processing from the offset position.
** *Disadvantage:* This only yields exactly-once semantics if the ResultSet of the split's SQL stays unchanged during the whole lifecycle of processing the JdbcSourceSplit. _Unfortunately_, this condition does not hold for most databases and data scenarios.
* 'At most once', based on JdbcSourceSplit:
** A non-zero offset in the current JdbcSourceSplit indicates that the split has already been (partially) processed, so skip it. In short, once processing of a JdbcSourceSplit fails, the split is dropped and processing continues with the next JdbcSourceSplit.

h2. Batch execution mode
If any exception occurs, reprocess the JdbcSourceSplit from the default offset 0.

Please let me know your opinion. Thank you!
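The proposed streaming-mode recovery rules for a failed split can be condensed into one decision function. A minimal sketch, assuming the split state stores `offset` as the number of rows already emitted (0 meaning not started) and that the row count of the re-executed query is known; `SplitGuarantee` and `SplitRecovery.resumeFrom` are hypothetical names, not an existing JDBC connector API:

```java
import java.util.OptionalLong;

// Hypothetical guarantee levels from the proposal (not Flink's DeliveryGuarantee enum).
enum SplitGuarantee { AT_LEAST_ONCE, EXACTLY_ONCE, AT_MOST_ONCE }

// Hypothetical sketch of the proposed recovery rules for a FLIP-27 JDBC split reader.
final class SplitRecovery {

    private SplitRecovery() {}

    /**
     * Decides where to resume a split after a failure.
     *
     * @param guarantee     the requested semantic guarantee (stream execution mode)
     * @param offset        rows of this split emitted before the failure (0 = not started)
     * @param resultSetSize number of rows returned when the split's SQL is re-executed
     * @return the row index to resume from, or empty if the split should be skipped
     */
    static OptionalLong resumeFrom(SplitGuarantee guarantee, long offset, long resultSetSize) {
        switch (guarantee) {
            case AT_LEAST_ONCE:
                // Re-read the whole split; rows emitted before the failure may be duplicated.
                return OptionalLong.of(0);
            case EXACTLY_ONCE:
                // Only correct if the result set is unchanged between executions.
                return offset > resultSetSize ? OptionalLong.empty() : OptionalLong.of(offset);
            case AT_MOST_ONCE:
                // A split that was already (partially) processed is never retried.
                return offset != 0 ? OptionalLong.empty() : OptionalLong.of(0);
            default:
                throw new IllegalArgumentException("Unknown guarantee: " + guarantee);
        }
    }
}
```

Batch mode, as proposed, corresponds to the `AT_LEAST_ONCE` branch and always restarts the split from offset 0.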
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance
lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890757304


##
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/kmeans/KMeans.java:
##
@@ -254,58 +272,150 @@ public Tuple3 map(Tuple2 value
 DenseVector, DenseVector[], Tuple2>, IterationListener> {
     private final DistanceMeasure distanceMeasure;
-    private ListState points;
-    private ListState centroids;
+    private ListState centroidsState;
+    private DenseVector[] centroids;

Review Comment:
Does using `centroids` improve performance? If not, it seems simpler not to add this variable.
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance
lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890739744


##
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##
@@ -59,87 +80,100 @@ public DataCacheWriter(
             SupplierWithException pathGenerator,
             List priorFinishedSegments)
             throws IOException {
-        this.serializer = serializer;
+        this(serializer, fileSystem, pathGenerator, null, priorFinishedSegments);
+    }
+
+    public DataCacheWriter(
+            TypeSerializer serializer,
+            FileSystem fileSystem,
+            SupplierWithException pathGenerator,
+            @Nullable MemorySegmentPool segmentPool,
+            List priorFinishedSegments)
+            throws IOException {
         this.fileSystem = fileSystem;
         this.pathGenerator = pathGenerator;
-
-        this.finishSegments = new ArrayList<>(priorFinishedSegments);
-
-        this.currentSegment = new SegmentWriter(pathGenerator.get());
+        this.segmentPool = segmentPool;
+        this.serializer = serializer;
+        this.finishedSegments = new ArrayList<>(priorFinishedSegments);
+        this.currentSegmentWriter = createSegmentWriter();
     }
 
     public void addRecord(T record) throws IOException {
-        currentSegment.addRecord(record);
-    }
-
-    public void finishCurrentSegment() throws IOException {
-        finishCurrentSegment(true);
+        if (!currentSegmentWriter.addRecord(record)) {
+            currentSegmentWriter.finish().ifPresent(finishedSegments::add);
+            currentSegmentWriter = new FileSegmentWriter<>(serializer, pathGenerator.get());
+            Preconditions.checkState(currentSegmentWriter.addRecord(record));
+        }
     }
 
+    /** Finishes adding records and closes resources occupied for adding records. */
     public List finish() throws IOException {
-        finishCurrentSegment(false);
-        return finishSegments;
-    }
+        if (currentSegmentWriter == null) {
+            return finishedSegments;
+        }
 
-    public FileSystem getFileSystem() {
-        return fileSystem;
+        currentSegmentWriter.finish().ifPresent(finishedSegments::add);
+        currentSegmentWriter = null;
+        return finishedSegments;
     }
 
-    public List getFinishSegments() {
-        return finishSegments;
+    /**
+     * Flushes all added records to segments and returns a list of segments containing all cached
+     * records.
+     */
+    public List getSegments() throws IOException {
+        finishCurrentSegmentIfExists();
+        return finishedSegments;
     }
 
-    private void finishCurrentSegment(boolean newSegment) throws IOException {
-        if (currentSegment != null) {
-            currentSegment.finish().ifPresent(finishSegments::add);
-            currentSegment = null;
+    private void finishCurrentSegmentIfExists() throws IOException {
+        if (currentSegmentWriter == null) {
+            return;
         }
 
-        if (newSegment) {
-            currentSegment = new SegmentWriter(pathGenerator.get());
-        }
+        currentSegmentWriter.finish().ifPresent(finishedSegments::add);
+        currentSegmentWriter = createSegmentWriter();
     }
 
-    private class SegmentWriter {
-
-        private final Path path;
-
-        private final FSDataOutputStream outputStream;
-
-        private final DataOutputView outputView;
-
-        private int currentSegmentCount;
-
-        public SegmentWriter(Path path) throws IOException {
-            this.path = path;
-            this.outputStream = fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE);
-            this.outputView = new DataOutputViewStreamWrapper(outputStream);
+    /** Cleans up all previously added records. */
+    public void cleanup() throws IOException {

Review Comment:
Would it be more consistent with `State::clear()` to rename this method to `clear()`?
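The new `addRecord` writes to the current segment writer first and, when that writer rejects a record (presumably because the memory segment pool is exhausted), finishes it and falls back to a `FileSegmentWriter`. An in-memory sketch of that rollover, with hypothetical names: `BoundedSegmentWriter` stands in for the memory-backed writer, `UnboundedSegmentWriter` stands in for `FileSegmentWriter`, and segments are plain lists rather than files:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, in-memory illustration of the rollover in DataCacheWriter#addRecord.
interface SegmentWriter<T> {
    /** Returns false if the record does not fit into this segment. */
    boolean addRecord(T record);

    /** Returns the finished segment, or empty if nothing was written. */
    Optional<List<T>> finish();
}

/** Stand-in for a memory-backed writer with a fixed capacity. */
final class BoundedSegmentWriter<T> implements SegmentWriter<T> {
    private final int capacity;
    private final List<T> records = new ArrayList<>();

    BoundedSegmentWriter(int capacity) {
        this.capacity = capacity;
    }

    @Override
    public boolean addRecord(T record) {
        if (records.size() >= capacity) {
            return false;
        }
        records.add(record);
        return true;
    }

    @Override
    public Optional<List<T>> finish() {
        return records.isEmpty() ? Optional.empty() : Optional.of(records);
    }
}

/** Stand-in for FileSegmentWriter: always accepts records. */
final class UnboundedSegmentWriter<T> implements SegmentWriter<T> {
    private final List<T> records = new ArrayList<>();

    @Override
    public boolean addRecord(T record) {
        records.add(record);
        return true;
    }

    @Override
    public Optional<List<T>> finish() {
        return records.isEmpty() ? Optional.empty() : Optional.of(records);
    }
}

final class CacheWriter<T> {
    private final List<List<T>> finishedSegments = new ArrayList<>();
    private SegmentWriter<T> current;

    CacheWriter(int memoryCapacity) {
        this.current = new BoundedSegmentWriter<>(memoryCapacity);
    }

    void addRecord(T record) {
        if (!current.addRecord(record)) {
            // Memory segment is full: seal it and continue with the fallback writer.
            current.finish().ifPresent(finishedSegments::add);
            current = new UnboundedSegmentWriter<>();
            if (!current.addRecord(record)) {
                throw new IllegalStateException("Fallback writer rejected a record");
            }
        }
    }

    /** Finishes the current segment and returns all segments; no records may be added afterwards. */
    List<List<T>> finish() {
        if (current != null) {
            current.finish().ifPresent(finishedSegments::add);
            current = null;
        }
        return finishedSegments;
    }
}
```

With a capacity of 2, adding the records 0 to 4 yields two segments, `[0, 1]` and `[2, 3, 4]`; as in the diff, every record after the first rejection goes to the fallback writer.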
##
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/kmeans/KMeans.java:
##
@@ -254,58 +272,150 @@ public Tuple3 map(Tuple2 value
 DenseVector, DenseVector[], Tuple2>, IterationListener> {
     private final DistanceMeasure distanceMeasure;
-    private ListState points;
-    private ListState centroids;
+    private ListState centroidsState;
+    private DenseVector[] centroids;

Review Comment:
Does using `centroids` improve performance? If not, it seems simpler not to add this variable.
[jira] [Updated] (FLINK-27926) Translate ```upgrading.md``` into chinese
[ https://issues.apache.org/jira/browse/FLINK-27926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-27926:
Labels: pull-request-available (was: )

> Translate ```upgrading.md``` into chinese
>
> Key: FLINK-27926
> URL: https://issues.apache.org/jira/browse/FLINK-27926
> Project: Flink
> Issue Type: Improvement
> Components: chinese-translation
> Reporter: Zixuan Rao
> Priority: Major
> Labels: pull-request-available
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> A pull request is submitted as this issue is created.
[GitHub] [flink] flinkbot commented on pull request #19890: Translate `upgrading.md` into chinese FLINK-27926
flinkbot commented on PR #19890:
URL: https://github.com/apache/flink/pull/19890#issuecomment-1148175441

## CI report:

* 59948edef8632d168533185876bc44815aea4a3f UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[jira] [Commented] (FLINK-27926) Translate ```upgrading.md``` into chinese
[ https://issues.apache.org/jira/browse/FLINK-27926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550790#comment-17550790 ]

Zixuan Rao commented on FLINK-27926:

Pull request #19890 has been submitted. [~dianfu] Could you help merge it? Thanks!

> Translate ```upgrading.md``` into chinese
>
> Key: FLINK-27926
> URL: https://issues.apache.org/jira/browse/FLINK-27926
> Project: Flink
> Issue Type: Improvement
> Components: chinese-translation
> Reporter: Zixuan Rao
> Priority: Major
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> A pull request is submitted as this issue is created.
[GitHub] [flink] billyrrr opened a new pull request, #19890: Translate `upgrading.md` into chinese #FLINK-17285
billyrrr opened a new pull request, #19890:
URL: https://github.com/apache/flink/pull/19890

## What is the purpose of the change

Translate `upgrading.md` into Chinese (#FLINK-17285). The page covers upgrading an existing Flink cluster.

## Brief change log

- *Copy upgrading.md from the current master into content.zh*
- *Translate the documentation with machine translation*
- *Manually review the translation and check it against the translation glossary*

## Verifying this change

This change added tests and can be verified as follows:

- Rebuild the docs and visually inspect the new page

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
[GitHub] [flink] flinkbot commented on pull request #19889: [FLINK-27910][connector/filesystem] Register the timer to enforce rolling policy when file sink starts from scratch.
flinkbot commented on PR #19889:
URL: https://github.com/apache/flink/pull/19889#issuecomment-1148172903

## CI report:

* a92e364e9bc5981dbe493762de04fb3a57b4dfb4 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] PatrickRen commented on pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder
PatrickRen commented on PR #19366:
URL: https://github.com/apache/flink/pull/19366#issuecomment-1148172752

I think this PR is ready to merge once we resolve the minor pending comments above. @mas-chen, would you like to make the updates and rebase onto the latest master?
[jira] [Created] (FLINK-27926) Translate ```upgrading.md``` into chinese
Zixuan Rao created FLINK-27926: -- Summary: Translate ```upgrading.md``` into chinese Key: FLINK-27926 URL: https://issues.apache.org/jira/browse/FLINK-27926 Project: Flink Issue Type: Improvement Components: chinese-translation Reporter: Zixuan Rao A pull request is submitted as this issue is created. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Closed] (FLINK-27872) Allow KafkaBuilder to set arbitrary subscribers
[ https://issues.apache.org/jira/browse/FLINK-27872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren closed FLINK-27872. - Resolution: Duplicate > Allow KafkaBuilder to set arbitrary subscribers > --- > > Key: FLINK-27872 > URL: https://issues.apache.org/jira/browse/FLINK-27872 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka >Reporter: Salva >Priority: Major > > Currently, > [KafkaSourceBuilder|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java] > has two setters: > * > [setTopics|https://github.com/apache/flink/blob/586715f23ef49939ab74e4736c58d71c643a64ba/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L157] > * > [setTopicPattern|https://github.com/apache/flink/blob/586715f23ef49939ab74e4736c58d71c643a64ba/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L168] > which under the hood instantiate the corresponding (concrete) subscribers. > This covers the most common needs, I agree, but it might fall short in some > cases. Why not add a more generic setter: > * {{setSubscriber (???)}} > Otherwise, how can users read from kafka in combination with custom > subscribing logic? Without looking much into it, it seems that they basically > cannot, at least without having to replicate some parts of the connector, > which seems rather inconvenient.
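To make the request concrete, here is a minimal Python sketch of the idea behind a generic subscriber setter: `set_topics` and `set_topic_pattern` become thin wrappers that install a subscriber object, and a user-supplied subscriber plugs into the same slot. Every name here (`Subscriber`, `SourceBuilder`, `set_subscriber`, the dict-based cluster metadata) is hypothetical and is not the Flink Java or PyFlink API; a real subscriber would query the cluster rather than a dict.

```python
import re
from abc import ABC, abstractmethod
from typing import Dict, Iterable, List, Optional, Set, Tuple

# Hypothetical stand-in for cluster metadata: topic name -> partition count.
Metadata = Dict[str, int]
TopicPartition = Tuple[str, int]


def _partitions_of(metadata: Metadata, topics: Iterable[str]) -> Set[TopicPartition]:
    """Expand topics into (topic, partition) pairs using the metadata."""
    return {(t, p) for t in topics for p in range(metadata[t])}


class Subscriber(ABC):
    """Decides which topic partitions a source reads (hypothetical interface)."""

    @abstractmethod
    def subscribed_partitions(self, metadata: Metadata) -> Set[TopicPartition]:
        """Return the (topic, partition) pairs to read."""


class TopicListSubscriber(Subscriber):
    def __init__(self, topics: List[str]):
        self.topics = topics

    def subscribed_partitions(self, metadata: Metadata) -> Set[TopicPartition]:
        # Topics missing from the cluster are ignored in this sketch.
        return _partitions_of(metadata, [t for t in self.topics if t in metadata])


class TopicPatternSubscriber(Subscriber):
    def __init__(self, pattern: str):
        self.pattern = re.compile(pattern)

    def subscribed_partitions(self, metadata: Metadata) -> Set[TopicPartition]:
        return _partitions_of(metadata, [t for t in metadata if self.pattern.fullmatch(t)])


class SourceBuilder:
    """Hypothetical builder whose topic setters delegate to set_subscriber."""

    def __init__(self) -> None:
        self.subscriber: Optional[Subscriber] = None

    def set_topics(self, *topics: str) -> "SourceBuilder":
        return self.set_subscriber(TopicListSubscriber(list(topics)))

    def set_topic_pattern(self, pattern: str) -> "SourceBuilder":
        return self.set_subscriber(TopicPatternSubscriber(pattern))

    def set_subscriber(self, subscriber: Subscriber) -> "SourceBuilder":
        # The generic setter asked for in the issue: any Subscriber is accepted.
        self.subscriber = subscriber
        return self


class EvenPartitionsSubscriber(Subscriber):
    """Example of custom logic: read only even-numbered partitions of 'orders'."""

    def subscribed_partitions(self, metadata: Metadata) -> Set[TopicPartition]:
        return {("orders", p) for p in range(metadata.get("orders", 0)) if p % 2 == 0}
```

Keeping the two existing setters as wrappers means their behaviour is unchanged when the generic setter is added.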
[jira] [Updated] (FLINK-27910) FileSink not registered the timer to enforce rolling policy if started from scratch
[ https://issues.apache.org/jira/browse/FLINK-27910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27910: --- Labels: pull-request-available (was: ) > FileSink not registered the timer to enforce rolling policy if started from > scratch > --- > > Key: FLINK-27910 > URL: https://issues.apache.org/jira/browse/FLINK-27910 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.15.0, 1.16.0 >Reporter: Yun Gao >Priority: Critical > Labels: pull-request-available > > The current FileWriter only registers the timer in initializeState, which is > currently called only when restoring. Thus, if the job is started from scratch, the > timer is never registered and the rolling policy does not take effect.
[jira] [Commented] (FLINK-27872) Allow KafkaBuilder to set arbitrary subscribers
[ https://issues.apache.org/jira/browse/FLINK-27872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550788#comment-17550788 ] Qingsheng Ren commented on FLINK-27872: --- I think this duplicates FLINK-24660.
[GitHub] [flink] pltbkd opened a new pull request, #19889: [FLINK-27910][connector/filesystem] Register the timer to enforce rolling policy when file sink starts from scratch.
pltbkd opened a new pull request, #19889: URL: https://github.com/apache/flink/pull/19889 ## What is the purpose of the change This pull request fixes a bug where FileSink does not register the timer that enforces the rolling policy when the job starts from scratch. ## Brief change log - When FileSink creates a writer for a job that starts from scratch, FileWriter#initializeState is now called with an empty state collection. ## Verifying this change This change adds a new unit test that verifies the fix. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
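The bug and the fix described in FLINK-27910 and this PR can be modelled in a few lines. In the following Python sketch, `Writer`, `FakeTimerService` and the two factory functions are hypothetical stand-ins, not Flink classes; the point is only that a timer scheduled exclusively inside `initialize_state` is never scheduled unless `initialize_state` is also called on a fresh start.

```python
from typing import Dict, Iterable, List, Optional


class FakeTimerService:
    """Records the processing times at which timers were registered."""

    def __init__(self) -> None:
        self.registered: List[float] = []

    def register(self, time: float) -> None:
        self.registered.append(time)


class Writer:
    """Hypothetical writer whose rolling check is scheduled only on init."""

    def __init__(self, timers: FakeTimerService, check_interval: float) -> None:
        self.timers = timers
        self.check_interval = check_interval
        self.buckets: Dict[str, dict] = {}

    def initialize_state(self, restored: Iterable[dict], now: float) -> None:
        for state in restored:
            self.buckets[state["bucket"]] = state
        # The only place the periodic rolling-policy check gets scheduled.
        self.timers.register(now + self.check_interval)


def create_writer_buggy(timers: FakeTimerService, restored: Optional[List[dict]], now: float) -> Writer:
    writer = Writer(timers, check_interval=60.0)
    if restored:  # initialize_state only runs when restoring from state
        writer.initialize_state(restored, now)
    return writer


def create_writer_fixed(timers: FakeTimerService, restored: Optional[List[dict]], now: float) -> Writer:
    writer = Writer(timers, check_interval=60.0)
    # As in the PR: always call initialize_state, with an empty collection
    # when starting from scratch, so the timer is registered either way.
    writer.initialize_state(restored or [], now)
    return writer
```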
[GitHub] [flink] lsyldliu commented on a diff in pull request #19482: [FLINK-27244][hive] Support read sub-directories in partition directory with Hive tables
lsyldliu commented on code in PR #19482: URL: https://github.com/apache/flink/pull/19482#discussion_r890744876 ## docs/content.zh/docs/connectors/table/hive/hive_read_write.md: ## @@ -170,6 +170,13 @@ following parameters in `TableConfig` (note that these parameters affect all sou Multi-thread is used to split hive's partitions. You can use `table.exec.hive.load-partition-splits.thread-num` to configure the thread number. The default value is 3 and the configured value should be bigger than 0. +### + +In some case, you may create an external table referring another table, but the partition columns is a subset of the referred table. +Then when read the external table, there will be sub-directories in the partition directory of the external table. +You can use `table.exec.hive.read-partition-with-subdirectory.enabled` to configure Flink to read the sub-directories or skip them directly. Review Comment: You can configure `table.exec.hive.read-partition-with-subdirectory.enabled` to allow Flink to read?
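The option described in this doc change decides whether files below nested directories of a partition are read. For example, if a table is partitioned by `(dt, hr)` and an external table over the same location is partitioned only by `dt`, a partition directory such as `dt=2022-06-07/` contains sub-directories like `hr=10/`. The Python sketch below models only that decision; `list_partition_files` and its `read_subdirectories` flag are hypothetical and do not reflect how Flink's Hive connector is implemented.

```python
import os
from typing import List


def list_partition_files(partition_dir: str, read_subdirectories: bool) -> List[str]:
    """Return data files under a partition directory (hypothetical sketch).

    With read_subdirectories=True, files in nested directories are included
    recursively; otherwise nested directories are skipped entirely.
    """
    files: List[str] = []
    for entry in sorted(os.listdir(partition_dir)):
        path = os.path.join(partition_dir, entry)
        if os.path.isfile(path):
            files.append(path)
        elif os.path.isdir(path) and read_subdirectories:
            files.extend(list_partition_files(path, read_subdirectories))
    return files
```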
[GitHub] [flink] PatrickRen commented on a diff in pull request #19827: [FLINK-27806][table] Support binary & varbinary types in datagen connector
PatrickRen commented on code in PR #19827: URL: https://github.com/apache/flink/pull/19827#discussion_r890743854 ## flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java: ## @@ -176,6 +195,7 @@ void testSource() throws Exception { assertThat(row.getLong(1)).isBetween(10L, 100L); assertThat(row.getLong(2)).isEqualTo(i + 50); assertThat(row.getTimestamp(3, 3).getMillisecond()).isBetween(begin - 5000, end); +assertThat(row.getBinary(4).length).isEqualTo(2); Review Comment: Could we also have an assertion here for field `f5`?
[GitHub] [flink] deadwind4 commented on a diff in pull request #19732: [FLINK-18887][python][connector/elasticsearch] Add Elasticsearch DataStream API
deadwind4 commented on code in PR #19732: URL: https://github.com/apache/flink/pull/19732#discussion_r890739338 ## flink-python/pyflink/datastream/connectors/elasticsearch.py: ## @@ -0,0 +1,255 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +from enum import Enum +from typing import List + +from pyflink.datastream.connectors import Sink, DeliveryGuarantee +from pyflink.java_gateway import get_gateway +from pyflink.util.java_utils import to_jarray, load_java_class + + +class FlushBackoffType(Enum): +""" +Used to control whether the sink should retry failed requests at all or with which kind back off +strategy. + +:data: `CONSTANT`: + +After every failure, it waits a configured time until the retries are exhausted. + +:data: `EXPONENTIAL`: + +After every failure, it waits initially the configured time and increases the waiting time +exponentially until the retries are exhausted. + +:data: `NONE`: + +The failure is not retried. 
+""" + +CONSTANT = 0, +EXPONENTIAL = 1, +NONE = 2, + +def _to_j_flush_backoff_type(self): +JFlushBackoffType = get_gateway().jvm \ +.org.apache.flink.connector.elasticsearch.sink.FlushBackoffType +return getattr(JFlushBackoffType, self.name) + + +class ElasticsearchSinkBuilderBase(abc.ABC): +""" +Base builder to construct a ElasticsearchSink. +""" + +@abc.abstractmethod +def __init__(self): +self._j_elasticsearch_sink_builder = None + +def set_emitter(self, emitter_class_name: str): Review Comment: I implemented a `PythonSimpleElasticsearchEmiiter` and tested that both insert and update succeed.
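The docstring above describes three retry strategies. The following Python sketch spells out the waits each strategy would produce; `retry_delays`, the doubling factor for `EXPONENTIAL`, and the millisecond parameters are assumptions for illustration, not the connector's actual settings. It also drops the trailing commas from the enum members: in the diff, `CONSTANT = 0,` makes the member's value the tuple `(0,)` rather than `0`. That does not break `_to_j_flush_backoff_type`, which looks members up by `self.name`, but it is probably not intended.

```python
from enum import Enum
from typing import List


class FlushBackoffType(Enum):
    # No trailing commas: `CONSTANT = 0,` would set the value to the tuple (0,).
    CONSTANT = 0
    EXPONENTIAL = 1
    NONE = 2


def retry_delays(backoff: FlushBackoffType, delay_ms: int, max_retries: int) -> List[int]:
    """Return the wait before each retry (hypothetical helper)."""
    if backoff is FlushBackoffType.NONE:
        return []  # the failure is not retried
    if backoff is FlushBackoffType.CONSTANT:
        return [delay_ms] * max_retries  # same configured wait every time
    # EXPONENTIAL: start at the configured wait and double it after each failure
    # (the factor of 2 is an assumption of this sketch).
    return [delay_ms * 2 ** attempt for attempt in range(max_retries)]
```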
[jira] [Closed] (FLINK-25705) Translate "Metric Reporters" page of "Deployment" in to Chinese
[ https://issues.apache.org/jira/browse/FLINK-25705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao closed FLINK-25705. --- Fix Version/s: 1.16.0 Resolution: Fixed > Translate "Metric Reporters" page of "Deployment" in to Chinese > --- > > Key: FLINK-25705 > URL: https://issues.apache.org/jira/browse/FLINK-25705 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Chengkai Yang >Assignee: Chengkai Yang >Priority: Minor > Labels: auto-unassigned, pull-request-available > Fix For: 1.16.0 > > > The page url is > [https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/metric_reporters] > The markdown file is located in > flink/docs/content.zh/docs/deployment/metric_reporters.md > This issue should be merged after Flink-25830 and > [FLINK-26222|https://issues.apache.org/jira/browse/FLINK-26222] is merged or > solved.
[jira] [Commented] (FLINK-25705) Translate "Metric Reporters" page of "Deployment" in to Chinese
[ https://issues.apache.org/jira/browse/FLINK-25705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550785#comment-17550785 ] Yun Gao commented on FLINK-25705: - Merged on master via 5176669bc0ee678f21934f239d2b34a59102b1b8
[GitHub] [flink] deadwind4 commented on a diff in pull request #19732: [FLINK-18887][python][connector/elasticsearch] Add Elasticsearch DataStream API
deadwind4 commented on code in PR #19732: URL: https://github.com/apache/flink/pull/19732#discussion_r890738562 ## flink-python/pyflink/datastream/connectors/__init__.py: ## @@ -48,6 +50,10 @@ 'JdbcExecutionOptions', 'NumberSequenceSource', 'OutputFileConfig', +'ElasticsearchSink', +'Elasticsearch6SinkBuilder', +'Elasticsearch7SinkBuilder', +'FlushBackoffType', Review Comment: > All the classes added in the future are placed in connector specific files This makes sense to me. It keeps the classes of every connector organized in the same way.
[GitHub] [flink] ChengkaiYang2022 opened a new pull request, #19496: [FLINK-25705][docs]Translate "Metric Reporters" page of "Deployment"
ChengkaiYang2022 opened a new pull request, #19496: URL: https://github.com/apache/flink/pull/19496 ## What is the purpose of the change Translate the "Metric Reporters" page of "Deployment". This PR is based on previous work in https://github.com/apache/flink/pull/19023. ## Brief change log *(for example:)* - Translate some tables in this page. ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) no - The serializers: (yes / no / don't know) no - The runtime per-record code paths (performance sensitive): (yes / no / don't know) no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no - The S3 file system connector: (yes / no / don't know) no ## Documentation - Does this pull request introduce a new feature?
(yes / no) no - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink] gaoyunhaii closed pull request #19496: [FLINK-25705][docs]Translate "Metric Reporters" page of "Deployment"
gaoyunhaii closed pull request #19496: [FLINK-25705][docs]Translate "Metric Reporters" page of "Deployment" URL: https://github.com/apache/flink/pull/19496
[jira] [Updated] (FLINK-27710) Improve logs to better display Execution
[ https://issues.apache.org/jira/browse/FLINK-27710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-27710: Description: Currently, an execution is usually represented as "{{job vertex name}} ({{subtaskIndex+1}}/{{vertex parallelism}}) ({{attemptId}})" in logs, which may be redundant after this refactoring work. With the change of FLINK-17295, the representation of Execution in logs will be redundant, e.g. the subtask index is displayed twice. Therefore, I'm proposing to change the format to "<job vertex name> (<subtaskIndex>+1/<vertex parallelism>) #, vertexId: <JobVertexID>)" and to avoid directly displaying the {{ExecutionAttemptID}}. This can increase log readability. Besides that, the displayed {{JobVertexID}} can also help to distinguish job vertices of the same name, which is common in DataStream jobs (e.g. multiple {{Map}}). was: Currently, an execution is usually represented as "{{job vertex name}} ({{subtaskIndex+1}}/{{vertex parallelism}}) ({{attemptId}})" in logs, which may be redundant after this refactoring work. With the change of FLINK-17295, the representation of Execution in logs will be redundant, e.g. the subtask index is displayed twice. Therefore, I'm proposing to change the format to "{{job vertex name}} ({{short ExecutionGraphID}}:{{JobVertexID}}) ({{subtaskIndex+1}}/{{vertex parallelism}}) ({{#attemptNumber}})" and to avoid directly displaying the {{ExecutionAttemptID}}. This can increase log readability. Besides that, the displayed {{JobVertexID}} can also help to distinguish job vertices of the same name, which is common in DataStream jobs (e.g. multiple {{Map}}).
> Improve logs to better display Execution > > > Key: FLINK-27710 > URL: https://issues.apache.org/jira/browse/FLINK-27710 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination, Runtime / Task >Affects Versions: 1.16.0 >Reporter: Zhu Zhu >Assignee: Zhu Zhu >Priority: Major > Fix For: 1.16.0
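Because the latest revision of the proposed format is partly lost in this archive, the sketch below follows the earlier, complete revision quoted under "was:". It is a Python illustration of the two string shapes only; the function names and example IDs are hypothetical, and Flink itself builds these strings in Java.

```python
def old_format(vertex_name: str, subtask_index: int, parallelism: int, attempt_id: str) -> str:
    """Current shape: '<name> (<index+1>/<parallelism>) (<attemptId>)'."""
    return f"{vertex_name} ({subtask_index + 1}/{parallelism}) ({attempt_id})"


def proposed_format(vertex_name: str, short_graph_id: str, vertex_id: str,
                    subtask_index: int, parallelism: int, attempt_number: int) -> str:
    """Earlier proposal: name, graph/vertex IDs, subtask position, #attempt."""
    return (f"{vertex_name} ({short_graph_id}:{vertex_id}) "
            f"({subtask_index + 1}/{parallelism}) (#{attempt_number})")
```

Two operators that are both named `Map` then differ in the vertex ID part, e.g. `Map (a1b2:v1) (1/4) (#0)` versus `Map (a1b2:v2) (1/4) (#0)`.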
[GitHub] [flink] paul8263 commented on a diff in pull request #19772: [FLINK-27579][client] The param client.timeout can not be set by dyna…
paul8263 commented on code in PR #19772: URL: https://github.com/apache/flink/pull/19772#discussion_r890730623 ## flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java: ## @@ -182,4 +196,45 @@ protected void executeProgram(Configuration configuration, PackagedProgram progr program.getUserCodeClassLoader().getClass().getName()); } } + +public static void verifyCliFrontendWithDynamicProperties( +Configuration configuration, +String[] parameters, +GenericCLI cliUnderTest, +Duration expectedClientTimeout, +int expectedDefaultParallelism) +throws Exception { +TestingCliFrontendWithDynamicProperties testFrontend = +new TestingCliFrontendWithDynamicProperties( +configuration, +cliUnderTest, +expectedClientTimeout, +expectedDefaultParallelism); +testFrontend.run(parameters); // verifies the expected values (see below) +} + +private static final class TestingCliFrontendWithDynamicProperties extends CliFrontend { Review Comment: @wangyang0918, thank you very much for reviewing. I will update the PR.
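The test helper above checks that values passed on the command line, such as `client.timeout` and the default parallelism, take precedence over the loaded configuration. A minimal Python sketch of that precedence rule, assuming arguments of the form `-Dkey=value`; `apply_dynamic_properties` is hypothetical and does not parse or validate durations the way Flink does.

```python
from typing import Dict, List


def apply_dynamic_properties(config: Dict[str, str], args: List[str]) -> Dict[str, str]:
    """Return a copy of `config` with each -Dkey=value argument applied on top."""
    merged = dict(config)  # leave the loaded configuration untouched
    for arg in args:
        if arg.startswith("-D") and "=" in arg:
            key, value = arg[2:].split("=", 1)  # split on the first '=' only
            merged[key.strip()] = value.strip()
    return merged
```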
[jira] [Commented] (FLINK-27791) SlotCountExceedingParallelismTest tests failed with NoResourceAvailableException
[ https://issues.apache.org/jira/browse/FLINK-27791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550769#comment-17550769 ] Zhu Zhu commented on FLINK-27791: - [~wangyang0918] do you have any idea about this problem? Flink is trying to get the login user via {{org.apache.hadoop.security.UserGroupInformation.getCurrentUser}}, but it failed due to an {{IllegalArgumentException}}. > SlotCountExceedingParallelismTest tests failed with > NoResourceAvailableException > > > Key: FLINK-27791 > URL: https://issues.apache.org/jira/browse/FLINK-27791 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Zhu Zhu >Priority: Major > Labels: test-stability > Attachments: error.log > > > {code:java} > 2022-05-25T12:16:09.2562348Z May 25 12:16:09 > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > 2022-05-25T12:16:09.2563741Z May 25 12:16:09 at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > 2022-05-25T12:16:09.2565457Z May 25 12:16:09 at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:982) > 2022-05-25T12:16:09.2567245Z May 25 12:16:09 at > org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.submitJobGraphAndWait(SlotCountExceedingParallelismTest.java:101) > 2022-05-25T12:16:09.2569329Z May 25 12:16:09 at > org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.testNoSlotSharingAndBlockingResultBoth(SlotCountExceedingParallelismTest.java:94) > 2022-05-25T12:16:09.2571889Z May 25 12:16:09 at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2022-05-25T12:16:09.2573109Z May 25 12:16:09 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2022-05-25T12:16:09.2574528Z May 25 12:16:09 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 
2022-05-25T12:16:09.2575657Z May 25 12:16:09 at > java.lang.reflect.Method.invoke(Method.java:498) > 2022-05-25T12:16:09.2581380Z May 25 12:16:09 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > 2022-05-25T12:16:09.2582747Z May 25 12:16:09 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2022-05-25T12:16:09.2583600Z May 25 12:16:09 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > 2022-05-25T12:16:09.2584455Z May 25 12:16:09 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2022-05-25T12:16:09.2585172Z May 25 12:16:09 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > 2022-05-25T12:16:09.2585792Z May 25 12:16:09 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > 2022-05-25T12:16:09.2586376Z May 25 12:16:09 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > 2022-05-25T12:16:09.2587035Z May 25 12:16:09 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > 2022-05-25T12:16:09.2587682Z May 25 12:16:09 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > 2022-05-25T12:16:09.2588589Z May 25 12:16:09 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > 2022-05-25T12:16:09.2589623Z May 25 12:16:09 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > 2022-05-25T12:16:09.2590262Z May 25 12:16:09 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > 2022-05-25T12:16:09.2590856Z May 25 12:16:09 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > 2022-05-25T12:16:09.2591453Z May 25 12:16:09 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > 2022-05-25T12:16:09.2592063Z May 25 12:16:09 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > 
2022-05-25T12:16:09.2592673Z May 25 12:16:09 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > 2022-05-25T12:16:09.2593288Z May 25 12:16:09 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > 2022-05-25T12:16:09.2595864Z May 25 12:16:09 at > org.junit.rules.RunRules.evaluate(RunRules.java:20) > 2022-05-25T12:16:09.2596521Z May 25 12:16:09 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > 2022-05-25T12:16:09.2597144Z May 25 12:16:09 at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) > 2022-05-25T12:16:09.2597703Z May 25 12:16:09 at > org.junit.runner.JUnitCore.run(JUnitCore.java:137) > 2022-05-25T12:16:09.2598247Z May 25 12:16:09 at >
[GitHub] [flink] Aitozi commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…
Aitozi commented on code in PR #19886: URL: https://github.com/apache/flink/pull/19886#discussion_r890728118 ## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java: ## @@ -1424,6 +1428,37 @@ public void testReclaimInactiveSlotsOnClearRequirements() throws Exception { } } +@Test +public void testProcessResourceRequirementsWithDelay() throws Exception { Review Comment: Yes, updated
[GitHub] [flink] liuyongvs commented on pull request #19881: [FLINK-27899][quickstarts] fix deactivate the shade plugin doesn't ta…
liuyongvs commented on PR #19881: URL: https://github.com/apache/flink/pull/19881#issuecomment-1148144676 @flinkbot run azure
[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…
KarmaGYZ commented on code in PR #19886: URL: https://github.com/apache/flink/pull/19886#discussion_r890726474 ## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java: ## @@ -1424,6 +1428,37 @@ public void testReclaimInactiveSlotsOnClearRequirements() throws Exception { } } +@Test +public void testProcessResourceRequirementsWithDelay() throws Exception { Review Comment: I think we can process another resource requirement before manually triggering the scheduled task. Then the number of pending tasks should still be 1.
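The behaviour the reviewer wants to pin down is that requirement updates arriving before the delayed check fires are coalesced into a single scheduled check. The Python sketch below models that with a manually triggered executor; `ManualExecutor`, `DelayedRequirementsChecker`, their methods and the 50 ms default delay are hypothetical and do not mirror the `DeclarativeSlotManager` code.

```python
from typing import Callable, Dict, List, Tuple


class ManualExecutor:
    """Collects scheduled tasks; the test decides when they run."""

    def __init__(self) -> None:
        self.pending: List[Tuple[Callable[[], None], float]] = []

    def schedule(self, task: Callable[[], None], delay_ms: float) -> None:
        self.pending.append((task, delay_ms))  # delay is recorded, not waited on

    def trigger_all(self) -> None:
        tasks, self.pending = self.pending, []
        for task, _ in tasks:
            task()


class DelayedRequirementsChecker:
    """Schedules at most one delayed check for any burst of updates."""

    def __init__(self, executor: ManualExecutor, delay_ms: float = 50.0) -> None:
        self.executor = executor
        self.delay_ms = delay_ms
        self.requirements: Dict[str, int] = {}
        self.check_scheduled = False
        self.checks_run = 0

    def process_resource_requirements(self, job_id: str, slots: int) -> None:
        self.requirements[job_id] = slots
        if not self.check_scheduled:  # coalesce: reuse the pending check
            self.check_scheduled = True
            self.executor.schedule(self._check, self.delay_ms)

    def _check(self) -> None:
        self.check_scheduled = False
        self.checks_run += 1  # a real check would allocate resources here
```

Processing a second requirement before `trigger_all` leaves exactly one pending task, which is the assertion suggested above.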
[GitHub] [flink] Aitozi commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…
Aitozi commented on code in PR #19886: URL: https://github.com/apache/flink/pull/19886#discussion_r890724533 ## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java: ## @@ -1424,6 +1428,37 @@ public void testReclaimInactiveSlotsOnClearRequirements() throws Exception { } } +@Test +public void testProcessResourceRequirementsWithDelay() throws Exception { Review Comment: Verified by the newly added ``` Assertions.assertEquals(1, allocatedResourceCounter.get()); ```
[GitHub] [flink] Aitozi commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…
Aitozi commented on code in PR #19886: URL: https://github.com/apache/flink/pull/19886#discussion_r890723542 ## flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java: ## @@ -142,6 +142,13 @@ public class ResourceManagerOptions { + START_WORKER_MAX_FAILURE_RATE.key() + "') is reached."); +@Documentation.ExcludeFromDocumentation Review Comment: Thanks for the hint.
[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…
KarmaGYZ commented on code in PR #19886: URL: https://github.com/apache/flink/pull/19886#discussion_r890722048 ## flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java: ## @@ -142,6 +142,13 @@ public class ResourceManagerOptions { + START_WORKER_MAX_FAILURE_RATE.key() + "') is reached."); +@Documentation.ExcludeFromDocumentation Review Comment: ```suggestion @Documentation.ExcludeFromDocumentation( "This is an expert option, that we do not want to expose in the documentation") ```
[GitHub] [flink] Vancior commented on a diff in pull request #19883: [FLINK-27901][python] support TableEnvironment.create(configuration)
Vancior commented on code in PR #19883: URL: https://github.com/apache/flink/pull/19883#discussion_r890721959 ## flink-python/pyflink/table/table_environment.py: ## @@ -98,18 +99,25 @@ def __init__(self, j_tenv, serializer=PickleSerializer()): self._open() @staticmethod -def create(environment_settings: EnvironmentSettings) -> 'TableEnvironment': +def create(conf_or_settings: Union[EnvironmentSettings, Configuration]) -> 'TableEnvironment': """ Creates a table environment that is the entry point and central context for creating Table and SQL API programs. -:param environment_settings: The environment settings used to instantiate the +:param conf_or_settings: The configuration or environment settings used to instantiate the :class:`~pyflink.table.TableEnvironment`. :return: The :class:`~pyflink.table.TableEnvironment`. """ gateway = get_gateway() -j_tenv = gateway.jvm.TableEnvironment.create( -environment_settings._j_environment_settings) +if isinstance(conf_or_settings, EnvironmentSettings): +environment_settings = conf_or_settings +elif isinstance(conf_or_settings, Configuration): +environment_settings = EnvironmentSettings.new_instance() \ Review Comment: `from_configuration` is deprecated.
[GitHub] [flink] Aitozi commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…
Aitozi commented on code in PR #19886: URL: https://github.com/apache/flink/pull/19886#discussion_r890718047 ## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java: ## @@ -1347,7 +1351,7 @@ public void testAllocationUpdatesIgnoredIfSlotMarkedAsPendingForOtherJob() throw .setSlotTracker(slotTracker) .buildAndStart( ResourceManagerId.generate(), - ComponentMainThreadExecutorServiceAdapter.forMainThread(), +Executors.directExecutor(), Review Comment: This change is no longer needed, so I've reverted it.
[GitHub] [flink] Aitozi commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…
Aitozi commented on code in PR #19886: URL: https://github.com/apache/flink/pull/19886#discussion_r890717053 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java: ## @@ -407,13 +420,39 @@ public void freeSlot(SlotID slotId, AllocationID allocationId) { LOG.debug("Freeing slot {}.", slotId); slotTracker.notifyFree(slotId); -checkResourceRequirements(); +checkResourceRequirementsWithDelay(); } // - // Requirement matching // - +/** + * Depending on the implementation of {@link ResourceAllocationStrategy}, checking resource + * requirements and potentially making a re-allocation can be heavy. In order to cover more + * changes with each check, thus reduce the frequency of unnecessary re-allocations, the checks + * are performed with a slight delay. + */ +private void checkResourceRequirementsWithDelay() { +if (requirementsCheckDelay.toMillis() <= 0) { Review Comment: Yes, I will fix it.
[GitHub] [flink] Aitozi commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…
Aitozi commented on code in PR #19886: URL: https://github.com/apache/flink/pull/19886#discussion_r890716617 ## flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java: ## @@ -142,6 +142,13 @@ public class ResourceManagerOptions { + START_WORKER_MAX_FAILURE_RATE.key() + "') is reached."); +@Documentation.ExcludeFromDocumentation Review Comment: Making this configurable was inspired by the comments in `ResourceManagerRuntimeServices`. However, I don't think there is a need to change the default value for now. That said, I can remove the annotation.
[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19886: [FLINK-27921] Introduce the checkResourceRequirementsWithDelay in Dec…
KarmaGYZ commented on code in PR #19886: URL: https://github.com/apache/flink/pull/19886#discussion_r890703027 ## flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java: ## @@ -142,6 +142,13 @@ public class ResourceManagerOptions { + START_WORKER_MAX_FAILURE_RATE.key() + "') is reached."); +@Documentation.ExcludeFromDocumentation Review Comment: Why did you introduce this config option? Do you mean to allow users to configure it? If so, why exclude it from the documentation? ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java: ## @@ -407,13 +420,39 @@ public void freeSlot(SlotID slotId, AllocationID allocationId) { LOG.debug("Freeing slot {}.", slotId); slotTracker.notifyFree(slotId); -checkResourceRequirements(); +checkResourceRequirementsWithDelay(); } // - // Requirement matching // - +/** + * Depending on the implementation of {@link ResourceAllocationStrategy}, checking resource + * requirements and potentially making a re-allocation can be heavy. In order to cover more + * changes with each check, thus reduce the frequency of unnecessary re-allocations, the checks + * are performed with a slight delay. + */ +private void checkResourceRequirementsWithDelay() { +if (requirementsCheckDelay.toMillis() <= 0) { Review Comment: If we allow users to configure the delay period, we should also check it in `FineGrainedSlotManager`. ## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java: ## @@ -1347,7 +1351,7 @@ public void testAllocationUpdatesIgnoredIfSlotMarkedAsPendingForOtherJob() throw .setSlotTracker(slotTracker) .buildAndStart( ResourceManagerId.generate(), - ComponentMainThreadExecutorServiceAdapter.forMainThread(), +Executors.directExecutor(), Review Comment: Could you help me understand why this change is needed? 
## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java: ## @@ -1424,6 +1428,37 @@ public void testReclaimInactiveSlotsOnClearRequirements() throws Exception { } } +@Test +public void testProcessResourceRequirementsWithDelay() throws Exception { Review Comment: We should also test that the check method is triggered only once per delay period.
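The delayed check discussed in this review is essentially a debounce: every trigger that arrives while a check is pending is folded into that single scheduled check. The sketch below is a minimal, hypothetical illustration of the idea, not the code from PR #19886; `DelayScheduler` and `DelayedRequirementsChecker` are invented names. It assumes all calls arrive on one thread (as they would on a component's main-thread executor), so a plain boolean flag is used instead of synchronization, and it assumes a non-positive delay means "check immediately", since the diff only shows the `<= 0` guard.

```java
import java.time.Duration;

/** Hypothetical scheduler abstraction, standing in for a component's main-thread executor. */
interface DelayScheduler {
    void schedule(Runnable task, Duration delay);
}

/**
 * Hypothetical sketch of a debounced resource-requirements check: all triggers that arrive
 * while a check is pending are covered by that one check. Assumes single-threaded access.
 */
final class DelayedRequirementsChecker {
    private final DelayScheduler scheduler;
    private final Duration delay;
    private final Runnable check;
    // True while a check has been scheduled but has not run yet.
    private boolean checkPending = false;

    DelayedRequirementsChecker(DelayScheduler scheduler, Duration delay, Runnable check) {
        this.scheduler = scheduler;
        this.delay = delay;
        this.check = check;
    }

    void checkResourceRequirementsWithDelay() {
        if (delay.toMillis() <= 0) {
            // Assumption: a non-positive delay disables batching and checks right away.
            check.run();
            return;
        }
        if (checkPending) {
            return; // already covered by the pending check
        }
        checkPending = true;
        scheduler.schedule(this::runPendingCheck, delay);
    }

    private void runPendingCheck() {
        // Clear the flag first so triggers raised during or after this check start a new window.
        checkPending = false;
        check.run();
    }
}
```

Injecting the scheduler makes the property requested above easy to test deterministically: with a scheduler that merely queues tasks, several back-to-back triggers should enqueue exactly one task, and running that task should invoke the check once.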
[GitHub] [flink] flinkbot commented on pull request #19888: [FLINK-27920][doc] Documented enums constant support ExcludeFromDocumentation annotation
flinkbot commented on PR #19888: URL: https://github.com/apache/flink/pull/19888#issuecomment-1148128870 ## CI report: * 7d6308a037148b6340eab31bc37c963fc0465a65 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-27920) Documented enums constant support ExcludeFromDocumentation annotation
[ https://issues.apache.org/jira/browse/FLINK-27920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27920: --- Labels: pull-request-available (was: ) > Documented enums constant support ExcludeFromDocumentation annotation > - > > Key: FLINK-27920 > URL: https://issues.apache.org/jira/browse/FLINK-27920 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Weijie Guo >Priority: Major > Labels: pull-request-available > > If a config option has the @ExcludeFromDocumentation annotation, it does not > appear in the documentation. For an enumeration type, however, we sometimes want > to hide only some of its constant values from the documentation; this ticket > solves that problem. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] reswqa opened a new pull request, #19888: [FLINK-27920][doc] Documented enums constant support ExcludeFromDocumentation annotation
reswqa opened a new pull request, #19888: URL: https://github.com/apache/flink/pull/19888 ## What is the purpose of the change If a config option has the @ExcludeFromDocumentation annotation, it does not appear in the documentation. For an enumeration type, however, we sometimes want to hide only some of its constant values from the documentation; this pull request adds support for that. ## Brief change log - *Make ConfigOptionsDocGenerator respect the ExcludeFromDocumentation annotation in getEnumOptionsDescription* ## Verifying this change This change adds tests and can be verified by org.apache.flink.docs.configuration.ConfigOptionsDocGeneratorTest#TestConfigGroupWithEnumConstantExclusion ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no
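A minimal sketch of how a doc generator could skip individual enum constants, assuming the exclusion marker is a runtime-retained annotation placed directly on the constant. Enum constants are `public static final` fields of the enum class, so reflection can find the annotation through `Class#getField(name)`. `ExcludeFromDocs`, `ExampleMode`, and `EnumDocFilter` are hypothetical stand-ins so the example compiles without Flink; the actual change to `ConfigOptionsDocGenerator` may be structured differently.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Flink's Documentation.ExcludeFromDocumentation annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface ExcludeFromDocs {
    String value() default "";
}

/** Hypothetical enum whose ADVANCED constant should not appear in generated docs. */
enum ExampleMode {
    SIMPLE,
    DEFAULT,
    @ExcludeFromDocs("expert-only value")
    ADVANCED
}

final class EnumDocFilter {
    /** Returns the names of the enum constants that are not annotated with ExcludeFromDocs. */
    static <E extends Enum<E>> List<String> documentedConstants(Class<E> enumClass) {
        List<String> names = new ArrayList<>();
        for (E constant : enumClass.getEnumConstants()) {
            try {
                // The annotation sits on the constant's field, not on the enum value object.
                boolean excluded =
                        enumClass.getField(constant.name()).isAnnotationPresent(ExcludeFromDocs.class);
                if (!excluded) {
                    names.add(constant.name());
                }
            } catch (NoSuchFieldException e) {
                throw new IllegalStateException("Enum constant without a field: " + constant, e);
            }
        }
        return names;
    }
}
```

Looking the field up on `enumClass` rather than on `constant.getClass()` matters for constants with their own bodies, whose runtime class is an anonymous subclass that does not declare the field.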
[GitHub] [flink] flinkbot commented on pull request #19887: [FLINK-27924][python][docs] Add pulsar comments in the datastream package
flinkbot commented on PR #19887: URL: https://github.com/apache/flink/pull/19887#issuecomment-1148121181 ## CI report: * b8d1a745298de2069566aa871db98424b57dc78e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-27920) Documented enums constant support ExcludeFromDocumentation annotation
[ https://issues.apache.org/jira/browse/FLINK-27920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-27920: --- Issue Type: Improvement (was: Bug) > Documented enums constant support ExcludeFromDocumentation annotation > - > > Key: FLINK-27920 > URL: https://issues.apache.org/jira/browse/FLINK-27920 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Weijie Guo >Priority: Major > > If a config option has the @ExcludeFromDocumentation annotation, it does not > appear in the documentation. For an enumeration type, however, we sometimes want > to hide only some of its constant values from the documentation; this ticket > solves that problem.
[jira] [Commented] (FLINK-27828) FlinkKafkaProducer VS KafkaSink
[ https://issues.apache.org/jira/browse/FLINK-27828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550752#comment-17550752 ] Qingsheng Ren commented on FLINK-27828: --- [~Jiangfei Liu] Could you share some context about your benchmark, such as the job topology, parallelism, checkpoint configuration, and so forth? > FlinkKafkaProducer VS KafkaSink > --- > > Key: FLINK-27828 > URL: https://issues.apache.org/jira/browse/FLINK-27828 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.14.3 >Reporter: Jiangfei Liu >Priority: Major > Attachments: Snipaste_2022-05-25_19-52-11.png > > > Sorry, my English is not good. > In Flink 1.14.3, I wrote 1 data to Kafka. > With FlinkKafkaProducer, it completed in 7s. > With KafkaSink, it completed in 1m40s. > Why is KafkaSink so slow?
[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance
yunfengzhou-hub commented on code in PR #97: URL: https://github.com/apache/flink-ml/pull/97#discussion_r890707072 ## flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java: ## @@ -18,38 +18,73 @@ package org.apache.flink.iteration.datacache.nonkeyed; +import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.MemorySegment; -import java.io.Serializable; +import java.util.List; import java.util.Objects; -/** A segment represents a single file for the cache. */ -public class Segment implements Serializable { +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +/** A segment contains the information about a cache unit. */ +@Internal +public class Segment { + +/** The path to the file containing persisted records. */ private final Path path; -/** The count of the records in the file. */ +/** + * The count of records in the file at the path if the file size is not zero, otherwise the + * count of records in the cache. + */ private final int count; -/** The total length of file. */ -private final long size; +/** The total length of file containing persisted records. */ +private long fsSize = -1L; + +/** The memory segments containing cached records. */ Review Comment: Currently, the cache list is null rather than empty when the segment is not cached, which I think fits Java's default field values better. How do you think we should distinguish between a null list and an empty list?
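One way to settle the null-versus-empty question is to give each value a distinct meaning and enforce it in the accessors. The hypothetical sketch below (not the `Segment` class from PR #97) treats `null` as "not cached in memory" and any non-null list, including an empty one, as "cached". `SegmentSketch` and its `byte[]` pages are illustrative stand-ins for the real class and its list of `MemorySegment`s.

```java
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical stand-in for a cache segment: {@code cache == null} means the segment has not
 * been cached in memory; a non-null list (even an empty one) means it has.
 */
final class SegmentSketch {
    private final String path;
    // null iff the segment has not been cached in memory.
    private List<byte[]> cache;

    SegmentSketch(String path) {
        this.path = Objects.requireNonNull(path);
    }

    boolean isCached() {
        return cache != null;
    }

    void setCache(List<byte[]> cache) {
        // Reject null so a cached segment can never silently revert to "not cached".
        this.cache = Objects.requireNonNull(cache);
    }

    List<byte[]> getCache() {
        if (cache == null) {
            throw new IllegalStateException("Segment " + path + " has not been cached in memory");
        }
        return cache;
    }
}
```

Whichever convention the PR settles on, stating it in the field's Javadoc and checking it in the getter keeps callers from having to guess.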
[jira] [Updated] (FLINK-27921) Introduce the checkResourceRequirementsWithDelay in DeclarativeSlotManager
[ https://issues.apache.org/jira/browse/FLINK-27921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-27921: --- Description: As discussed in [https://github.com/apache/flink/pull/19840#discussion_r884242067], this ticket introduces the same mechanism as in {{FineGrainedSlotManager}}: wait for a slight delay before processing the resource check. This will reduce the frequency of unnecessary re-allocations. was: As discussed in https://github.com/apache/flink/pull/19840#discussion_r884242067 .This ticket is meant to introduce the same mechanism to wait for a slight delay before process the resource check with {{FineGrainedSlotManager}}. It will the frequency of unnecessary re-allocations > Introduce the checkResourceRequirementsWithDelay in DeclarativeSlotManager > -- > > Key: FLINK-27921 > URL: https://issues.apache.org/jira/browse/FLINK-27921 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Aitozi >Assignee: Aitozi >Priority: Major > Labels: pull-request-available > > As discussed in > [https://github.com/apache/flink/pull/19840#discussion_r884242067], this > ticket introduces the same mechanism as in {{FineGrainedSlotManager}}: wait for a > slight delay before processing the resource check. This will > reduce the frequency of unnecessary re-allocations.
[jira] [Updated] (FLINK-27924) Add pulsar comments in the datastream package
[ https://issues.apache.org/jira/browse/FLINK-27924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27924: --- Labels: pull-request-available (was: ) > Add pulsar comments in the datastream package > - > > Key: FLINK-27924 > URL: https://issues.apache.org/jira/browse/FLINK-27924 > Project: Flink > Issue Type: Improvement > Components: API / Python, Documentation >Affects Versions: 1.15.0 >Reporter: LuNng Wang >Priority: Major > Labels: pull-request-available > > https://github.com/apache/flink/pull/19732#discussion_r883356468
[GitHub] [flink] deadwind4 opened a new pull request, #19887: [FLINK-27924][python][docs] Add pulsar comments in the datastream package
deadwind4 opened a new pull request, #19887: URL: https://github.com/apache/flink/pull/19887 ## What is the purpose of the change Add Pulsar comments to the datastream package ## Brief change log - *Add Pulsar comments to __init__.py of the datastream package* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (PyDocs)
[jira] [Commented] (FLINK-27925) Avoid to create watcher without the resourceVersion
[ https://issues.apache.org/jira/browse/FLINK-27925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550749#comment-17550749 ] Aitozi commented on FLINK-27925: cc [~wangyang0918] > Avoid to create watcher without the resourceVersion > --- > > Key: FLINK-27925 > URL: https://issues.apache.org/jira/browse/FLINK-27925 > Project: Flink > Issue Type: Improvement > Components: Deployment / Kubernetes >Reporter: Aitozi >Priority: Major > > Currently, we create the watcher in KubernetesResourceManager without passing > the resourceVersion parameter, so it triggers a request to etcd. This puts a > burden on etcd in large-scale clusters (which we have > observed in our internal k8s cluster). More details can be found > [here|https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter] > > I think we could use an informer to improve this (it spawns a > list-watch and maintains the resourceVersion internally).
[jira] [Updated] (FLINK-27924) Add pulsar comments in the datastream package
[ https://issues.apache.org/jira/browse/FLINK-27924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LuNng Wang updated FLINK-27924: --- Component/s: Documentation > Add pulsar comments in the datastream package > - > > Key: FLINK-27924 > URL: https://issues.apache.org/jira/browse/FLINK-27924 > Project: Flink > Issue Type: Improvement > Components: API / Python, Documentation >Affects Versions: 1.15.0 >Reporter: LuNng Wang >Priority: Major > > https://github.com/apache/flink/pull/19732#discussion_r883356468
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #258: [chore] Change the session cluster not found log level to error
gyfora merged PR #258: URL: https://github.com/apache/flink-kubernetes-operator/pull/258
[jira] [Updated] (FLINK-27925) Avoid to create watcher without the resourceVersion
[ https://issues.apache.org/jira/browse/FLINK-27925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-27925: --- Description: Currently, we create the watcher in KubernetesResourceManager without passing the resourceVersion parameter, so it triggers a request to etcd. This puts a burden on etcd in large-scale clusters (which we have observed in our internal k8s cluster). More details can be found [here|https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter] I think we could use an informer to improve this (it spawns a list-watch and maintains the resourceVersion internally). was: Currently, we create the watcher in KubernetesResourceManager. But it do not pass the resourceVersion parameter, it will read from the etcd. It will bring the burden to the etcd in large scale cluster (which have been seen in our internal k8s cluster). More detail can be found [here|https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter] I think we could use the informer to improve it (which will spawn a list-watch and maintain the resourceVersion internally) > Avoid to create watcher without the resourceVersion > --- > > Key: FLINK-27925 > URL: https://issues.apache.org/jira/browse/FLINK-27925 > Project: Flink > Issue Type: Improvement > Components: Deployment / Kubernetes >Reporter: Aitozi >Priority: Major > > Currently, we create the watcher in KubernetesResourceManager without passing > the resourceVersion parameter, so it triggers a request to etcd. This puts a > burden on etcd in large-scale clusters (which we have > observed in our internal k8s cluster).
More details can be found > [here|https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter] > > I think we could use an informer to improve this (it spawns a > list-watch and maintains the resourceVersion internally).
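To make the resourceVersion point concrete: per the Kubernetes API concepts page linked in the issue, a watch with `resourceVersion` unset starts from the most recent state, which must be consistent and is therefore served from etcd via a quorum read, whereas a watch that resumes from the `resourceVersion` returned by a preceding list (what an informer's list-then-watch loop does) does not need that consistent read. The hypothetical helper below only builds the pod watch request path to show where the parameter goes; it is not Flink or Kubernetes client code, `PodWatchRequest` is an invented name, and it needs Java 10+ for `URLEncoder.encode(String, Charset)`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that builds the REST path for watching pods in one namespace. */
final class PodWatchRequest {
    /**
     * @param resourceVersion the version returned by a preceding list call, or null to omit it
     *     (the situation described in the issue, which asks for the most recent state)
     */
    static String watchPath(String namespace, String labelSelector, String resourceVersion) {
        StringBuilder path =
                new StringBuilder("/api/v1/namespaces/")
                        .append(namespace)
                        .append("/pods?watch=true&labelSelector=")
                        .append(URLEncoder.encode(labelSelector, StandardCharsets.UTF_8));
        if (resourceVersion != null) {
            // Resume from a known version instead of asking for the latest consistent state.
            path.append("&resourceVersion=")
                    .append(URLEncoder.encode(resourceVersion, StandardCharsets.UTF_8));
        }
        return path.toString();
    }
}
```

For example, `watchPath("default", "app=flink", "12345")` yields `/api/v1/namespaces/default/pods?watch=true&labelSelector=app%3Dflink&resourceVersion=12345`. An informer automates this bookkeeping by remembering the latest `resourceVersion` it has seen.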
[jira] [Created] (FLINK-27925) Avoid to create watcher without the resourceVersion
Aitozi created FLINK-27925: -- Summary: Avoid to create watcher without the resourceVersion Key: FLINK-27925 URL: https://issues.apache.org/jira/browse/FLINK-27925 Project: Flink Issue Type: Improvement Components: Deployment / Kubernetes Reporter: Aitozi Currently, we create the watcher in KubernetesResourceManager without passing the resourceVersion parameter, so it reads from etcd. This puts a burden on etcd in large-scale clusters (which we have observed in our internal k8s cluster). More details can be found [here|https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter] I think we could use an informer to improve this (it spawns a list-watch and maintains the resourceVersion internally).
[jira] [Commented] (FLINK-26942) Support SELECT clause in CREATE TABLE(CTAS)
[ https://issues.apache.org/jira/browse/FLINK-26942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550743#comment-17550743 ] waywtdcc commented on FLINK-26942: -- CREATE TABLE [ IF NOT EXISTS ] table_name [ WITH ( table_properties ) ] [ AS query_expression ] [ WITH [ NO ] DATA ] I think a [ WITH [ NO ] DATA ] clause should be added to indicate whether to only create the table without writing any data. > Support SELECT clause in CREATE TABLE(CTAS) > --- > > Key: FLINK-26942 > URL: https://issues.apache.org/jira/browse/FLINK-26942 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: tartarus >Priority: Major > Fix For: 1.16.0 > > > Support CTAS(CREATE TABLE AS SELECT) syntax > {code:java} > CREATE TABLE [ IF NOT EXISTS ] table_name > [ WITH ( table_properties ) ] > [ AS query_expression ] {code}
[jira] [Created] (FLINK-27924) Add pulsar comments in the datastream package
LuNng Wang created FLINK-27924: -- Summary: Add pulsar comments in the datastream package Key: FLINK-27924 URL: https://issues.apache.org/jira/browse/FLINK-27924 Project: Flink Issue Type: Improvement Components: API / Python Affects Versions: 1.15.0 Reporter: LuNng Wang https://github.com/apache/flink/pull/19732#discussion_r883356468
[GitHub] [flink] pengmide commented on pull request #19869: [FLINK-21996][python] Support Kinesis connector in Python DataStream API
pengmide commented on PR #19869: URL: https://github.com/apache/flink/pull/19869#issuecomment-1148113249 @dianfu I have fixed the problem. Could you please review it again? Thanks!
[jira] [Assigned] (FLINK-27921) Introduce the checkResourceRequirementsWithDelay in DeclarativeSlotManager
[ https://issues.apache.org/jira/browse/FLINK-27921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yangze Guo reassigned FLINK-27921: -- Assignee: Aitozi > Introduce the checkResourceRequirementsWithDelay in DeclarativeSlotManager > -- > > Key: FLINK-27921 > URL: https://issues.apache.org/jira/browse/FLINK-27921 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Aitozi >Assignee: Aitozi >Priority: Major > Labels: pull-request-available > > As discussed in > https://github.com/apache/flink/pull/19840#discussion_r884242067, this > ticket introduces the same mechanism as in {{FineGrainedSlotManager}}: wait for a slight delay > before processing the resource check. This will > reduce the frequency of unnecessary re-allocations.
[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance
yunfengzhou-hub commented on code in PR #97: URL: https://github.com/apache/flink-ml/pull/97#discussion_r890701884 ## flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/FileSegmentWriter.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.datacache.nonkeyed; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.util.Optional; + +/** A class that writes cache data to a target file in given file system. */ +@Internal +class FileSegmentWriter implements SegmentWriter { + +/** The tool to serialize received records into bytes. */ +private final TypeSerializer serializer; + +/** The path to the target file. */ +private final Path path; + +/** The output stream that writes to the target file. 
*/ +private final FSDataOutputStream outputStream; + +/** A buffer that wraps the output stream to optimize performance. */ +private final BufferedOutputStream bufferedOutputStream; Review Comment: As explained in the comment above, the `bufferedOutputStream.flush()` call in `finish()` cannot be removed, so we need to keep this field.
[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance
yunfengzhou-hub commented on code in PR #97: URL: https://github.com/apache/flink-ml/pull/97#discussion_r890701650 ## flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/FileSegmentWriter.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.datacache.nonkeyed; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.util.Optional; + +/** A class that writes cache data to a target file in given file system. */ +@Internal +class FileSegmentWriter implements SegmentWriter { + +/** The tool to serialize received records into bytes. */ +private final TypeSerializer serializer; + +/** The path to the target file. */ +private final Path path; + +/** The output stream that writes to the target file. 
*/ +private final FSDataOutputStream outputStream; + +/** A buffer that wraps the output stream to optimize performance. */ +private final BufferedOutputStream bufferedOutputStream; + +/** The wrapper view of output stream to be used with TypeSerializer API. */ +private final DataOutputView outputView; + +/** The number of records added so far. */ +private int count; + +FileSegmentWriter(TypeSerializer<T> serializer, Path path) throws IOException { +this.serializer = serializer; +this.path = path; +this.outputStream = path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE); +this.bufferedOutputStream = new BufferedOutputStream(outputStream); +this.outputView = new DataOutputViewStreamWrapper(bufferedOutputStream); +} + +@Override +public boolean addRecord(T record) throws IOException { +if (outputStream.getPos() >= DataCacheWriter.MAX_SEGMENT_SIZE) { +return false; +} +serializer.serialize(record, outputView); +count++; +return true; +} + +@Override +public Optional<Segment> finish() throws IOException { +bufferedOutputStream.flush(); Review Comment: `bufferedOutputStream` wraps `outputStream`, and its internal buffer is invisible to `outputStream`. When only `outputStream.flush()` is invoked, some bytes may still be left in `bufferedOutputStream`. When I tried to remove this line, most data cache tests failed, so this line cannot be removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
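The reasoning about `bufferedOutputStream.flush()` can be checked with the JDK alone. The sketch below is illustrative only: it assumes a `ByteArrayOutputStream` in place of the `FSDataOutputStream` used in the PR, and the class and method names are hypothetical.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/**
 * Hypothetical demo: flushing only the wrapped stream leaves bytes stuck in a
 * BufferedOutputStream. ByteArrayOutputStream stands in for FSDataOutputStream.
 */
final class BufferedFlushDemo {

    private BufferedFlushDemo() {}

    /**
     * Writes {@code n} bytes through a BufferedOutputStream and returns how many of them
     * have reached the underlying stream after the chosen flush call.
     */
    static int bytesVisibleAfterFlush(int n, boolean flushBufferedWrapper) {
        ByteArrayOutputStream underlying = new ByteArrayOutputStream();
        // Small writes stay in the wrapper's internal buffer until it fills up or is flushed.
        BufferedOutputStream buffered = new BufferedOutputStream(underlying);
        try {
            for (int i = 0; i < n; i++) {
                buffered.write(i);
            }
            if (flushBufferedWrapper) {
                // Drains the wrapper's buffer into `underlying`, then flushes `underlying`.
                buffered.flush();
            } else {
                // Flushes only `underlying`; the wrapper never hands over its buffered bytes.
                underlying.flush();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Streams are intentionally left open: close() would also flush the wrapper.
        return underlying.size();
    }

    public static void main(String[] args) {
        System.out.println(bytesVisibleAfterFlush(100, false)); // 0
        System.out.println(bytesVisibleAfterFlush(100, true)); // 100
    }
}
```

`BufferedOutputStream.flush()` first drains its own buffer and then flushes the wrapped stream, so in the PR calling it before `outputStream.flush()` is what makes every serialized record reach the file.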
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance
lindong28 commented on code in PR #97: URL: https://github.com/apache/flink-ml/pull/97#discussion_r890693618 ## flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java: ## @@ -18,38 +18,73 @@ package org.apache.flink.iteration.datacache.nonkeyed; +import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.MemorySegment; -import java.io.Serializable; +import java.util.List; import java.util.Objects; -/** A segment represents a single file for the cache. */ -public class Segment implements Serializable { +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +/** A segment contains the information about a cache unit. */ +@Internal +public class Segment { + +/** The path to the file containing persisted records. */ private final Path path; -/** The count of the records in the file. */ +/** + * The count of records in the file at the path if the file size is not zero, otherwise the + * count of records in the cache. + */ private final int count; -/** The total length of file. */ -private final long size; +/** The total length of file containing persisted records. */ +private long fsSize = -1L; + +/** The memory segments containing cached records. */ Review Comment: Could you update the code explaining that `the cache list is empty iff the segment has not been cached in memory`? ## flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java: ## @@ -18,38 +18,73 @@ package org.apache.flink.iteration.datacache.nonkeyed; +import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.MemorySegment; -import java.io.Serializable; +import java.util.List; import java.util.Objects; -/** A segment represents a single file for the cache. 
*/ -public class Segment implements Serializable { +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +/** A segment contains the information about a cache unit. */ +@Internal +public class Segment { + +/** The path to the file containing persisted records. */ private final Path path; -/** The count of the records in the file. */ +/** + * The count of records in the file at the path if the file size is not zero, otherwise the + * count of records in the cache. + */ private final int count; -/** The total length of file. */ -private final long size; +/** The total length of file containing persisted records. */ +private long fsSize = -1L; + +/** The memory segments containing cached records. */ +private List<MemorySegment> cache; + +Segment(Path path, int count, long fsSize) { +this.path = checkNotNull(path); +checkArgument(count > 0); +this.count = count; +checkArgument(fsSize > 0); +this.fsSize = fsSize; +} -public Segment(Path path, int count, long size) { -this.path = path; +Segment(Path path, int count, List<MemorySegment> cache) { +this.path = checkNotNull(path); +checkArgument(count > 0); this.count = count; -this.size = size; +this.cache = checkNotNull(cache); } -public Path getPath() { +void setCache(List<MemorySegment> cache) { +this.cache = checkNotNull(cache); +} + +void setFsSize(long fsSize) { +checkArgument(fsSize > 0); +this.fsSize = fsSize; +} + +Path getPath() { return path; } -public int getCount() { +int getCount() { return count; } -public long getSize() { -return size; +long getFsSize() { +return fsSize; +} + +List<MemorySegment> getCache() { +return cache; Review Comment: Instead of indicating `the segment has been read into the memory` by using `cache == null`, would it be simpler to use `cache.isEmpty()`? This would let us handle fewer special cases (e.g. null). After all, if the segment has been read into memory, it must have a non-empty memory segment list.
## flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java: ## @@ -18,38 +18,73 @@ package org.apache.flink.iteration.datacache.nonkeyed; +import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.MemorySegment; -import java.io.Serializable; +import java.util.List; import java.util.Objects; -/** A segment represents a single file for the cache. */ -public class Segment implements Serializable { +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +/** A segment contains the information about
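The reviewer's suggestion (an empty cache list, never `null`, means "not cached in memory") can be sketched as follows. This is a hypothetical, simplified stand-in for the PR's `Segment` class, not its actual code: `byte[]` replaces Flink's `MemorySegment`, and `isCached()` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical stand-in for the PR's Segment class. byte[] replaces MemorySegment;
 * only the "cached in memory" bookkeeping is modelled.
 */
final class SegmentSketch {

    private final String path;
    private final int count;

    /** Empty iff the segment has not been cached in memory; never null. */
    private List<byte[]> cache = Collections.emptyList();

    SegmentSketch(String path, int count) {
        this.path = Objects.requireNonNull(path);
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        this.count = count;
    }

    void setCache(List<byte[]> cache) {
        Objects.requireNonNull(cache);
        // A segment that was read into memory always occupies at least one memory segment.
        if (cache.isEmpty()) {
            throw new IllegalArgumentException("cache must not be empty");
        }
        this.cache = new ArrayList<>(cache);
    }

    /** A single emptiness check replaces separate null and size checks. */
    boolean isCached() {
        return !cache.isEmpty();
    }

    String getPath() {
        return path;
    }

    int getCount() {
        return count;
    }

    List<byte[]> getCache() {
        return Collections.unmodifiableList(cache);
    }
}
```

Because `setCache` rejects an empty list, "empty" can only mean "never cached", which is the invariant the first review comment asks to document.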
[GitHub] [flink-table-store] JingsongLi commented on pull request #119: POC for ALTER TABLE ... COMPACT
JingsongLi commented on PR #119: URL: https://github.com/apache/flink-table-store/pull/119#issuecomment-1148105554 Hi @LadyForest, I think we can open a new formal PR to discuss this.
[GitHub] [flink-kubernetes-operator] czy006 commented on pull request #256: [FLINK-27646] Flink Operator RoadMap Page
czy006 commented on PR #256: URL: https://github.com/apache/flink-kubernetes-operator/pull/256#issuecomment-1148103093 @gyfora PTAL
[GitHub] [flink] luoyuxia commented on pull request #19884: [FLINK-26764][table-planner] Support RESPECT NULLS for function FIRST_VALUE/LAST_VALUE
luoyuxia commented on PR #19884: URL: https://github.com/apache/flink/pull/19884#issuecomment-1148102979 @flinkbot run azure
[GitHub] [flink] luoyuxia commented on pull request #19876: [FLINK-27895][build] Enable the CI test for Hive 3.1.2
luoyuxia commented on PR #19876: URL: https://github.com/apache/flink/pull/19876#issuecomment-1148102825 @flinkbot run azure
[GitHub] [flink] dianfu commented on a diff in pull request #19883: [FLINK-27901][python] support TableEnvironment.create(configuration)
dianfu commented on code in PR #19883: URL: https://github.com/apache/flink/pull/19883#discussion_r890690352 ## flink-python/pyflink/table/table_environment.py: ## @@ -98,18 +99,25 @@ def __init__(self, j_tenv, serializer=PickleSerializer()): self._open() @staticmethod -def create(environment_settings: EnvironmentSettings) -> 'TableEnvironment': Review Comment: Also update the documentation in https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/python_config/. ## flink-python/pyflink/table/table_environment.py: ## @@ -98,18 +99,25 @@ def __init__(self, j_tenv, serializer=PickleSerializer()): self._open() @staticmethod -def create(environment_settings: EnvironmentSettings) -> 'TableEnvironment': +def create(conf_or_settings: Union[EnvironmentSettings, Configuration]) -> 'TableEnvironment': Review Comment: This will break backward compatibility. Users may write code like the following: `TableEnvironment.create(environment_settings=env_settings)`. What about keeping the parameter name unchanged? ## flink-python/pyflink/table/table_environment.py: ## @@ -98,18 +99,25 @@ def __init__(self, j_tenv, serializer=PickleSerializer()): self._open() @staticmethod -def create(environment_settings: EnvironmentSettings) -> 'TableEnvironment': +def create(conf_or_settings: Union[EnvironmentSettings, Configuration]) -> 'TableEnvironment': """ Creates a table environment that is the entry point and central context for creating Table and SQL API programs. -:param environment_settings: The environment settings used to instantiate the +:param conf_or_settings: The configuration or environment settings used to instantiate the :class:`~pyflink.table.TableEnvironment`. :return: The :class:`~pyflink.table.TableEnvironment`.
""" gateway = get_gateway() -j_tenv = gateway.jvm.TableEnvironment.create( -environment_settings._j_environment_settings) +if isinstance(conf_or_settings, EnvironmentSettings): +environment_settings = conf_or_settings +elif isinstance(conf_or_settings, Configuration): +environment_settings = EnvironmentSettings.new_instance() \ Review Comment: EnvironmentSettings.from_configuration(conf_or_settings) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #97: [FLINK-27096] Improve DataCache and KMeans Performance
lindong28 commented on code in PR #97: URL: https://github.com/apache/flink-ml/pull/97#discussion_r890665195 ## flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java: ## @@ -19,127 +19,162 @@ package org.apache.flink.iteration.datacache.nonkeyed; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.table.runtime.util.MemorySegmentPool; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.SupplierWithException; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Optional; /** Records the data received and replayed them on required. */ public class DataCacheWriter { +/** A soft limit on the max allowed size of a single segment. */ +static final long MAX_SEGMENT_SIZE = 1L << 30; // 1GB + +/** The tool to serialize received records into bytes. */ private final TypeSerializer serializer; +/** The file system that contains the cache files. */ private final FileSystem fileSystem; +/** A generator to generate paths of cache files. */ private final SupplierWithException pathGenerator; -private final List finishSegments; +/** An optional pool that provide memory segments to hold cached records in memory. */ +@Nullable private final MemorySegmentPool segmentPool; + +/** The segments that contain previously added records. */ +private final List finishedSegments; -private SegmentWriter currentSegment; +/** The current writer for new records. 
*/ +@Nullable private SegmentWriter currentSegmentWriter; public DataCacheWriter( TypeSerializer serializer, FileSystem fileSystem, SupplierWithException pathGenerator) throws IOException { -this(serializer, fileSystem, pathGenerator, Collections.emptyList()); +this(serializer, fileSystem, pathGenerator, null, Collections.emptyList()); } public DataCacheWriter( TypeSerializer serializer, FileSystem fileSystem, SupplierWithException pathGenerator, -List priorFinishedSegments) +MemorySegmentPool segmentPool) throws IOException { -this.serializer = serializer; -this.fileSystem = fileSystem; -this.pathGenerator = pathGenerator; - -this.finishSegments = new ArrayList<>(priorFinishedSegments); +this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList()); +} -this.currentSegment = new SegmentWriter(pathGenerator.get()); +public DataCacheWriter( +TypeSerializer serializer, +FileSystem fileSystem, +SupplierWithException pathGenerator, +List finishedSegments) +throws IOException { +this(serializer, fileSystem, pathGenerator, null, finishedSegments); } -public void addRecord(T record) throws IOException { -currentSegment.addRecord(record); +public DataCacheWriter( +TypeSerializer serializer, +FileSystem fileSystem, +SupplierWithException pathGenerator, +@Nullable MemorySegmentPool segmentPool, +List finishedSegments) +throws IOException { +this.fileSystem = fileSystem; +this.pathGenerator = pathGenerator; +this.segmentPool = segmentPool; +this.serializer = serializer; +this.finishedSegments = new ArrayList<>(); +this.finishedSegments.addAll(finishedSegments); Review Comment: nits: would it be simpler to use `this.finishedSegments = new ArrayList<>(priorFinishedSegments)`, similar to the approach before this PR? ## flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/MemorySegmentReader.java: ## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT
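The "soft limit" on segment size and the reviewer's nit about copying the prior segment list can be illustrated together. This is a hypothetical sketch, not the PR's `DataCacheWriter`. It assumes records are already serialized to `byte[]` and tracks only segment sizes, mirroring the `outputStream.getPos() >= MAX_SEGMENT_SIZE` check in `FileSegmentWriter#addRecord`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical sketch of a soft segment-size limit: a segment accepts records until its
 * size has reached the limit, so the last accepted record may overshoot it.
 */
final class RollingCacheSketch {

    private final long maxSegmentSize;

    /** Sizes (in bytes) of finished segments, copied from the prior list. */
    private final List<Long> finishedSegmentSizes;

    private long currentSize;

    RollingCacheSketch(long maxSegmentSize, List<Long> priorFinishedSegmentSizes) {
        if (maxSegmentSize <= 0) {
            throw new IllegalArgumentException("maxSegmentSize must be positive");
        }
        this.maxSegmentSize = maxSegmentSize;
        // Defensive copy in one step, as suggested in the review nit.
        this.finishedSegmentSizes = new ArrayList<>(priorFinishedSegmentSizes);
    }

    /** Refuses the record once the current segment has reached the limit. */
    private boolean tryAdd(byte[] record) {
        if (currentSize >= maxSegmentSize) {
            return false;
        }
        currentSize += record.length;
        return true;
    }

    void addRecord(byte[] record) {
        if (!tryAdd(record)) {
            // Current segment is full: finish it and start a new, empty one.
            finishedSegmentSizes.add(currentSize);
            currentSize = 0;
            // An empty segment always accepts a record because maxSegmentSize > 0.
            tryAdd(record);
        }
    }

    /** Finishes the current segment (if non-empty) and returns all segment sizes. */
    List<Long> finish() {
        if (currentSize > 0) {
            finishedSegmentSizes.add(currentSize);
            currentSize = 0;
        }
        return Collections.unmodifiableList(new ArrayList<>(finishedSegmentSizes));
    }
}
```

With a limit of 10 bytes and five 4-byte records, the first segment grows to 12 bytes before the writer rolls over, which is what "soft" means here. Because the constructor copies the list, later additions never mutate the caller's `priorFinishedSegmentSizes`.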
[jira] [Updated] (FLINK-27901) Support TableEnvironment.create(configuration) in PyFlink
[ https://issues.apache.org/jira/browse/FLINK-27901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-27901: Summary: Support TableEnvironment.create(configuration) in PyFlink (was: support TableEnvironment.create(configuration) in PyFlink) > Support TableEnvironment.create(configuration) in PyFlink > - > > Key: FLINK-27901 > URL: https://issues.apache.org/jira/browse/FLINK-27901 > Project: Flink > Issue Type: Improvement > Components: API / Python >Affects Versions: 1.15.0 >Reporter: Juntao Hu >Priority: Minor > Labels: pull-request-available > Fix For: 1.16.0 > > > Align TableEnvironment.create(configuration) API with Java. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Assigned] (FLINK-24999) flink-python doesn't work on Java 17
[ https://issues.apache.org/jira/browse/FLINK-24999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-24999: --- Assignee: LuNng Wang > flink-python doesn't work on Java 17 > > > Key: FLINK-24999 > URL: https://issues.apache.org/jira/browse/FLINK-24999 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Chesnay Schepler >Assignee: LuNng Wang >Priority: Major > > {code:java} > java.lang.UnsupportedOperationException: sun.misc.Unsafe or > java.nio.DirectByteBuffer.<init>(long, int) not available > at > io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490) > at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:257) > at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:247) > at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:248) > at > org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:228) > at > org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:242) > at > org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:132) > at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120) > at > org.apache.flink.table.runtime.arrow.ArrowUtilsTest.testReadArrowBatches(ArrowUtilsTest.java:389) > {code}
[jira] [Commented] (FLINK-24999) flink-python doesn't work on Java 17
[ https://issues.apache.org/jira/browse/FLINK-24999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550735#comment-17550735 ] Dian Fu commented on FLINK-24999: - [~ana4] Thanks for taking this. I have assigned it to you~
[jira] [Closed] (FLINK-26070) Update dependency numpy to >=1.21.0,<1.22
[ https://issues.apache.org/jira/browse/FLINK-26070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-26070. --- Resolution: Duplicate > Update dependency numpy to >=1.21.0,<1.22 > - > > Key: FLINK-26070 > URL: https://issues.apache.org/jira/browse/FLINK-26070 > Project: Flink > Issue Type: Technical Debt > Components: API / Python >Reporter: Martijn Visser >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > > PyFlink currently sets the required numpy version as >=1.14.3,<1.20 > We should update this to >=1.21.0,<1.22 > Updating numpy will also stop PyFlink from being flagged as vulnerable to > https://nvd.nist.gov/vuln/detail/CVE-2021-33430
[jira] [Reopened] (FLINK-26070) Update dependency numpy to >=1.21.0,<1.22
[ https://issues.apache.org/jira/browse/FLINK-26070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reopened FLINK-26070.
[jira] [Closed] (FLINK-26070) Update dependency numpy to >=1.21.0,<1.22
[ https://issues.apache.org/jira/browse/FLINK-26070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-26070. --- Resolution: Done
[jira] [Commented] (FLINK-26070) Update dependency numpy to >=1.21.0,<1.22
[ https://issues.apache.org/jira/browse/FLINK-26070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550734#comment-17550734 ] Dian Fu commented on FLINK-26070: - [~ana4] Thanks for the work in https://github.com/apache/flink/pull/19802. I'm closing this ticket~
[jira] [Commented] (FLINK-27868) Harden running job check before triggering savepoints or savepoint upgrades
[ https://issues.apache.org/jira/browse/FLINK-27868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550716#comment-17550716 ] Aitozi commented on FLINK-27868: I think this will be a good improvement to savepointing, together with the work in [https://github.com/apache/flink-kubernetes-operator/pull/255]. One question: how will the FINISHED status be handled? IMO, finished tasks will not affect the job's RUNNING status (they do not have to be reset to CREATED). > Harden running job check before triggering savepoints or savepoint upgrades > --- > > Key: FLINK-27868 > URL: https://issues.apache.org/jira/browse/FLINK-27868 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Matyas Orhidi >Priority: Major > Fix For: kubernetes-operator-1.1.0 > > > Even if the job is in RUNNING state, often not all subtasks are running yet, > which leads to savepoint upgrade / savepoint trigger failures. We should > harden the isRunning check we use to include subtask states as well. > This suggestion is described in more detail by [~matyas] here: > https://github.com/apache/flink-kubernetes-operator/pull/237#issuecomment-1137054088
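One way the hardened check could look, including the open question about FINISHED subtasks, is sketched below. Everything here is hypothetical: `SubtaskState` is a simplified stand-in, not Flink's `ExecutionState`, and the method is not the operator's actual API. Whether FINISHED should count as running is left as a parameter because the comment above treats it as unresolved.

```java
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;

/**
 * Hypothetical sketch of a hardened "is running" check for FLINK-27868. SubtaskState is a
 * simplified stand-in, not Flink's ExecutionState or the operator's real API.
 */
final class RunningCheckSketch {

    enum SubtaskState { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, FINISHED, FAILED, CANCELED }

    private RunningCheckSketch() {}

    /**
     * Returns true only if the job is RUNNING and every subtask is RUNNING. When
     * {@code finishedCountsAsRunning} is true, FINISHED subtasks are accepted as well.
     */
    static boolean isReadyForSavepoint(
            String jobState, Collection<SubtaskState> subtasks, boolean finishedCountsAsRunning) {
        if (!"RUNNING".equals(jobState) || subtasks.isEmpty()) {
            return false;
        }
        Set<SubtaskState> accepted =
                finishedCountsAsRunning
                        ? EnumSet.of(SubtaskState.RUNNING, SubtaskState.FINISHED)
                        : EnumSet.of(SubtaskState.RUNNING);
        // Every subtask state must be one of the accepted states.
        return accepted.containsAll(subtasks);
    }
}
```

A job whose state is RUNNING but which still has a DEPLOYING subtask is rejected, which is the failure mode the issue describes.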
[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #258: [chore] Change the session cluster not found log level to error
Aitozi commented on code in PR #258: URL: https://github.com/apache/flink-kubernetes-operator/pull/258#discussion_r890651487 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobHelper.java: ## @@ -54,7 +54,7 @@ public boolean sessionClusterReady(Optional flinkDeploymentOpt) return true; } } else { -logger.info("Session cluster deployment is not found"); +logger.error("Session cluster deployment is not found"); Review Comment: +1
[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period
Aitozi commented on code in PR #255: URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890648242 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java: ## @@ -69,96 +69,78 @@ public void observeSavepointStatus( .map(Savepoint::getLocation) .orElse(null); -observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig) -.ifPresent( -err -> -EventUtils.createOrUpdateEvent( -flinkService.getKubernetesClient(), -resource, -EventUtils.Type.Warning, -"SavepointError", -SavepointUtils.createSavepointError( -savepointInfo, -resource.getSpec() -.getJob() - .getSavepointTriggerNonce()), -EventUtils.Component.Operator)); - -// We only need to observe latest checkpoint/savepoint for terminal jobs -if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) { -observeLatestSavepoint(savepointInfo, jobId, deployedConfig); +// If any manual or periodic savepoint is in progress, observe it +if (SavepointUtils.savepointInProgress(jobStatus)) { +observeTriggeredSavepoint(resource, jobId, deployedConfig); } -var currentLastSpPath = -Optional.ofNullable(savepointInfo.getLastSavepoint()) -.map(Savepoint::getLocation) -.orElse(null); - -// If the last savepoint information changes we need to patch the status -// to avoid losing this in case of an operator failure after the cluster was shut down -if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) { -LOG.info( -"Updating resource status after observing new last savepoint {}", -currentLastSpPath); -statusHelper.patchAndCacheStatus(resource); +// If job is in globally terminal state, observe last savepoint +if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) { +observeLatestSavepoint(savepointInfo, jobId, deployedConfig); } + +patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath); } /** * Observe the savepoint result based on the current savepoint info. 
* - * @param currentSavepointInfo the current savepoint info. + * @param resource the resource being observed * @param jobID the jobID of the observed job. * @param deployedConfig Deployed job config. * @return The observed error, if no error observed, {@code Optional.empty()} will be returned. */ -private Optional observeTriggeredSavepointProgress( -SavepointInfo currentSavepointInfo, String jobID, Configuration deployedConfig) { -if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) { -LOG.debug("Savepoint not in progress"); -return Optional.empty(); -} +private void observeTriggeredSavepoint( +AbstractFlinkResource resource, String jobID, Configuration deployedConfig) { + +var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo(); + LOG.info("Observing savepoint status."); -SavepointFetchResult savepointFetchResult = +var savepointFetchResult = flinkService.fetchSavepointInfo( -currentSavepointInfo.getTriggerId(), jobID, deployedConfig); +savepointInfo.getTriggerId(), jobID, deployedConfig); if (savepointFetchResult.isPending()) { Review Comment: I get your point.
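The condition used by the removed block in `SavepointObserver` (patch the status only when a new, non-null last-savepoint location differs from the previously observed one) can be isolated as follows. The class and method names are hypothetical. Whether the new `patchStatusOnSavepointChange` helper keeps exactly this condition is not visible in the excerpt.

```java
/**
 * Hypothetical extraction of the "did the last savepoint change?" condition from the
 * removed SavepointObserver code: patch only on a new, non-null location.
 */
final class SavepointChangeSketch {

    private SavepointChangeSketch() {}

    static boolean shouldPatchStatus(String previousLastSpPath, String currentLastSpPath) {
        // A null current path never triggers a patch; equal paths mean nothing changed.
        return currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath);
    }
}
```

Patching eagerly on every change is what protects the savepoint location from being lost if the operator fails after the cluster has been shut down, as the removed code comment explains.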
[jira] [Closed] (FLINK-27923) Typo fix for release-1.0.0 quick-start.md
[ https://issues.apache.org/jira/browse/FLINK-27923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-27923. -- Resolution: Fixed > Typo fix for release-1.0.0 quick-start.md > - > > Key: FLINK-27923 > URL: https://issues.apache.org/jira/browse/FLINK-27923 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.0.0 >Reporter: James Busche >Assignee: James Busche >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.0.0 > > > Noticed a typo while deploying the example. > Currently: > kubectl create -f > https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-0.1/examples/basic.yaml > Where it should be: > kubectl create -f > https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0.0/examples/basic.yaml
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #259: [FLINK-27923] Update quick-start.md
gyfora merged PR #259: URL: https://github.com/apache/flink-kubernetes-operator/pull/259
[GitHub] [flink-kubernetes-operator] jbusche commented on a diff in pull request #259: [FLINK-27923] Update quick-start.md
jbusche commented on code in PR #259: URL: https://github.com/apache/flink-kubernetes-operator/pull/259#discussion_r890392876 ## docs/content/docs/try-flink-kubernetes-operator/quick-start.md: ## @@ -99,7 +99,7 @@ flink-kubernetes-operator default 1 2022-03-09 17:39:55.461359 Once the operator is running as seen in the previous step you are ready to submit a Flink job: ```bash -kubectl create -f https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-0.1/examples/basic.yaml +kubectl create -f https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0.0/examples/basic.yaml Review Comment: Hi @gyfora, sure, that looks like it works as well. I'll make that change. Thanks!
[jira] [Assigned] (FLINK-27923) Typo fix for release-1.0.0 quick-start.md
[ https://issues.apache.org/jira/browse/FLINK-27923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-27923: -- Assignee: James Busche
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #259: [FLINK-27923] Update quick-start.md
gyfora commented on code in PR #259: URL: https://github.com/apache/flink-kubernetes-operator/pull/259#discussion_r890361897 ## docs/content/docs/try-flink-kubernetes-operator/quick-start.md: ## @@ -99,7 +99,7 @@ flink-kubernetes-operator default 1 2022-03-09 17:39:55.461359 Once the operator is running as seen in the previous step you are ready to submit a Flink job: ```bash -kubectl create -f https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-0.1/examples/basic.yaml +kubectl create -f https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0.0/examples/basic.yaml Review Comment: Maybe it would be better to just have `release-1.0/...`?
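The suggestion amounts to pointing the quick-start at a minor-release branch instead of a patch-specific one. A minimal sketch of that mapping, assuming the repository keeps one `release-X.Y` branch per minor version (an assumption; the thread only shows `release-0.1` and the proposed `release-1.0`). `QuickStartLinks` and its methods are hypothetical helpers, not part of the operator:

```java
// Hypothetical helper: maps a full operator version such as "1.0.0" to the
// assumed minor-release branch name ("release-1.0") and builds the raw
// GitHub URL of examples/basic.yaml on that branch.
final class QuickStartLinks {
    private static final String REPO_RAW =
            "https://raw.githubusercontent.com/apache/flink-kubernetes-operator/";

    private QuickStartLinks() {}

    /** Returns "release-X.Y" for a version string of the form "X.Y[.Z...]". */
    static String releaseBranch(String version) {
        String[] parts = version.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected at least MAJOR.MINOR: " + version);
        }
        return "release-" + parts[0] + "." + parts[1];
    }

    /** Raw URL of examples/basic.yaml on the minor-release branch. */
    static String basicExampleUrl(String version) {
        return REPO_RAW + releaseBranch(version) + "/examples/basic.yaml";
    }
}
```

With this mapping, a later patch release such as 1.0.1 resolves to the same `release-1.0` link, which is the practical upside of `release-1.0/...` over `release-1.0.0/...`.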
[jira] [Updated] (FLINK-27923) Typo fix for release-1.0.0 quick-start.md
[ https://issues.apache.org/jira/browse/FLINK-27923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27923: --- Labels: pull-request-available (was: ) > Typo fix for release-1.0.0 quick-start.md > - > > Key: FLINK-27923 > URL: https://issues.apache.org/jira/browse/FLINK-27923 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.0.0 >Reporter: James Busche >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.0.0 > > > Noticed a typo while deploying the example. > Currently: > kubectl create -f > https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-0.1/examples/basic.yaml > Where it should be: > kubectl create -f > https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0.0/examples/basic.yaml
[GitHub] [flink-kubernetes-operator] jbusche opened a new pull request, #259: [FLINK-27923] Update quick-start.md
jbusche opened a new pull request, #259: URL: https://github.com/apache/flink-kubernetes-operator/pull/259 Fixing the example from release-0.1 to release-1.0.0
[jira] [Created] (FLINK-27923) Typo fix for release-1.0.0 quick-start.md
James Busche created FLINK-27923: Summary: Typo fix for release-1.0.0 quick-start.md Key: FLINK-27923 URL: https://issues.apache.org/jira/browse/FLINK-27923 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.0.0 Reporter: James Busche Fix For: kubernetes-operator-1.0.0 Noticed a typo while deploying the example. Currently: kubectl create -f https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-0.1/examples/basic.yaml Where it should be: kubectl create -f https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0.0/examples/basic.yaml
[jira] [Commented] (FLINK-27910) FileSink not registered the timer to enforce rolling policy if started from scratch
[ https://issues.apache.org/jira/browse/FLINK-27910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550583#comment-17550583 ] David Anderson commented on FLINK-27910: This is super confusing, and there's no reasonable workaround – so I've bumped up the priority to Critical. > FileSink not registered the timer to enforce rolling policy if started from > scratch > --- > > Key: FLINK-27910 > URL: https://issues.apache.org/jira/browse/FLINK-27910 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.15.0, 1.16.0 >Reporter: Yun Gao >Priority: Critical > > The current FileWriter only registers the timer in initializeState, which is > now only called on restoring. Thus, if the job is started from scratch, the > timer is never registered and the rolling policy does not take effect.
[jira] [Updated] (FLINK-27910) FileSink not registered the timer to enforce rolling policy if started from scratch
[ https://issues.apache.org/jira/browse/FLINK-27910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Anderson updated FLINK-27910: --- Priority: Critical (was: Major) > FileSink not registered the timer to enforce rolling policy if started from > scratch > --- > > Key: FLINK-27910 > URL: https://issues.apache.org/jira/browse/FLINK-27910 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.15.0, 1.16.0 >Reporter: Yun Gao >Priority: Critical > > The current FileWriter only registers the timer in initializeState, which is > now only called on restoring. Thus, if the job is started from scratch, the > timer is never registered and the rolling policy does not take effect.
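The bug follows a common pattern: setup work that lives only in a restore-only code path. The sketch below contrasts registering the rolling timer only on restore with also registering it on a fresh start. It uses hypothetical classes (`FakeTimerService`, `RollingWriterSketch`), not Flink's actual `FileWriter` or timer service, and assumes a single inactivity-based timer is all that needs registering:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a processing-time timer service.
final class FakeTimerService {
    final List<Long> registered = new ArrayList<>();

    void registerTimer(long timestamp) {
        registered.add(timestamp);
    }
}

// Hypothetical writer illustrating the pattern reported in FLINK-27910.
final class RollingWriterSketch {
    private final FakeTimerService timers;
    private final long rolloverIntervalMs;
    private final boolean fixed;

    RollingWriterSketch(FakeTimerService timers, long rolloverIntervalMs, boolean fixed) {
        this.timers = timers;
        this.rolloverIntervalMs = rolloverIntervalMs;
        this.fixed = fixed;
    }

    /** Only reached when restoring from a checkpoint or savepoint. */
    void initializeState(long now) {
        // ... restore bucket state here, then register the rolling timer.
        registerRollingTimer(now);
    }

    /** Reached on every start, including a fresh start with no prior state. */
    void open(boolean restored, long now) {
        if (restored) {
            initializeState(now);
        } else if (fixed) {
            // The fix: a fresh start must register the timer too; without
            // this branch, time-based rolling never fires.
            registerRollingTimer(now);
        }
    }

    private void registerRollingTimer(long now) {
        timers.registerTimer(now + rolloverIntervalMs);
    }
}
```

With `fixed == false`, a job started from scratch ends up with no registered timer, which matches the behavior described in the issue.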
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #258: [chore] Change the session cluster not found log level to error
gyfora commented on code in PR #258: URL: https://github.com/apache/flink-kubernetes-operator/pull/258#discussion_r890326994 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobHelper.java: ## @@ -54,7 +54,7 @@ public boolean sessionClusterReady(Optional flinkDeploymentOpt) return true; } } else { -logger.info("Session cluster deployment is not found"); +logger.error("Session cluster deployment is not found"); Review Comment: I think INFO/WARN is more appropriate as this is completely expected to happen and it's not really an error. @Aitozi what do you think?
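The argument here is that a missing session deployment is a normal condition for the reconciler to encounter, so the log level should not flag it as an error. A minimal sketch of that distinction, assuming a simplified readiness check; `SessionClusterCheck` and `Deployment` are hypothetical, and `java.util.logging` stands in for the operator's logger so the example compiles without dependencies:

```java
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical, simplified version of the check under review.
final class SessionClusterCheck {
    private static final Logger LOG = Logger.getLogger(SessionClusterCheck.class.getName());

    /** Stand-in for a session FlinkDeployment; only tracks readiness. */
    static final class Deployment {
        final boolean ready;

        Deployment(boolean ready) {
            this.ready = ready;
        }
    }

    static boolean sessionClusterReady(Optional<Deployment> deployment) {
        if (deployment.isPresent()) {
            return deployment.get().ready;
        }
        // A missing deployment is expected and transient, so it is logged at
        // INFO rather than SEVERE (java.util.logging's error level); the
        // caller can simply check again on the next reconcile loop.
        LOG.log(Level.INFO, "Session cluster deployment is not found");
        return false;
    }
}
```

Whether INFO or WARNING fits better is the open question in the thread; the sketch only shows that neither treats the case as an error.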
[GitHub] [flink] lsyldliu commented on a diff in pull request #19561: [FLINK-26414][hive] Hive dialect supports macro
lsyldliu commented on code in PR #19561: URL: https://github.com/apache/flink/pull/19561#discussion_r890126279 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/planner/delegation/hive/HiveASTParserTest.java: ## @@ -139,6 +139,13 @@ public void testFunction() throws Exception { assertDDLType(HiveASTParser.TOK_SHOWFUNCTIONS, "show functions"); } +@Test +public void testMacro() throws Exception { Review Comment: It would be better to add a test that creates a macro with a qualified name. ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java: ## @@ -506,6 +532,106 @@ private Operation convertCreateFunction(HiveParserASTNode ast) { } } +private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException { +String macroName = ast.getChild(0).getText(); +if (FunctionUtils.isQualifiedFunctionName(macroName)) { +throw new SemanticException("Temporary macro cannot be created with a qualified name."); Review Comment: Given how `FunctionUtils.isQualifiedFunctionName` works, this error message may not be clear to users, so I think we can improve it. ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java: ## @@ -36,23 +38,48 @@ public static final long serialVersionUID = 393313529306818205L; private final String className; +// a field to hold the string serialized for the UDF. +// we sometimes need to hold it in case of some serializable UDF will contain +// additional information such as Hive's GenericUDFMacro and if we construct the UDF directly by +// getUDFClass#newInstance, the information will be missed. +private String udfSerializedString; private transient UDFType instance = null; public HiveFunctionWrapper(String className) { this.className = className; } +/** + * Create a HiveFunctionWrapper with a UDF instance.
In this constructor, the instance will be + * serialized to string and held on in the HiveFunctionWrapper. + */ +public HiveFunctionWrapper(String className, UDFType serializableInstance) { +this.className = className; Review Comment: We can call `this(className);` directly here. ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java: ## @@ -506,6 +532,106 @@ private Operation convertCreateFunction(HiveParserASTNode ast) { } } +private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException { +String macroName = ast.getChild(0).getText(); +if (FunctionUtils.isQualifiedFunctionName(macroName)) { +throw new SemanticException("Temporary macro cannot be created with a qualified name."); +} + +List arguments = getColumns((HiveParserASTNode) ast.getChild(1), true); Review Comment: The name `arguments` is not very readable. IMO, it refers to all the columns of the table? If so, we should add a comment explaining it. ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java: ## @@ -88,4 +115,20 @@ public String getClassName() { public Class getUDFClass() throws ClassNotFoundException { return (Class) Thread.currentThread().getContextClassLoader().loadClass(className); } + +/** + * Deserialize UDF used the udfSerializedString held on. + * + * @return the UDF deserialized + */ +private UDFType deserializeUDF() { +try { +return (UDFType) +SerializationUtilities.deserializeObject( +udfSerializedString, (Class) getUDFClass()); +} catch (ClassNotFoundException e) { +throw new FlinkHiveUDFException( +String.format("Failed to deserialize function %s", className), e); +} Review Comment: "Failed to deserialize function %s."
## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java: ## @@ -506,6 +532,106 @@ private Operation convertCreateFunction(HiveParserASTNode ast) { } } +private Operation convertCreateMacro(HiveParserASTNode ast) throws SemanticException { +String macroName = ast.getChild(0).getText(); +if (FunctionUtils.isQualifiedFunctionName(macroName)) { +throw new SemanticException("Temporary macro cannot be created with a qualified name."); +} + +List arguments = getColumns((HiveParserASTNode) ast.getChild(1), true); +Set actualColumnNames =
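The `HiveFunctionWrapper` suggestions (delegate with `this(className)`, keep a serialized copy of a stateful UDF, end the error message with a period, and make the qualified-name error more explicit) can be illustrated with a self-contained sketch. It substitutes plain Java serialization plus Base64 for Hive's `SerializationUtilities`, and a dot-based check for `FunctionUtils.isQualifiedFunctionName`. Both substitutions, and every class and method name below, are assumptions for illustration only:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

// Hypothetical wrapper mirroring the structure discussed in the review.
final class FunctionWrapperSketch<T extends Serializable> {
    private final String className;
    // Serialized form of a stateful UDF (e.g. a macro body); stays null when
    // the UDF can simply be re-created from its class name.
    private String udfSerializedString;

    FunctionWrapperSketch(String className) {
        this.className = className;
    }

    FunctionWrapperSketch(String className, T serializableInstance) {
        this(className); // delegate instead of repeating the assignment
        this.udfSerializedString = serialize(serializableInstance);
    }

    String getClassName() {
        return className;
    }

    /** Only valid for wrappers built with the instance constructor. */
    @SuppressWarnings("unchecked")
    T deserializeUDF() {
        byte[] bytes = Base64.getDecoder().decode(udfSerializedString);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(
                    String.format("Failed to deserialize function %s.", className), e);
        }
    }

    private static String serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    /** Stand-in qualified-name check: "db.m" is qualified, "m" is not. */
    static void checkMacroName(String macroName) {
        if (macroName.indexOf('.') >= 0) {
            throw new IllegalArgumentException(String.format(
                    "Temporary macro '%s' cannot be created with a qualified name"
                            + " (a name containing '.'); use an unqualified name instead.",
                    macroName));
        }
    }
}
```

Naming the offending macro and stating what "qualified" means is one way to address the concern that the original message may not be clear to users.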
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #255: [FLINK-27257] Retry failed savepoints within grace period
gyfora commented on PR #255: URL: https://github.com/apache/flink-kubernetes-operator/pull/255#issuecomment-1147637009 @Aitozi updated the PR according to your comments :)
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period
gyfora commented on code in PR #255: URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890310614 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java: ## @@ -69,96 +69,78 @@ public void observeSavepointStatus( .map(Savepoint::getLocation) .orElse(null); -observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig) -.ifPresent( -err -> -EventUtils.createOrUpdateEvent( -flinkService.getKubernetesClient(), -resource, -EventUtils.Type.Warning, -"SavepointError", -SavepointUtils.createSavepointError( -savepointInfo, -resource.getSpec() -.getJob() - .getSavepointTriggerNonce()), -EventUtils.Component.Operator)); - -// We only need to observe latest checkpoint/savepoint for terminal jobs -if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) { -observeLatestSavepoint(savepointInfo, jobId, deployedConfig); +// If any manual or periodic savepoint is in progress, observe it +if (SavepointUtils.savepointInProgress(jobStatus)) { +observeTriggeredSavepoint(resource, jobId, deployedConfig); } -var currentLastSpPath = -Optional.ofNullable(savepointInfo.getLastSavepoint()) -.map(Savepoint::getLocation) -.orElse(null); - -// If the last savepoint information changes we need to patch the status -// to avoid losing this in case of an operator failure after the cluster was shut down -if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) { -LOG.info( -"Updating resource status after observing new last savepoint {}", -currentLastSpPath); -statusHelper.patchAndCacheStatus(resource); +// If job is in globally terminal state, observe last savepoint +if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) { +observeLatestSavepoint(savepointInfo, jobId, deployedConfig); } + +patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath); } /** * Observe the savepoint result based on the current savepoint info. 
* - * @param currentSavepointInfo the current savepoint info. + * @param resource the resource being observed * @param jobID the jobID of the observed job. * @param deployedConfig Deployed job config. * @return The observed error, if no error observed, {@code Optional.empty()} will be returned. */ -private Optional observeTriggeredSavepointProgress( -SavepointInfo currentSavepointInfo, String jobID, Configuration deployedConfig) { -if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) { -LOG.debug("Savepoint not in progress"); -return Optional.empty(); -} +private void observeTriggeredSavepoint( +AbstractFlinkResource resource, String jobID, Configuration deployedConfig) { + +var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo(); + LOG.info("Observing savepoint status."); -SavepointFetchResult savepointFetchResult = +var savepointFetchResult = flinkService.fetchSavepointInfo( -currentSavepointInfo.getTriggerId(), jobID, deployedConfig); +savepointInfo.getTriggerId(), jobID, deployedConfig); if (savepointFetchResult.isPending()) { -if (SavepointUtils.gracePeriodEnded( -configManager.getOperatorConfiguration(), currentSavepointInfo)) { -String errorMsg = -"Savepoint operation timed out after " -+ configManager -.getOperatorConfiguration() -.getSavepointTriggerGracePeriod(); -currentSavepointInfo.resetTrigger(); -LOG.error(errorMsg); -return Optional.of(errorMsg); -} else { -LOG.info("Savepoint operation not finished yet, waiting within grace period..."); -return Optional.empty(); -} +LOG.info("Savepoint operation not finished yet..."); +
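The removed lines above show the timeout branch (`gracePeriodEnded`, `resetTrigger`, "Savepoint operation timed out after ..."), while the PR title describes retrying failed savepoints within the grace period. A minimal sketch of that decision, assuming the trigger time and grace period are available as `Instant` and `Duration` values. The class, enums, and methods are hypothetical, not the operator's API, and the sketch leaves out event creation and status patching:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical decision logic for an in-progress savepoint.
final class SavepointGracePeriodSketch {
    enum FetchResult { PENDING, FAILED, COMPLETED }

    enum Action { WAIT, RETRY, GIVE_UP, DONE }

    static boolean gracePeriodEnded(Instant triggerTime, Duration gracePeriod, Instant now) {
        return Duration.between(triggerTime, now).compareTo(gracePeriod) > 0;
    }

    static Action decide(FetchResult result, Instant triggerTime, Duration gracePeriod, Instant now) {
        if (result == FetchResult.COMPLETED) {
            return Action.DONE;
        }
        if (gracePeriodEnded(triggerTime, gracePeriod, now)) {
            // Out of time: reset the trigger and surface an error.
            return Action.GIVE_UP;
        }
        // Still inside the grace period: keep waiting on a pending savepoint,
        // or re-trigger one that failed.
        return result == FetchResult.PENDING ? Action.WAIT : Action.RETRY;
    }
}
```

Passing `now` explicitly keeps the decision deterministic and easy to test, rather than reading the clock inside the method.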
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period
gyfora commented on code in PR #255: URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890307980 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java: ## @@ -58,9 +58,11 @@ public class KubernetesOperatorConfigOptions { "The interval for observing status for in-progress operations such as deployment and savepoints."); public static final ConfigOption OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD = Review Comment: