[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r286778051 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests for the checkpoint failure manager. + */ +public class CheckpointFailureManagerTest extends TestLogger { + + @Test + public void testContinuousFailure() { Review comment: I know what you mean, please see this [discussion section](https://github.com/apache/flink/pull/8322#discussion_r280328078). Currently, the counting mechanism is not based on checkpoint order but on execution order (the execution phase). It's a temporary solution; we will provide a better solution in the future.
Its jira issue id is FLINK-12514. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12586) Stderr and stdout are reversed in OptimizerPlanEnvironment
[ https://issues.apache.org/jira/browse/FLINK-12586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846425#comment-16846425 ] Kazunori Shinhira commented on FLINK-12586: --- Thank you for your code [~fan_li_ya]. It is exactly what I expected. It looks good to me. > Stderr and stdout are reversed in OptimizerPlanEnvironment > -- > > Key: FLINK-12586 > URL: https://issues.apache.org/jira/browse/FLINK-12586 > Project: Flink > Issue Type: Bug > Components: Command Line Client >Affects Versions: 1.7.2, 1.8.0 >Reporter: Kazunori Shinhira >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In the OptimizerPlanEnvironment#getOptimizedPlan method, it looks like stdout is output as System.err and stderr is output as System.out. > [https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L107-L108] > > I think it should be as below. > {code:java} > throw new ProgramInvocationException( > "The program plan could not be fetched - the program aborted pre-maturely." > + "\n\nSystem.err: " + (stderr.length() == 0 ? "(none)" : stderr) > + "\n\nSystem.out: " + (stdout.length() == 0 ? "(none)" : stdout)); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r286776553 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests for the checkpoint failure manager. + */ +public class CheckpointFailureManagerTest extends TestLogger { + + @Test + public void testContinuousFailure() { Review comment: For concurrent checkpoints, such as chk-1 and chk-2, we may see chk-2 succeed and then receive chk-1's failure (we can add a test now and change the expected result after the change for FLINK-12514). In my opinion, we should add a test to guarantee the result is what we want.
[GitHub] [flink] klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r286775875 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -202,31 +201,33 @@ public CheckpointCoordinator( StateBackend checkpointStateBackend, Executor executor, SharedStateRegistryFactory sharedStateRegistryFactory, - boolean isPreferCheckpointForRecovery) { + CheckpointFailureManager failureManager) { // sanity checks checkNotNull(checkpointStateBackend); - checkArgument(baseInterval > 0, "Checkpoint base interval must be larger than zero"); - checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero"); - checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0"); - checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1"); + checkArgument(chkConfig.getCheckpointInterval() > 0, "Checkpoint base interval must be larger than zero"); Review comment: The checks here can also be merged into `CheckpointCoordinatorConfiguration`.
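The suggestion above — centralizing the sanity checks in `CheckpointCoordinatorConfiguration` — might look roughly like the sketch below. This is illustrative only: the class is heavily simplified, field names are assumed rather than taken from the actual Flink source, and the local `checkArgument` helper stands in for Flink's `Preconditions.checkArgument`.

```java
// Sketch: all sanity checks live in the configuration's constructor, so
// CheckpointCoordinator and ExecutionGraph no longer have to repeat them.
class CheckpointConfigSketch {

    final long checkpointInterval;
    final long checkpointTimeout;
    final long minPauseBetweenCheckpoints;
    final int maxConcurrentCheckpointAttempts;

    CheckpointConfigSketch(
            long checkpointInterval,
            long checkpointTimeout,
            long minPauseBetweenCheckpoints,
            int maxConcurrentCheckpointAttempts) {
        // A successfully constructed instance is guaranteed internally consistent.
        checkArgument(checkpointInterval > 0, "Checkpoint base interval must be larger than zero");
        checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
        checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
        checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
        this.checkpointInterval = checkpointInterval;
        this.checkpointTimeout = checkpointTimeout;
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
        this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
    }

    // Stand-in for org.apache.flink.util.Preconditions.checkArgument.
    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        new CheckpointConfigSketch(10_000, 60_000, 0, 1); // valid: constructs fine
        try {
            new CheckpointConfigSketch(0, 60_000, 0, 1);  // invalid interval
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With this shape, callers such as the coordinator only need `checkNotNull` on the configuration object itself.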
[GitHub] [flink] flinkbot commented on issue #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
flinkbot commented on issue #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer URL: https://github.com/apache/flink/pull/8517#issuecomment-495066880 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#issuecomment-495066930 @klion26 Thanks for your review suggestion. What do you think about the new change? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10921) Prioritize shard consumers in Kinesis Consumer by event time
[ https://issues.apache.org/jira/browse/FLINK-10921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10921: --- Labels: pull-request-available (was: ) > Prioritize shard consumers in Kinesis Consumer by event time > - > > Key: FLINK-10921 > URL: https://issues.apache.org/jira/browse/FLINK-10921 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kinesis >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: pull-request-available > > Shard consumer threads currently emit records directly. In order to align > shards by event time, decouple shard consumer threads and emitter with a > queue, as described in [1]. > [1] > https://lists.apache.org/thread.html/ac41718246ad8f6098efaf7dbf5f7182d60abdc473e8bf3c96ef5968@%3Cdev.flink.apache.org%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] tweise opened a new pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
tweise opened a new pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer URL: https://github.com/apache/flink/pull/8517 ## What is the purpose of the change * This pull request adds support for source synchronization to the Kinesis consumer. Source synchronization aligns shard consumption based on the per-shard watermark. ## Brief change log - Adds global watermark aggregation to track the current low watermark across subtasks - Adds record emitter to prioritize per-shard queues and align the shard consumers through back pressure - The above components are designed to be reusable for the upcoming support in the Kafka consumer. - Integration into KinesisDataFetcher ## Verifying this change This change added tests and can be verified as follows: - This is a back port from Lyft internal codebase where it has been tested extensively and at scale. - PR includes test for consumer and basic tests for emitter and watermark tracker. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (user documentation will be added separately)
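The watermark aggregation and alignment described in the change log can be sketched as follows. This is illustrative only — the class and method names here (`ShardWatermarkSketch`, `mayEmit`, the lookahead parameter) are assumptions for the sketch, not the actual `WatermarkTracker`/`RecordEmitter` API added by the PR:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: each shard consumer reports its local watermark; the global low
// watermark is the minimum over all shards. A shard whose next record's
// timestamp runs too far ahead of the global low watermark is held back,
// which is how back pressure aligns the consumers by event time.
class ShardWatermarkSketch {

    private final Map<String, Long> shardWatermarks = new ConcurrentHashMap<>();
    private final long maxLookaheadMillis;

    ShardWatermarkSketch(long maxLookaheadMillis) {
        this.maxLookaheadMillis = maxLookaheadMillis;
    }

    void reportWatermark(String shardId, long watermark) {
        shardWatermarks.put(shardId, watermark);
    }

    long globalLowWatermark() {
        // minimum over all reported per-shard watermarks
        return shardWatermarks.values().stream().mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
    }

    boolean mayEmit(String shardId, long recordTimestamp) {
        // hold back shards that run too far ahead of the slowest shard
        return recordTimestamp <= globalLowWatermark() + maxLookaheadMillis;
    }

    public static void main(String[] args) {
        ShardWatermarkSketch tracker = new ShardWatermarkSketch(1_000);
        tracker.reportWatermark("shard-0", 5_000);
        tracker.reportWatermark("shard-1", 9_000);
        System.out.println(tracker.globalLowWatermark()); // 5000
        System.out.println(tracker.mayEmit("shard-1", 9_500)); // false: too far ahead
        System.out.println(tracker.mayEmit("shard-0", 5_500)); // true
    }
}
```

In the real implementation the tracking also spans subtasks (the "global watermark aggregation" above), not just shards within one subtask.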
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r286774079 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests for the checkpoint failure manager. + */ +public class CheckpointFailureManagerTest extends TestLogger { + + @Test + public void testContinuousFailure() { Review comment: I think it's not necessary. The counter is an `AtomicInteger`, and we tolerate concurrent checkpoints and concurrent counting.
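The counting scheme under discussion in this thread can be sketched as follows — the names (`FailureCounterSketch`, `handleCheckpointSuccess`, `failJobCallback`) are illustrative only, not the actual `CheckpointFailureManager` API: an `AtomicInteger` tracks the continuous-failure streak, a checkpoint success resets it, and exceeding the tolerable number triggers a job-failing callback. Because the counter is not tied to checkpoint IDs, a late failure of chk-1 after chk-2's success simply starts a new streak, which is the behavior FLINK-12514 is meant to refine.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the failure-counting mechanism: thread-safe via AtomicInteger,
// so concurrent checkpoints can report success/failure without extra locking.
class FailureCounterSketch {

    private final int tolerableFailureNumber;
    private final AtomicInteger continuousFailureCounter = new AtomicInteger(0);
    private final Runnable failJobCallback;

    FailureCounterSketch(int tolerableFailureNumber, Runnable failJobCallback) {
        this.tolerableFailureNumber = tolerableFailureNumber;
        this.failJobCallback = failJobCallback;
    }

    void handleCheckpointSuccess() {
        // a completed checkpoint resets the continuous-failure streak
        continuousFailureCounter.set(0);
    }

    void handleCheckpointFailure() {
        if (continuousFailureCounter.incrementAndGet() > tolerableFailureNumber) {
            failJobCallback.run();
        }
    }

    public static void main(String[] args) {
        AtomicInteger failCalls = new AtomicInteger(0);
        FailureCounterSketch mgr = new FailureCounterSketch(2, failCalls::incrementAndGet);
        mgr.handleCheckpointFailure();   // streak = 1
        mgr.handleCheckpointFailure();   // streak = 2, still tolerated
        mgr.handleCheckpointSuccess();   // e.g. chk-2 completes, streak reset
        mgr.handleCheckpointFailure();   // late chk-1 failure starts a new streak
        System.out.println("fail callback invoked " + failCalls.get() + " times"); // 0 times
        mgr.handleCheckpointFailure();   // streak = 2
        mgr.handleCheckpointFailure();   // streak = 3 > 2 -> fail the job
        System.out.println("fail callback invoked " + failCalls.get() + " times"); // 1 times
    }
}
```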
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r286773309 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -512,24 +513,19 @@ public boolean isArchived() { } public void enableCheckpointing( - long interval, - long checkpointTimeout, - long minPauseBetweenCheckpoints, - int maxConcurrentCheckpoints, - CheckpointRetentionPolicy retentionPolicy, + CheckpointCoordinatorConfiguration chkConfig, List verticesToTrigger, List verticesToWaitFor, List verticesToCommitTo, List> masterHooks, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, StateBackend checkpointStateBackend, - CheckpointStatsTracker statsTracker, - boolean isPreferCheckpointForRecovery) { + CheckpointStatsTracker statsTracker) { // simple sanity checks - checkArgument(interval >= 10, "checkpoint interval must not be below 10ms"); - checkArgument(checkpointTimeout >= 10, "checkpoint timeout must not be below 10ms"); + checkArgument(chkConfig.getCheckpointInterval() >= 10, "checkpoint interval must not be below 10ms"); Review comment: Actually, the interval is not unified across places. In CheckpointConfig, -1 is used as a disabled flag and any value larger than 0 is legal, while the sanity check in CheckpointCoordinatorConfiguration is based on 1. It would be better to unify them: either -1 or a value larger than or equal to 10. So I agree with you!
[GitHub] [flink] liyafan82 commented on issue #8511: [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive
liyafan82 commented on issue #8511: [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive URL: https://github.com/apache/flink/pull/8511#issuecomment-495061840 > Thank you for fixing this bug. > Does it make sense to add the job triggering this bug as a test, so that we won't introduce it in the future again? @rmetzger thanks a lot for the good suggestion. We have added a test case for that. Please help take a look. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r286768669 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -512,24 +513,19 @@ public boolean isArchived() { } public void enableCheckpointing( - long interval, - long checkpointTimeout, - long minPauseBetweenCheckpoints, - int maxConcurrentCheckpoints, - CheckpointRetentionPolicy retentionPolicy, + CheckpointCoordinatorConfiguration chkConfig, List verticesToTrigger, List verticesToWaitFor, List verticesToCommitTo, List> masterHooks, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, StateBackend checkpointStateBackend, - CheckpointStatsTracker statsTracker, - boolean isPreferCheckpointForRecovery) { + CheckpointStatsTracker statsTracker) { // simple sanity checks - checkArgument(interval >= 10, "checkpoint interval must not be below 10ms"); - checkArgument(checkpointTimeout >= 10, "checkpoint timeout must not be below 10ms"); + checkArgument(chkConfig.getCheckpointInterval() >= 10, "checkpoint interval must not be below 10ms"); Review comment: As we have `CheckpointCoordinatorConfiguration` could we just place all the checks in the constructor of `CheckpointCoordinatorConfiguration`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r286770672 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests for the checkpoint failure manager. + */ +public class CheckpointFailureManagerTest extends TestLogger { + + @Test + public void testContinuousFailure() { Review comment: Do we need to add a test case for concurrent checkpoint? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[jira] [Closed] (FLINK-12551) elasticsearch6 connector print log error
[ https://issues.apache.org/jira/browse/FLINK-12551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunxiongkun closed FLINK-12551. --- Resolution: Not A Bug > elasticsearch6 connector print log error > > > Key: FLINK-12551 > URL: https://issues.apache.org/jira/browse/FLINK-12551 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch >Affects Versions: 1.6.3 >Reporter: sunxiongkun >Priority: Minor > > When I use the Elasticsearch connector, while my project is running I find that some > data is not inserted into Elasticsearch. I wanted to read the logs for help, but the > log does not contain the important message, so I read the source code > (org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase) and > found an error in how the ERROR log is written. > > {code:java} > @Override > public void afterBulk(long executionId, BulkRequest request, BulkResponse > response) { > if (response.hasFailures()) { > BulkItemResponse itemResponse; > Throwable failure; > RestStatus restStatus; > try { > for (int i = 0; i < response.getItems().length; i++) { > itemResponse = response.getItems()[i]; > failure = > callBridge.extractFailureCauseFromBulkItemResponse(itemResponse); > if (failure != null) { > LOG.error("Failed Elasticsearch item request: {}", > itemResponse.getFailureMessage(), failure); > restStatus = itemResponse.getFailure().getStatus(); > if (restStatus == null) { > failureHandler.onFailure(request.requests().get(i), failure, -1, > requestIndexer); > } else { > failureHandler.onFailure(request.requests().get(i), failure, > restStatus.getStatus(), requestIndexer); > } > } > } > } catch (Throwable t) { > // fail the sink and skip the rest of the items > // if the failure handler decides to throw an exception > failureThrowable.compareAndSet(null, t); > } > } > if (flushOnCheckpoint) { > numPendingRequests.getAndAdd(-request.numberOfActions()); > } > } > {code} > {code:java} > @Override > public void afterBulk(long executionId, BulkRequest request, Throwable failure) { > LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), > failure.getCause()); > try { > for (ActionRequest action : request.requests()) { > failureHandler.onFailure(action, failure, -1, requestIndexer); > } > } catch (Throwable t) { > // fail the sink and skip the rest of the items > // if the failure handler decides to throw an exception > failureThrowable.compareAndSet(null, t); > } > if (flushOnCheckpoint) { > numPendingRequests.getAndAdd(-request.numberOfActions()); > } > } > } > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#discussion_r286768901 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java ## @@ -63,11 +65,13 @@ public CheckpointCoordinatorConfiguration( int maxConcurrentCheckpoints, CheckpointRetentionPolicy checkpointRetentionPolicy, boolean isExactlyOnce, - boolean isPerfetCheckpointForRecovery) { + boolean isPerfetCheckpointForRecovery, + int tolerableCpFailureNumber) { // sanity checks if (checkpointInterval < 1 || checkpointTimeout < 1 || - minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) { + minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1 || Review comment: Why do we need `tolerableCpFailureNumber > Integer.MAX_VALUE`? I think an int can never be bigger than `Integer.MAX_VALUE`.
[GitHub] [flink] bowenli86 commented on issue #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog
bowenli86 commented on issue #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog URL: https://github.com/apache/flink/pull/8449#issuecomment-495060568 ok, let's leave that part as it for now This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-12582) Alteration APIs in catalogs should check existing object and new object are of the same class
[ https://issues.apache.org/jira/browse/FLINK-12582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-12582. Resolution: Fixed merged in 1.9.0: aca9c018a4b26e50333c33426e77323cbb9b7960 > Alteration APIs in catalogs should check existing object and new object are > of the same class > - > > Key: FLINK-12582 > URL: https://issues.apache.org/jira/browse/FLINK-12582 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Alterations in catalogs should check existing object and new object are of > the same class. > Most of them currently don't, e.g. you can alter an existing generic table > with a new hive table in GenericInMemoryCatalog. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] asfgit closed pull request #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class
asfgit closed pull request #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class URL: https://github.com/apache/flink/pull/8514 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on issue #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class
bowenli86 commented on issue #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class URL: https://github.com/apache/flink/pull/8514#issuecomment-495058160 thanks @xuefuz , merging This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12593) Revise the document for CEP
[ https://issues.apache.org/jira/browse/FLINK-12593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846399#comment-16846399 ] Liya Fan commented on FLINK-12593: -- Hi [~jark], thank you so much for sharing the good news and the documents. I will take a closer look at the documents. However, I guess I also need to get more knowledge about CEP, before I can give some useful comments about it :) > Revise the document for CEP > --- > > Key: FLINK-12593 > URL: https://issues.apache.org/jira/browse/FLINK-12593 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Liya Fan >Priority: Minor > > The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to > understand and follow, especially for beginners. > I suggest revising from the following aspects: > 1. Give more detailed descriptions of existing examples. > 2. More examples are required to illustrate the features. > 3. More explanations are required for some concepts, like contiguity. > 4. We can add more references to better understand the concepts.
[GitHub] [flink] zhijiangW commented on a change in pull request #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type
zhijiangW commented on a change in pull request #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type URL: https://github.com/apache/flink/pull/8346#discussion_r286761364

## File path: flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java

```
@@ -480,6 +482,32 @@ public void postVisit(PlanNode node) {
 		if (node instanceof SourcePlanNode || node instanceof NAryUnionPlanNode || node instanceof SolutionSetPlanNode) {
 			return;
 		}
+
+		// if this is a blocking shuffle vertex, we add one IntermediateDataSetID to its predecessor and return
+		if (node instanceof SinkPlanNode) {
+			Object userCodeObject = node.getProgramOperator().getUserCodeWrapper().getUserCodeObject();
+			if (userCodeObject instanceof BlockingShuffleOutputFormat) {
+				PlanNode precedentNode = node.getInputs().iterator().next().getSource();
+				JobVertex precedentVertex;
+				if (this.vertices.containsKey(precedentNode)) {
```

Review comment: Yes, we have some historical inconsistency issues, but we should not carry them into new code. The historical code we can fix in a separate hotfix, or just keep as is.
[GitHub] [flink] lirui-apache commented on issue #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog
lirui-apache commented on issue #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog URL: https://github.com/apache/flink/pull/8449#issuecomment-495051154

> @lirui-apache Thank you very much for the update!
>
> I just spotted that several API impls currently work like this: 1) get a raw hive table, 2) parse part of the raw table. The latter step actually duplicates logic in `instantiateHiveCatalogTable()`. E.g. `ensureTableAndPartitionMatch()` parses `FLINK_PROPERTY_IS_GENERIC`, `instantiateHivePartition()` parses partition keys, and `ensurePartitionedTable()` parses the raw table's partition key size, all of which we can get by just parsing the raw table into a `CatalogTable` thru `instantiateHiveCatalogTable()` in advance. The current duplication also means that if we change some general logic in parsing a hive table, we need to change two places. Thus I wonder if it makes sense to parse the raw table as a whole at the beginning rather than having scattered places each parsing only part of it. We could then remove util methods such as `getFieldNames()`, which is only used to get the partition keys already available in `CatalogTable`.
>
> For example, change
>
> ```
> public void createPartition(...) {
>     Table hiveTable = getHiveTable(tablePath);
>     ensureTableAndPartitionMatch(hiveTable, partition);
>     ensurePartitionedTable(tablePath, hiveTable);
>     try {
>         client.add_partition(instantiateHivePartition(hiveTable, partitionSpec, partition));
>     } ...
> }
> ```
>
> to something like:
>
> ```
> public void createPartition(...) {
>     Table hiveTable = getHiveTable(tablePath);
>     CatalogBaseTable catalogTable = instantiateHiveCatalogTable(hiveTable);
>     ... check whether catalogTable and catalogPartition type matches would be much easier here ...
>     ... check whether catalogTable is partitioned would be easier here ...
>     try {
>         client.add_partition(
>             instantiateHivePartition(catalogTable, partitionSpec, partition, hiveTable.getSd()));
>     } ...
> }
> ```

I'm not sure how much benefit this can bring us. It might make `ensureTableAndPartitionMatch` a little easier -- we can check the type of CatalogBaseTable instead of parsing a property. But I don't think we should take the same approach for `ensurePartitionedTable`; it is already simple enough. And for APIs like `listPartitions`, creating a CatalogBaseTable just to get the number of partition cols seems like overkill to me. The same applies to `getFieldNames`. E.g. I think it would be overkill to create a CatalogBaseTable just to get partition col names in `dropPartition`. Maybe an alternative approach to the problem you mentioned is to have more util methods, in order to avoid duplication. For example, we could have a util method to decide whether a Hive table is generic, and all the APIs needing this logic could call it. What do you think?
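The util method suggested above could look like the following minimal sketch. This is plain Java, not actual Flink code: the `Map` stands in for the raw metastore table's parameters, and the property-key constant mirroring `FLINK_PROPERTY_IS_GENERIC` is an assumption here.

```java
import java.util.Map;

public class HiveTableUtil {
    // Assumed key marking a Flink-managed (generic) table in the raw
    // metastore table parameters; hypothetical stand-in for FLINK_PROPERTY_IS_GENERIC.
    static final String FLINK_PROPERTY_IS_GENERIC = "is_generic";

    // The shared util: decide from the raw table's parameters whether the table
    // is generic, so every API needing this check calls one place instead of
    // each re-parsing the property itself.
    public static boolean isGenericTable(Map<String, String> tableParameters) {
        return Boolean.parseBoolean(tableParameters.getOrDefault(FLINK_PROPERTY_IS_GENERIC, "false"));
    }

    public static void main(String[] args) {
        System.out.println(isGenericTable(Map.of(FLINK_PROPERTY_IS_GENERIC, "true"))); // true
        System.out.println(isGenericTable(Map.of()));                                  // false
    }
}
```

All catalog APIs (`createPartition`, `dropPartition`, `listPartitions`, ...) could then share this one method, so a change to the parsing logic is made in a single place.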
[jira] [Comment Edited] (FLINK-12593) Revise the document for CEP
[ https://issues.apache.org/jira/browse/FLINK-12593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846393#comment-16846393 ] Jark Wu edited comment on FLINK-12593 at 5/23/19 3:02 AM: -- Hi [~fan_li_ya] thanks a lot for the ideas. The community is also working on improving documentation and started a discussion[1] about this two days ago. It would be great if you can leave some thoughts and comments regarding to the CEP restructure in the design doc[2] there. So that we can work together to improve documentation. [1]. http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-to-Restructure-Update-amp-Rework-Apache-Flink-s-Documentation-tt29014.html [2]. https://docs.google.com/document/d/1pPM4vTWUUiJb73pd8OqHA1EWVl9cv6kXoCB5F7_J6gM/edit was (Author: jark): Hi [~fan_li_ya] thanks a lot for the ideas. The community is also working on improve documentation and started a discussion[1] about this two days ago. It would be great if you can leave some thoughts and comments regarding to the CEP restructure in the design doc[2] there. So that we can work together to improve documentation. [1]. http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-to-Restructure-Update-amp-Rework-Apache-Flink-s-Documentation-tt29014.html [2]. https://docs.google.com/document/d/1pPM4vTWUUiJb73pd8OqHA1EWVl9cv6kXoCB5F7_J6gM/edit > Revise the document for CEP > --- > > Key: FLINK-12593 > URL: https://issues.apache.org/jira/browse/FLINK-12593 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Liya Fan >Priority: Minor > > The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to > understand and follow, especially for beginners. > I suggest revising from the following aspects: > 1. Give more detailed descriptions of existing examples. > 2. More examples are required to illustrate the features. > 3. More explanations are required for some concepts, like contiguity. > 4. 
We can add more references to better understand the concepts.
[jira] [Commented] (FLINK-12593) Revise the document for CEP
[ https://issues.apache.org/jira/browse/FLINK-12593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846393#comment-16846393 ] Jark Wu commented on FLINK-12593: - Hi [~fan_li_ya] thanks a lot for the ideas. The community is also working on improving documentation and started a discussion[1] about this two days ago. It would be great if you could leave some thoughts and comments regarding the CEP restructure in the design doc[2] there, so that we can work together to improve the documentation. [1]. http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-to-Restructure-Update-amp-Rework-Apache-Flink-s-Documentation-tt29014.html [2]. https://docs.google.com/document/d/1pPM4vTWUUiJb73pd8OqHA1EWVl9cv6kXoCB5F7_J6gM/edit > Revise the document for CEP > --- > > Key: FLINK-12593 > URL: https://issues.apache.org/jira/browse/FLINK-12593 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Liya Fan >Priority: Minor > > The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to > understand and follow, especially for beginners. > I suggest revising from the following aspects: > 1. Give more detailed descriptions of existing examples. > 2. More examples are required to illustrate the features. > 3. More explanations are required for some concepts, like contiguity. > 4. We can add more references to better understand the concepts.
[jira] [Assigned] (FLINK-12593) Revise the document for CEP
[ https://issues.apache.org/jira/browse/FLINK-12593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-12593: --- Assignee: (was: Jark Wu) > Revise the document for CEP > --- > > Key: FLINK-12593 > URL: https://issues.apache.org/jira/browse/FLINK-12593 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Liya Fan >Priority: Minor > > The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to > understand and follow, especially for beginners. > I suggest revising from the following aspects: > 1. Give more detailed descriptions of existing examples. > 2. More examples are required to illustrate the features. > 3. More explanations are required for some concepts, like contiguity. > 4. We can add more references to better understand the concepts.
[jira] [Assigned] (FLINK-12593) Revise the document for CEP
[ https://issues.apache.org/jira/browse/FLINK-12593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-12593: --- Assignee: Jark Wu > Revise the document for CEP > --- > > Key: FLINK-12593 > URL: https://issues.apache.org/jira/browse/FLINK-12593 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Liya Fan >Assignee: Jark Wu >Priority: Minor > > The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to > understand and follow, especially for beginners. > I suggest revising from the following aspects: > 1. Give more detailed descriptions of existing examples. > 2. More examples are required to illustrate the features. > 3. More explanations are required for some concepts, like contiguity. > 4. We can add more references to better understand the concepts.
[jira] [Updated] (FLINK-12335) Improvement the code and performance of class SegmentsUtil
[ https://issues.apache.org/jira/browse/FLINK-12335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-12335: Summary: Improvement the code and performance of class SegmentsUtil (was: Remove useless code in SegmentsUtil) > Improvement the code and performance of class SegmentsUtil > -- > > Key: FLINK-12335 > URL: https://issues.apache.org/jira/browse/FLINK-12335 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Improve the performance of class SegmentsUtil: > To evaluate the offset, an integer is bitand with a mask to clear the low > bits, and then shift right. The bitand is useless: > ((index & BIT_BYTE_POSITION_MASK) >>> 3)
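The redundancy described in the issue can be checked directly. A minimal sketch, assuming the mask clears the low three bits as the description says (the constant value here is an assumption mirroring `BIT_BYTE_POSITION_MASK`, not copied from Flink):

```java
public class ByteIndexDemo {
    // Assumed mask value: clears the low three bits of the index.
    static final int BIT_BYTE_POSITION_MASK = 0xFFFFFFF8;

    // Original form: mask away the low bits, then shift them out.
    static int byteIndexWithMask(int bitIndex) {
        return (bitIndex & BIT_BYTE_POSITION_MASK) >>> 3;
    }

    // Simplified form: the unsigned shift alone already discards the low
    // three bits, so the bitand contributes nothing.
    static int byteIndexDirect(int bitIndex) {
        return bitIndex >>> 3;
    }

    public static void main(String[] args) {
        for (int i : new int[] {0, 1, 7, 8, 12345, Integer.MAX_VALUE}) {
            if (byteIndexWithMask(i) != byteIndexDirect(i)) {
                throw new AssertionError("mismatch at " + i);
            }
        }
        System.out.println("both forms agree");
    }
}
```

Since `>>> 3` drops exactly the bits the mask zeroed, the two forms agree for every non-negative index, which is why the bitand can be removed.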
[jira] [Closed] (FLINK-12335) Remove useless code in SegmentsUtil
[ https://issues.apache.org/jira/browse/FLINK-12335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-12335. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in 1.9.0: 11a96fdf213467595dad73cffd9b05134a4d0d75 > Remove useless code in SegmentsUtil > --- > > Key: FLINK-12335 > URL: https://issues.apache.org/jira/browse/FLINK-12335 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Improve the performance of class SegmentsUtil: > To evaluate the offset, an integer is bitand with a mask to clear the low > bits, and then shift right. The bitand is useless: > ((index & BIT_BYTE_POSITION_MASK) >>> 3)
[jira] [Commented] (FLINK-12586) Stderr and stdout are reversed in OptimizerPlanEnvironment
[ https://issues.apache.org/jira/browse/FLINK-12586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846381#comment-16846381 ] Liya Fan commented on FLINK-12586: -- Hi [~shinhira_kazunori], thanks for finding this problem. I have provided a fix. Please take a look. > Stderr and stdout are reversed in OptimizerPlanEnvironment > -- > > Key: FLINK-12586 > URL: https://issues.apache.org/jira/browse/FLINK-12586 > Project: Flink > Issue Type: Bug > Components: Command Line Client >Affects Versions: 1.7.2, 1.8.0 >Reporter: Kazunori Shinhira >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In the OptimizerPlanEnvironment#getOptimizedPlan method, it looks like stdout is output as System.err and stderr is output as System.out. > [https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L107-L108] > > I think it should be as below.
> {code:java}
> throw new ProgramInvocationException(
>     "The program plan could not be fetched - the program aborted pre-maturely."
>     + "\n\nSystem.err: " + (stdout.length() == 0 ? "(none)" : stderr)
>     + "\n\nSystem.out: " + (stderr.length() == 0 ? "(none)" : stdout));
> {code}
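The intended pairing (the "System.err" section reporting captured stderr, the "System.out" section reporting captured stdout) can be sketched with the standard library alone. This is an illustration under assumed names, not the actual Flink implementation:

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class CapturedOutputDemo {
    // Runs a task while capturing System.out and System.err into separate
    // buffers, then builds an error message where each label reports the
    // matching stream (the corrected pairing from the issue).
    static String runAndDescribe(Runnable task) {
        ByteArrayOutputStream stdout = new ByteArrayOutputStream();
        ByteArrayOutputStream stderr = new ByteArrayOutputStream();
        PrintStream oldOut = System.out;
        PrintStream oldErr = System.err;
        System.setOut(new PrintStream(stdout, true));
        System.setErr(new PrintStream(stderr, true));
        try {
            task.run();
        } finally {
            // Always restore the original streams, even if the task throws.
            System.setOut(oldOut);
            System.setErr(oldErr);
        }
        return "The program plan could not be fetched - the program aborted pre-maturely."
                + "\n\nSystem.err: " + (stderr.size() == 0 ? "(none)" : stderr.toString())
                + "\n\nSystem.out: " + (stdout.size() == 0 ? "(none)" : stdout.toString());
    }

    public static void main(String[] args) {
        String msg = runAndDescribe(() -> {
            System.out.println("plan output");
            System.err.println("plan error");
        });
        System.out.println(msg);
    }
}
```

Note that each label's emptiness check uses its own buffer, avoiding the crossed `stdout.length()`/`stderr` mix-up visible in the quoted snippet.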
[jira] [Commented] (FLINK-12296) Data loss silently in RocksDBStateBackend when more than one operator(has states) chained in a single task
[ https://issues.apache.org/jira/browse/FLINK-12296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846382#comment-16846382 ] Congxian Qiu(klion26) commented on FLINK-12296: ---

merged
* master ee60846dc588b1a832a497ff9522d7a3a282c350
* release-1.8 531d727f9b32c310d8d63b253019b8cc4a23a3eb
* release-1.7 1ce2efd7a38d091fc004a8dba034ece0bcc42385
* release-1.6 0dda6fe9dff4f667b110cda39bfe9738ba615b24

> Data loss silently in RocksDBStateBackend when more than one operator(has > states) chained in a single task > --- > > Key: FLINK-12296 > URL: https://issues.apache.org/jira/browse/FLINK-12296 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0 >Reporter: Congxian Qiu(klion26) >Assignee: Congxian Qiu(klion26) >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.3, 1.9.0, 1.8.1 > > Time Spent: 20m > Remaining Estimate: 0h > > As the mailing list said[1], there may be a problem when more than one operator is chained in a single task and all the operators have state: we can encounter a silent data loss problem. > Currently, the local directory we use is like > ../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state); > if more than one operator is chained in a single task and all the operators have state, then all the operators will share the same local directory (because the vertex_id is the same), which leads to a data loss problem.
> The path generation logic is as follows:
> {code:java}
> // LocalRecoveryDirectoryProviderImpl.java
> @Override
> public File subtaskSpecificCheckpointDirectory(long checkpointId) {
>    return new File(subtaskBaseDirectory(checkpointId), checkpointDirString(checkpointId));
> }
>
> @VisibleForTesting
> String subtaskDirString() {
>    return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + subtaskIndex).toString();
> }
>
> @VisibleForTesting
> String checkpointDirString(long checkpointId) {
>    return "chk_" + checkpointId;
> }
> {code}
>
> [1] http://mail-archives.apache.org/mod_mbox/flink-user/201904.mbox/%3cm2ef5tpfwy.wl-nings...@gmail.com%3E
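The collision described above follows directly from the quoted path logic: the directory depends only on the job, vertex, subtask, and checkpoint id, so two stateful operators chained into the same task resolve to the same path. A minimal stand-alone sketch of that logic (the helper name is invented, not Flink's API):

```java
import java.nio.file.Paths;

public class LocalStatePathDemo {
    // Mirrors the quoted subtaskDirString()/checkpointDirString() logic:
    // jid_<jobId>/vtx_<vertexId>_sti_<subtaskIndex>/chk_<checkpointId>
    static String subtaskCheckpointDir(String jobId, String vertexId, int subtaskIndex, long checkpointId) {
        return Paths.get(
                "jid_" + jobId,
                "vtx_" + vertexId + "_sti_" + subtaskIndex,
                "chk_" + checkpointId).toString();
    }

    public static void main(String[] args) {
        // Two operators chained into one task share the same job vertex, so
        // there is no operator id in the path and both get the same directory:
        // whichever operator writes second silently overwrites the first.
        String firstOperatorDir = subtaskCheckpointDir("jobA", "v1", 0, 1);
        String secondOperatorDir = subtaskCheckpointDir("jobA", "v1", 0, 1);
        System.out.println(firstOperatorDir);
        System.out.println(firstOperatorDir.equals(secondOperatorDir)); // paths collide
    }
}
```

Disambiguating the path per operator (rather than per vertex) is what removes the collision, which matches the fix direction discussed in the issue.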
[GitHub] [flink] flinkbot commented on issue #8516: [FLINK-12586][Command Line Client]Stderr and stdout are reversed in OptimizerPlanEnvironment
flinkbot commented on issue #8516: [FLINK-12586][Command Line Client]Stderr and stdout are reversed in OptimizerPlanEnvironment URL: https://github.com/apache/flink/pull/8516#issuecomment-495045139

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands: the @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12586) Stderr and stdout are reversed in OptimizerPlanEnvironment
[ https://issues.apache.org/jira/browse/FLINK-12586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12586: --- Labels: pull-request-available (was: ) > Stderr and stdout are reversed in OptimizerPlanEnvironment > -- > > Key: FLINK-12586 > URL: https://issues.apache.org/jira/browse/FLINK-12586 > Project: Flink > Issue Type: Bug > Components: Command Line Client >Affects Versions: 1.7.2, 1.8.0 >Reporter: Kazunori Shinhira >Priority: Minor > Labels: pull-request-available > > In the OptimizerPlanEnvironment#getOptimizedPlan method, it looks like stdout is output as System.err and stderr is output as System.out. > [https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L107-L108] > > I think it should be as below.
> {code:java}
> throw new ProgramInvocationException(
>     "The program plan could not be fetched - the program aborted pre-maturely."
>     + "\n\nSystem.err: " + (stdout.length() == 0 ? "(none)" : stderr)
>     + "\n\nSystem.out: " + (stderr.length() == 0 ? "(none)" : stdout));
> {code}
[GitHub] [flink] liyafan82 opened a new pull request #8516: [FLINK-12586][Command Line Client]Stderr and stdout are reversed in OptimizerPlanEnvironment
liyafan82 opened a new pull request #8516: [FLINK-12586][Command Line Client]Stderr and stdout are reversed in OptimizerPlanEnvironment URL: https://github.com/apache/flink/pull/8516

## What is the purpose of the change

Fix issue 12586: In the OptimizerPlanEnvironment#getOptimizedPlan method, it looks like stdout is output as System.err and stderr is output as System.out.

## Brief change log

- Change the method OptimizerPlanEnvironment#getOptimizedPlan

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
[jira] [Created] (FLINK-12594) Make Job ID Configurable for a Job Cluster
Chaoran Yu created FLINK-12594: -- Summary: Make Job ID Configurable for a Job Cluster Key: FLINK-12594 URL: https://issues.apache.org/jira/browse/FLINK-12594 Project: Flink Issue Type: Improvement Affects Versions: 1.8.0 Reporter: Chaoran Yu Currently the job ID is only configurable for a job in a session cluster. If a job is launched as a job cluster, the job ID is always the default value, without an option to manually set it. This [thread|http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAKiyyaE+wx72gAwgDbLuThZKJR8VDPUid0Q1S=qx91pDf3=z...@mail.gmail.com%3E] in the mailing list also talked about the same issue. The job ID should be made configurable in the job cluster case as well. In the [documentation|https://github.com/apache/flink/tree/master/flink-container/kubernetes] for running a job cluster on Kubernetes, it is mentioned that the job ID is configurable via the `--job-id` flag, which is false because it is not implemented anywhere. The documentation should be fixed until this feature is implemented.
[GitHub] [flink] asfgit closed pull request #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil
asfgit closed pull request #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil URL: https://github.com/apache/flink/pull/8278
[GitHub] [flink] wuchong commented on issue #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil
wuchong commented on issue #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil URL: https://github.com/apache/flink/pull/8278#issuecomment-495042553 LGTM.
[GitHub] [flink] wuchong commented on issue #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil
wuchong commented on issue #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil URL: https://github.com/apache/flink/pull/8278#issuecomment-495042540 LGTM.
[GitHub] [flink] flinkbot edited a comment on issue #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil
flinkbot edited a comment on issue #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil URL: https://github.com/apache/flink/pull/8278#issuecomment-487004244

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❗ 3. Needs [attention] from.
  - Needs attention by @wuchong [committer]
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands: the @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] JingsongLi commented on issue #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil
JingsongLi commented on issue #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil URL: https://github.com/apache/flink/pull/8278#issuecomment-495041136 @flinkbot attention @wuchong
[jira] [Commented] (FLINK-5809) Add registerNextWatermarkCallback() to TimerService
[ https://issues.apache.org/jira/browse/FLINK-5809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846371#comment-16846371 ] vinoyang commented on FLINK-5809: - It seems the {{registerEventTimeTimer}} does not exist in the current code base. So maybe we can close this issue now. cc [~aljoscha] > Add registerNextWatermarkCallback() to TimerService > --- > > Key: FLINK-5809 > URL: https://issues.apache.org/jira/browse/FLINK-5809 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Aljoscha Krettek >Priority: Major > > This new method is similar to {{registerEventTimeTimer()}} but instead of > specifying a certain time where the timer should fire this will simply fire > whenever a new watermark arrives. > This is, for example, useful when you want to buffer elements for sorting by > event time and always want to emit as soon as the operator receives a > watermark.
[jira] [Created] (FLINK-12593) Revise the document for CEP
Liya Fan created FLINK-12593: Summary: Revise the document for CEP Key: FLINK-12593 URL: https://issues.apache.org/jira/browse/FLINK-12593 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Liya Fan The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to understand and follow, especially for beginners. I suggest revising from the following aspects: 1. Give more detailed descriptions of existing examples. 2. More examples are required to illustrate the features. 3. More explanations are required for some concepts, like contiguity. 4. We can add more references to better understand the concepts.
[GitHub] [flink] Xpray commented on issue #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type
Xpray commented on issue #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type URL: https://github.com/apache/flink/pull/8346#issuecomment-495040043 @zhijiangW Thanks for your review and advice, I've updated the PR.
[jira] [Commented] (FLINK-8969) Move TimerService into state backend
[ https://issues.apache.org/jira/browse/FLINK-8969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846368#comment-16846368 ] vinoyang commented on FLINK-8969: - Hi [~phoenixjiangnan] Does this proposal still make sense based on the current code base? > Move TimerService into state backend > > > Key: FLINK-8969 > URL: https://issues.apache.org/jira/browse/FLINK-8969 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Bowen Li >Priority: Major > > upon discussion with [~aljoscha]. More details need to be added here
[GitHub] [flink] Xpray commented on a change in pull request #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type
Xpray commented on a change in pull request #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type URL: https://github.com/apache/flink/pull/8346#discussion_r286751650 ## File path: flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java ## @@ -265,6 +274,55 @@ public void testArtifactCompression() throws IOException { assertState(nonExecutableDirEntry, false, true); } + @Test + public void testGeneratingJobGraphWithUnconsumedResultPartition() { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input = env.fromElements(new Tuple2<>(1L, 2L)) + .setParallelism(1); + + DataSet ds = input.map((MapFunction, Object>) value -> new Tuple2<>(value.f0 + 1, value.f1)) + .setParallelism(3); + + AbstractID intermediateDataSetID = new AbstractID(); + + // this output branch will be excluded. + ds.output(BlockingShuffleOutputFormat.createOutputFormat(intermediateDataSetID)) + .setParallelism(1); + + // this is the normal output branch. + ds.output(new DiscardingOutputFormat()) + .setParallelism(1); + + Plan plan = env.createProgramPlan(); + Optimizer pc = new Optimizer(new Configuration()); + OptimizedPlan op = pc.compile(plan); + + JobGraphGenerator jgg = new JobGraphGenerator(); + JobGraph jobGraph = jgg.compileJobGraph(op); + + Assert.assertEquals(3, jobGraph.getVerticesSortedTopologicallyFromSources().size()); + + JobVertex inputVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0); + JobVertex mapVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1); + JobVertex outputVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(2); + + Assert.assertTrue(outputVertex instanceof OutputFormatVertex); Review comment: > It is better to use `hamcrest` to verify `instanceof` here. Great, it seemes `hamcrest ` is more readable here. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
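The readability point in the review above can be illustrated without the real library: a tiny, dependency-free imitation of hamcrest's `assertThat(x, instanceOf(C.class))` style next to the plain `instanceof` assertion. The helper names below are invented for this sketch, not hamcrest's actual classes.

```java
import java.util.function.Predicate;

// Minimal hamcrest-flavoured helpers (hypothetical, for illustration only).
final class MiniMatchers {
    static Predicate<Object> instanceOf(Class<?> expected) {
        return expected::isInstance;
    }

    static void assertThat(Object actual, Predicate<Object> matcher) {
        if (!matcher.test(actual)) {
            throw new AssertionError("Expected a value matching the matcher, got: " + actual);
        }
    }
}

public class InstanceOfDemo {
    public static void main(String[] args) {
        Object outputVertex = "pretend this is an OutputFormatVertex";

        // Plain JUnit style: the intent ("is it an instance of X?") is buried in the expression.
        if (!(outputVertex instanceof String)) {
            throw new AssertionError("not a String");
        }

        // hamcrest style: reads like "assert that outputVertex is an instance of String".
        MiniMatchers.assertThat(outputVertex, MiniMatchers.instanceOf(String.class));
        System.out.println("both assertions passed");
    }
}
```

The real hamcrest equivalent would be `assertThat(outputVertex, instanceOf(OutputFormatVertex.class))`, which also produces a descriptive failure message rather than a bare `AssertionError`.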
[jira] [Commented] (FLINK-12401) Support incremental emit for non-window streaming FlatAggregate on Table API
[ https://issues.apache.org/jira/browse/FLINK-12401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846359#comment-16846359 ] Jark Wu commented on FLINK-12401: - Do you have a design for incremental emit, [~hequn8128]? For example, what does the API look like, when will it be invoked, and what is it used for? > Support incremental emit for non-window streaming FlatAggregate on Table API > > > Key: FLINK-12401 > URL: https://issues.apache.org/jira/browse/FLINK-12401 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > As described in > [Flip-29|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739], > there are two output modes for non-window streaming flatAggregate. One is > emitting with full values, the other is emitting with incremental values. > [FLINK-10977|https://issues.apache.org/jira/browse/FLINK-10977] supports the > former one; this JIRA is going to support the latter one. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases
[ https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated FLINK-12539: - Fix Version/s: 1.8.1 > StreamingFileSink: Make the class extendable to customize for different > usecases > > > Key: FLINK-12539 > URL: https://issues.apache.org/jira/browse/FLINK-12539 > Project: Flink > Issue Type: New Feature > Components: Connectors / FileSystem >Reporter: Kailash Hassan Dayanand >Assignee: Kailash Hassan Dayanand >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0, 1.8.1 > > Time Spent: 40m > Remaining Estimate: 0h > > Currently the StreamingFileSink has a Builder pattern and the actual > constructor of StreamingFileSink is private. This makes it hard to extend the > class to build on top of it and customize the sink (example: adding new > metrics). Proposing to make the constructor protected, as well as the > Builder interface. > > Discussion is here: > [http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAC27z=phl8+gw-ugmjkxbriseky9zimi2crpqvlpcnyupt8...@mail.gmail.com%3E] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] tweise merged pull request #8515: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)
tweise merged pull request #8515: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469) URL: https://github.com/apache/flink/pull/8515
[GitHub] [flink] sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces
sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces URL: https://github.com/apache/flink/pull/8476#discussion_r286742814 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ## @@ -156,9 +129,41 @@ public StreamInputProcessor( } public boolean processInput() throws Exception { - if (isFinished) { - return false; + initializeNumRecordsIn(); + + StreamElement recordOrMark = input.pollNext(); + if (recordOrMark == null) { + input.isAvailable().get(); + return input.isFinished(); } + + processElement(recordOrMark); + return true; + } + + private void processElement(StreamElement recordOrMark) throws Exception { + if (recordOrMark.isWatermark()) { + // handle watermark + statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel); Review comment: In fact, what I care about is that the input channel index is exposed to input processors. As long as it's not exposed to the input processor (for example, by encapsulating a class as you said), it's fine with me.
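The patched `processInput()` above follows a poll-then-wait pattern: poll for an element, and only block on the availability future when nothing is buffered. A dependency-free sketch of that control flow follows; the class and method names are hypothetical stand-ins rather than Flink's API, and the return value is simplified to mean "keep looping", which differs slightly from the PR's exact contract.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Flink's Input: a queue-backed source of elements.
class SketchInput {
    private final Queue<String> buffered = new ArrayDeque<>();
    private boolean finished;

    void feed(String element) { buffered.add(element); }
    void finish() { finished = true; }

    String pollNext() { return buffered.poll(); }  // null when nothing is available right now
    boolean isFinished() { return finished && buffered.isEmpty(); }

    CompletableFuture<?> isAvailable() {
        // Completed when data (or end-of-input) is ready; a real implementation
        // would complete a pending future from the network/IO thread.
        return buffered.isEmpty() && !finished
                ? new CompletableFuture<>()
                : CompletableFuture.completedFuture(null);
    }
}

public class ProcessInputSketch {
    // Mirrors the shape of the patched processInput(): poll first, wait only on empty.
    // Returns true while more input may remain (simplified polarity for this demo).
    static boolean processInput(SketchInput input) throws Exception {
        String recordOrMark = input.pollNext();
        if (recordOrMark == null) {
            input.isAvailable().get();      // block only when nothing is buffered
            return !input.isFinished();
        }
        System.out.println("processed: " + recordOrMark);
        return true;
    }

    public static void main(String[] args) throws Exception {
        SketchInput input = new SketchInput();
        input.feed("a");
        input.feed("b");
        input.finish();
        while (processInput(input)) { }     // processes "a", "b", then terminates
    }
}
```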
[GitHub] [flink] sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces
sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces URL: https://github.com/apache/flink/pull/8476#discussion_r286741019 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/NetworkInput.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; +import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.plugable.DeserializationDelegate; +import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +/** + * Implementation of taking {@link InputGate} as {@link Input}. + */ +@Internal +public final class NetworkInput implements Input { + + private final int numberOfInputChannels; + + private final CheckpointBarrierHandler barrierHandler; + + private final DeserializationDelegate deserializationDelegate; + + private final RecordDeserializer>[] recordDeserializers; + + /** +* The channel from which a buffer came, tracked so that we can appropriately map +* the watermarks and watermark statuses to the correct channel index of the correct valve. 
+*/ + private int currentChannel = -1; + + private RecordDeserializer> currentRecordDeserializer = null; + + private boolean isFinished = false; + + @SuppressWarnings("unchecked") + public NetworkInput( + CheckpointBarrierHandler barrierHandler, + TypeSerializer inputSerializer, + IOManager ioManager) { + this.barrierHandler = barrierHandler; + this.numberOfInputChannels = barrierHandler.getNumberOfInputChannels(); + this.deserializationDelegate = new NonReusingDeserializationDelegate<>( + new StreamElementSerializer<>(inputSerializer)); + + // Initialize one deserializer per input channel + this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[numberOfInputChannels]; + for (int i = 0; i < recordDeserializers.length; i++) { + recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<>( + ioManager.getSpillingDirectoriesPaths()); + } + } + + @Override + public StreamElement pollNext() throws Exception { + + while (true) { + // get the stream element from the deserializer + if (currentRecordDeserializer != null) { + DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate); + if (result.isBufferConsumed()) { + currentRecordDeserializer.getCurrentBuffer().recycleBuffer(); + currentRecordDeserializer = null; + } + + if (result.isFullRecord()) { + return
[GitHub] [flink] sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces
sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces URL: https://github.com/apache/flink/pull/8476#discussion_r286740477 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Input.java ## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + + *http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.AsyncDataInput; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; + +import java.io.Closeable; + +/** + * Basic interface for inputs of stream operators. + */ +@Internal +public interface Input extends AsyncDataInput, Closeable { Review comment: > Generally speaking Closeable is better and preferred - it extends Autocloseable and provide stronger/easier to use contract of close() being idempotent. You are right. > Is there a reason why Input#close() can not be idempotent? No.
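The contract the thread refers to is that `Closeable#close()` is documented as idempotent: the second and later calls must have no further effect. A minimal sketch of an idempotent `close()` (the class name and the "release" bookkeeping are invented for the demo, not Flink's code):

```java
import java.io.Closeable;

// Sketch of an idempotent close(): calling it twice releases resources only once.
class SketchNetworkInput implements Closeable {
    private boolean closed;
    static int releaseCount = 0;   // visible only so the demo can observe the effect

    @Override
    public void close() {
        if (closed) {
            return;                // second and later calls are no-ops
        }
        closed = true;
        releaseCount++;            // stand-in for releasing buffers/deserializers
    }
}

public class IdempotentCloseDemo {
    public static void main(String[] args) {
        SketchNetworkInput input = new SketchNetworkInput();
        input.close();
        input.close();             // safe: idempotent
        System.out.println("resources released " + SketchNetworkInput.releaseCount + " time(s)");
    }
}
```

This prints "resources released 1 time(s)": the guard flag is what distinguishes `Closeable`'s contract from the weaker `AutoCloseable`, whose `close()` may have side effects on repeated calls.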
[GitHub] [flink] sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces
sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces URL: https://github.com/apache/flink/pull/8476#discussion_r286740477 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Input.java ## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + + *http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.AsyncDataInput; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; + +import java.io.Closeable; + +/** + * Basic interface for inputs of stream operators. + */ +@Internal +public interface Input extends AsyncDataInput, Closeable { Review comment: No. You are right.
[GitHub] [flink] flinkbot edited a comment on issue #8420: [FLINK-12408][python] Allow to define the data types in Python
flinkbot edited a comment on issue #8420: [FLINK-12408][python] Allow to define the data types in Python URL: https://github.com/apache/flink/pull/8420#issuecomment-491604302 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @sunjincheng121 [committer] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @sunjincheng121 [committer] * ❗ 3. Needs [attention]. - Needs attention by @twalthr [PMC] * ✅ 4. The change fits into the overall [architecture]. - Approved by @sunjincheng121 [committer] * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] sunjincheng121 commented on issue #8420: [FLINK-12408][python] Allow to define the data types in Python
sunjincheng121 commented on issue #8420: [FLINK-12408][python] Allow to define the data types in Python URL: https://github.com/apache/flink/pull/8420#issuecomment-495021535 @flinkbot approve-until architecture
[jira] [Assigned] (FLINK-12536) Make BufferOrEventSequence#getNext() non-blocking
[ https://issues.apache.org/jira/browse/FLINK-12536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) reassigned FLINK-12536: - Assignee: Congxian Qiu(klion26) > Make BufferOrEventSequence#getNext() non-blocking > - > > Key: FLINK-12536 > URL: https://issues.apache.org/jira/browse/FLINK-12536 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Piotr Nowojski >Assignee: Congxian Qiu(klion26) >Priority: Major > > Currently it is non-blocking in the case of credit-based flow control (the default); > however, for {{SpilledBufferOrEventSequence}} it blocks on reading from > file. We might want to consider reimplementing it to be non-blocking with a > {{CompletableFuture isAvailable()}} method. > > Otherwise we will block mailbox processing for the duration of reading from > file - for example we will block processing-time timers and potentially, in > the future, network flushes. > > This is not a high-priority change, since it affects a non-default > configuration option AND at the moment only processing-time timers are > planned to be moved to the mailbox for 1.9. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
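The non-blocking shape the ticket proposes, a `getNext()` that never blocks paired with a `CompletableFuture isAvailable()` the caller can park on, can be sketched as follows. The interface and class names below are assumptions for illustration, not Flink's actual API, and a simple in-memory queue stands in for the spilled-to-disk sequence.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical shape of a non-blocking sequence: getNext() returns empty instead
// of blocking, and callers wait on isAvailable() (e.g. from the mailbox loop).
interface NonBlockingSequence<T> {
    Optional<T> getNext();              // empty = nothing ready right now
    CompletableFuture<?> isAvailable(); // completes when getNext() may succeed
}

// Trivial in-memory implementation standing in for a file-backed sequence.
class InMemorySequence implements NonBlockingSequence<String> {
    private final ArrayDeque<String> data = new ArrayDeque<>();

    void add(String s) { data.add(s); }

    @Override
    public Optional<String> getNext() {
        return Optional.ofNullable(data.poll());
    }

    @Override
    public CompletableFuture<?> isAvailable() {
        // Pending future while empty (a real implementation would complete it
        // from the IO thread once a read finishes); completed otherwise.
        return data.isEmpty() ? new CompletableFuture<>()
                              : CompletableFuture.completedFuture(null);
    }
}

public class NonBlockingSequenceDemo {
    public static void main(String[] args) {
        InMemorySequence seq = new InMemorySequence();
        seq.add("buffer-1");
        System.out.println(seq.getNext().orElse("<none>"));  // consumes without blocking
        System.out.println(seq.getNext().isPresent());       // false: caller would now await isAvailable()
    }
}
```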
[GitHub] [flink] bowenli86 commented on issue #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class
bowenli86 commented on issue #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class URL: https://github.com/apache/flink/pull/8514#issuecomment-494985129 cc @xuefuz @lirui-apache @zjuwangg
[GitHub] [flink] flinkbot commented on issue #8515: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)
flinkbot commented on issue #8515: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469) URL: https://github.com/apache/flink/pull/8515#issuecomment-494984545 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention]. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Assigned] (FLINK-7167) Job can't set checkpoint meta directory to cover the system setting state.checkpoints.dir
[ https://issues.apache.org/jira/browse/FLINK-7167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-7167: --- Assignee: (was: Bowen Li) > Job can't set checkpoint meta directory to cover the system setting > state.checkpoints.dir > -- > > Key: FLINK-7167 > URL: https://issues.apache.org/jira/browse/FLINK-7167 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.3.1 >Reporter: yuqi >Priority: Major > > If we want to recover a failed job using a checkpoint, till now, as all job > checkpoint meta data are in the same directory and have no specific > identification, we have to traverse all files in the directory to find the > data of this job. This is rather troublesome, so setting this configuration > at the job level is preferable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-5809) Add registerNextWatermarkCallback() to TimerService
[ https://issues.apache.org/jira/browse/FLINK-5809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-5809: --- Assignee: (was: Bowen Li) > Add registerNextWatermarkCallback() to TimerService > --- > > Key: FLINK-5809 > URL: https://issues.apache.org/jira/browse/FLINK-5809 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Aljoscha Krettek >Priority: Major > > This new method is similar to {{registerEventTimeTimer()}} but instead of > specifying a certain time where the timer should fire this will simply fire > whenever a new watermark arrives. > This is, for example, useful when you want to buffer elements for sorting by > event time and always want to emit as soon as the operator receives a > watermark. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-7954) sideoutput in async function
[ https://issues.apache.org/jira/browse/FLINK-7954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-7954: --- Assignee: (was: Bowen Li) > sideoutput in async function > > > Key: FLINK-7954 > URL: https://issues.apache.org/jira/browse/FLINK-7954 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.3.2 > Environment: similar to FLINK-7635, adding support of side output to > AsyncFunction >Reporter: Chen Qin >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-8969) Move TimerService into state backend
[ https://issues.apache.org/jira/browse/FLINK-8969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-8969: --- Assignee: (was: Bowen Li) > Move TimerService into state backend > > > Key: FLINK-8969 > URL: https://issues.apache.org/jira/browse/FLINK-8969 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Bowen Li >Priority: Major > > upon discussion with [~aljoscha]. More details need to be added here -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12243) Support generic table stats related operations in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-12243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12243: - Summary: Support generic table stats related operations in HiveCatalog (was: Support table stats related operations in GenericHiveMetastoreCatalog) > Support generic table stats related operations in HiveCatalog > - > > Key: FLINK-12243 > URL: https://issues.apache.org/jira/browse/FLINK-12243 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > Support table stats related operations in GenericHiveMetastoreCatalog, which > implements ReadableWritableCatalog API -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] tweise opened a new pull request #8515: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)
tweise opened a new pull request #8515: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469) URL: https://github.com/apache/flink/pull/8515 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## 
Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Assigned] (FLINK-12243) Support generic table stats related operations in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-12243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-12243: Assignee: (was: Bowen Li) > Support generic table stats related operations in HiveCatalog > - > > Key: FLINK-12243 > URL: https://issues.apache.org/jira/browse/FLINK-12243 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > Support table stats related operations in GenericHiveMetastoreCatalog, which > implements ReadableWritableCatalog API -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12237) Support Hive table stats related operations in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-12237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12237: - Summary: Support Hive table stats related operations in HiveCatalog (was: Support table stats related operations in HiveCatalog) > Support Hive table stats related operations in HiveCatalog > -- > > Key: FLINK-12237 > URL: https://issues.apache.org/jira/browse/FLINK-12237 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > Support table stats related operations in HiveCatalog, which implements > ReadableWritableCatalog API -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12237) Support table stats related operations in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-12237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-12237: Assignee: (was: Bowen Li) > Support table stats related operations in HiveCatalog > - > > Key: FLINK-12237 > URL: https://issues.apache.org/jira/browse/FLINK-12237 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > Support table stats related operations in HiveCatalog, which implements > ReadableWritableCatalog API -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class
flinkbot commented on issue #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class URL: https://github.com/apache/flink/pull/8514#issuecomment-494983478 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention]. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] bowenli86 opened a new pull request #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class
bowenli86 opened a new pull request #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class URL: https://github.com/apache/flink/pull/8514 ## What is the purpose of the change This PR makes alteration APIs in catalogs check that the existing object and the new object are of the same class. Most of them currently don't: e.g., you can alter an existing generic table with a new Hive table in GenericInMemoryCatalog. ## Brief change log - changed the following alteration APIs to check that the existing object and the new object are of the same class - HiveCatalog: alterDatabase(), alterTable(), alterFunction() - GenericInMemoryCatalog: alterDatabase(), alterTable(), alterFunction(), alterPartition() ## Verifying this change This change added tests and can be verified as follows: added unit tests `testAlterDb_differentTypedDb()`, `testAlterTable_differentTypedTable()`, `testAlterFunction_differentTypedFunction()` in `CatalogTestBase`, and `testAlterPartition_differentTypedPartition()` in `GenericInMemoryCatalogTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
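The check this PR describes boils down to a class-equality guard applied before the alteration takes effect. A minimal, dependency-free sketch of that guard (the table classes and method name here are hypothetical stand-ins, not the PR's actual code):

```java
// Sketch of the same-class guard added to alterDatabase()/alterTable()/alterFunction().
public class AlterationCheckDemo {
    static class GenericTable {}
    static class HiveTable {}

    // Reject replacing an existing catalog object with one of a different class.
    static void checkSameClass(Object existing, Object replacement) {
        if (existing.getClass() != replacement.getClass()) {
            throw new IllegalArgumentException(String.format(
                "Cannot alter a %s with a %s",
                existing.getClass().getSimpleName(),
                replacement.getClass().getSimpleName()));
        }
    }

    public static void main(String[] args) {
        checkSameClass(new GenericTable(), new GenericTable());   // ok: same class
        try {
            checkSameClass(new GenericTable(), new HiveTable());  // rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Comparing `getClass()` directly (rather than `instanceof`) also rejects subclass substitutions, which matches the PR's intent of keeping the object's concrete type stable across alterations.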
[GitHub] [flink] xccui commented on issue #8372: [FLINK-12354] [table] Add REVERSE function for table/sql API
xccui commented on issue #8372: [FLINK-12354] [table] Add REVERSE function for table/sql API URL: https://github.com/apache/flink/pull/8372#issuecomment-494961184 Hi @zzchun, sorry for the late reply. I had a quick look at the PR. Overall, it looks good and I think the `REVERSE` function is worth adding. However, since the flink-table layer has been undergoing some major changes recently, maybe it's better to add the function after that. Best, Xingcan
[GitHub] [flink] bowenli86 commented on issue #8507: [FLINK-12236][hive] Support Hive function in HiveCatalog
bowenli86 commented on issue #8507: [FLINK-12236][hive] Support Hive function in HiveCatalog URL: https://github.com/apache/flink/pull/8507#issuecomment-494911693 cc @xuefuz @lirui-apache @zjuwangg
[GitHub] [flink] bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog
bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog URL: https://github.com/apache/flink/pull/8449#discussion_r286602063 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -740,10 +745,13 @@ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionS checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); checkNotNull(newPartition, "New partition cannot be null"); + checkArgument(newPartition instanceof HiveCatalogPartition, "Currently only supports HiveCatalogPartition"); Review comment: ditto
[GitHub] [flink] bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog
bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog URL: https://github.com/apache/flink/pull/8449#discussion_r286601953 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -639,8 +640,12 @@ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partition checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); checkNotNull(partition, "Partition cannot be null"); + checkArgument(partition instanceof HiveCatalogPartition, "Currently only supports HiveCatalogPartition"); Review comment: We currently throw `CatalogException` if the type doesn't match. `checkArgument()` will throw `IllegalArgumentException`.
[GitHub] [flink] bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog
bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog URL: https://github.com/apache/flink/pull/8449#discussion_r286608694 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -616,44 +619,279 @@ private static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTabl // -- partitions -- @Override - public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) - throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { - throw new UnsupportedOperationException(); + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + checkNotNull(tablePath, "Table path cannot be null"); + checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); + try { + return getHivePartition(tablePath, partitionSpec) != null; + } catch (NoSuchObjectException | TableNotExistException | PartitionSpecInvalidException e) { + return false; + } catch (TException e) { + throw new CatalogException( + String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e); + } } @Override - public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) - throws PartitionNotExistException, CatalogException { - throw new UnsupportedOperationException(); + public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) + throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { + checkNotNull(tablePath, "Table path cannot be null"); + checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); + 
checkNotNull(partition, "Partition cannot be null"); + + checkArgument(partition instanceof HiveCatalogPartition, "Currently only supports HiveCatalogPartition"); + + Table hiveTable = getHiveTable(tablePath); + + ensureTableAndPartitionMatch(hiveTable, partition); + + ensurePartitionedTable(tablePath, hiveTable); + + try { + client.add_partition(instantiateHivePartition(hiveTable, partitionSpec, partition)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to create partition %s of table %s", partitionSpec, tablePath)); + } } @Override - public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) + public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException { - throw new UnsupportedOperationException(); + checkNotNull(tablePath, "Table path cannot be null"); + checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); + try { + Table hiveTable = getHiveTable(tablePath); + client.dropPartition(tablePath.getDatabaseName(), tablePath.getObjectName(), + getOrderedFullPartitionValues(partitionSpec, getFieldNames(hiveTable.getPartitionKeys()), tablePath), true); + } catch (NoSuchObjectException e) { + if (!ignoreIfNotExists) { + throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e); + } + } catch (MetaException | TableNotExistException | PartitionSpecInvalidException e) { + throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to drop partition %s of table %s", partitionSpec, tablePath)); +
[GitHub] [flink] bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog
bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog URL: https://github.com/apache/flink/pull/8449#discussion_r286602271 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -770,6 +778,16 @@ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionS } } + // make sure both table and partition are generic, or neither is + private void ensureTableAndPartitionMatch(Table hiveTable, CatalogPartition catalogPartition) { + boolean isGeneric = Boolean.valueOf(hiveTable.getParameters().get(FLINK_PROPERTY_IS_GENERIC)); + if ((isGeneric && catalogPartition instanceof HiveCatalogPartition) || + (!isGeneric && catalogPartition instanceof GenericCatalogPartition)) { + throw new IllegalArgumentException(String.format("Cannot handle %s partition for %s table", Review comment: throw `CatalogException`
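The reviewer's suggestion in the two comments above — surface a catalog-level exception rather than `IllegalArgumentException` from `checkArgument()` — could look roughly like this. `CatalogException` and `HiveCatalogPartition` are stubbed here to keep the sketch self-contained; this is not the PR's actual code.

```java
// Stubs standing in for the real Flink classes, for a self-contained sketch.
class CatalogException extends RuntimeException {
    CatalogException(String message) { super(message); }
}

class HiveCatalogPartition {}

final class PartitionChecks {
    private PartitionChecks() {}

    // Explicit instanceof check instead of Preconditions.checkArgument(),
    // so callers see a CatalogException rather than IllegalArgumentException.
    static void ensureHivePartition(Object partition) {
        if (!(partition instanceof HiveCatalogPartition)) {
            throw new CatalogException("Currently only supports HiveCatalogPartition");
        }
    }
}
```

The design point is that type mismatches here are catalog-usage errors, so they are reported through the catalog's own exception hierarchy rather than a generic precondition failure.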
[GitHub] [flink] theoDiefenthal commented on issue #8513: [FLINK-12147] [metrics] Error for monitor kafka metrics when Use influxDB metr…
theoDiefenthal commented on issue #8513: [FLINK-12147] [metrics] Error for monitor kafka metrics when Use influxDB metr… URL: https://github.com/apache/flink/pull/8513#issuecomment-494907147 IMHO the correct place for this is here, as InfluxDB is most widely used for monitoring, but not exclusively. I personally worked with InfluxDB for another purpose where I wanted those errors to arise. But one could of course avoid the roundtrip by letting the InfluxDB client directly throw such an exception... Further, this is still a workaround, as the InfluxDB dev team already tracks an issue to support infinity values in the future. [ https://github.com/influxdata/influxdb/issues/4089 , https://github.com/influxdata/flux/issues/1040 ]. The InfluxDB client should thus not be permanently limited to not sending these numbers.
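The workaround under discussion amounts to filtering non-finite values before handing them to the InfluxDB client, since the InfluxDB line protocol currently rejects NaN and infinite field values. A minimal sketch (the helper name is hypothetical, not the PR's code):

```java
/** Sketch of a guard for non-finite metric values (hypothetical helper, not the PR's code). */
final class FiniteValueFilter {
    private FiniteValueFilter() {}

    /** Returns true if the value can be written to InfluxDB as a numeric field. */
    static boolean isReportable(double value) {
        return !Double.isNaN(value) && !Double.isInfinite(value);
    }
}
```

A reporter would skip (or substitute a sentinel for) any gauge reading for which `isReportable` returns false, avoiding the write error without hiding the metric entirely.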
[GitHub] [flink] tweise merged pull request #8469: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable
tweise merged pull request #8469: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable URL: https://github.com/apache/flink/pull/8469
[GitHub] [flink] walterddr commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib
walterddr commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib URL: https://github.com/apache/flink/pull/8402#discussion_r286601919 ## File path: flink-ml/pom.xml ## @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>1.9-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-ml</artifactId> Review comment: +1 to remove if no response in the user-ml. Thanks for taking care of this @shaoxuan-wang.
[GitHub] [flink] walterddr commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib
walterddr commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib URL: https://github.com/apache/flink/pull/8402#discussion_r286601213 ## File path: flink-ml/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.api.core; + +import org.apache.flink.ml.api.misc.param.ParamInfo; +import org.apache.flink.ml.api.misc.param.WithParams; +import org.apache.flink.ml.api.misc.persist.Persistable; +import org.apache.flink.ml.util.param.ExtractParamInfosUtil; +import org.apache.flink.ml.util.persist.MLStageFactory; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Base class for a stage in a pipeline. The interface is only a concept, and does not have any + * actual functionality. Its subclasses must be either Estimator or Transformer. No other classes + * should inherit this interface directly. + * + * Each pipeline stage is with parameters and meanwhile persistable. 
+ * + * @param <T> The class type of the PipelineStage implementation itself, used by {@link + *org.apache.flink.ml.api.misc.param.WithParams} + * @see WithParams + */ +interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable, + Persistable { + + default String toJson() { Review comment: +1 to exposing a more extendable interface in the next PR. I am still not fully convinced that "toJson" addresses all of the concerns that were raised, especially the ones @blublinsky raised, but I also have a strong feeling that this can be addressed in a future PR. One fundamental reason I support the "toJson" interface is that a readable/editable format, in which a model can be bi-directionally converted in and out of FlinkML, is definitely needed in multiple scenarios. For this PR's purpose I don't think we need to focus on addressing "all" scenarios. What do you guys think? IMO the follow-up can be on "export", "serving", or even "standardizing" the model.
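The self-referential type parameter in the quoted signature, `PipelineStage<T extends PipelineStage<T>>`, follows the "curiously recurring" generics pattern: T is the concrete subclass itself, so fluent parameter setters can return the concrete type. A minimal standalone sketch (illustrative names, not the PR's code):

```java
// Sketch of the self-referential generic pattern used by PipelineStage/WithParams.
interface Stage<T extends Stage<T>> {
    @SuppressWarnings("unchecked")
    default T withParam(String key, Object value) {
        // A real implementation would record the parameter; the cast is safe
        // as long as subclasses pass themselves as T, as the pattern requires.
        return (T) this;
    }
}

class MyEstimator implements Stage<MyEstimator> {
    // Fluent calls keep the concrete type:
    // new MyEstimator().withParam("iterations", 10) has static type MyEstimator.
}
```

Without the recursive bound, `withParam` could only return `Stage<?>`, and chained calls would lose access to any methods the concrete estimator adds.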
[GitHub] [flink] zentol merged pull request #8414: [FLINK-12447][Build] Enforce maven version 3.1.1
zentol merged pull request #8414: [FLINK-12447][Build] Enforce maven version 3.1.1 URL: https://github.com/apache/flink/pull/8414
[GitHub] [flink] zentol merged pull request #8512: [FLINK-12447] [cron-master-maven_compat] Bump required Maven version to 3.1.1 (from 3.0.3)
zentol merged pull request #8512: [FLINK-12447] [cron-master-maven_compat] Bump required Maven version to 3.1.1 (from 3.0.3) URL: https://github.com/apache/flink/pull/8512
[GitHub] [flink] bowenli86 commented on issue #8501: [FLINK-12254][table] Update TableSchema to new type system
bowenli86 commented on issue #8501: [FLINK-12254][table] Update TableSchema to new type system URL: https://github.com/apache/flink/pull/8501#issuecomment-494892462 Hive part LGTM. I'll have a follow-up PR to switch Hive type conversions to the new Flink type system.
[jira] [Resolved] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases
[ https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise resolved FLINK-12539. -- Resolution: Fixed Fix Version/s: 1.9.0 > StreamingFileSink: Make the class extendable to customize for different > usecases > > > Key: FLINK-12539 > URL: https://issues.apache.org/jira/browse/FLINK-12539 > Project: Flink > Issue Type: New Feature > Components: Connectors / FileSystem >Reporter: Kailash Hassan Dayanand >Assignee: Kailash Hassan Dayanand >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently the StreamingFileSink has Builder pattern and the actual > constructor of StreamingFileSink is private. This makes it hard to extend the > class to built on top of this and customize the sink. (Example: Adding new > metrics). Proposing to make this protected as well as protected for the > Builder interface. > > Discussion is here: > [http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAC27z=phl8+gw-ugmjkxbriseky9zimi2crpqvlpcnyupt8...@mail.gmail.com%3E] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
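What FLINK-12539 asks for can be illustrated with a generic sketch. The class and method names below are stand-ins, not the actual `StreamingFileSink` API surface: widening a private constructor to protected lets users subclass the sink to add behavior such as extra metrics.

```java
// Stand-in for a sink whose constructor was private; "protected" enables subclassing.
class BaseSink {
    private final long bucketCheckInterval;

    protected BaseSink(long bucketCheckInterval) {
        this.bucketCheckInterval = bucketCheckInterval;
    }

    long bucketCheckInterval() {
        return bucketCheckInterval;
    }
}

// A user extension, e.g. to register additional metrics (hypothetical).
class MetricsReportingSink extends BaseSink {
    MetricsReportingSink(long bucketCheckInterval) {
        super(bucketCheckInterval);
        // register custom metrics here
    }
}
```

With a private constructor the second class would not compile, which is exactly the limitation the issue describes.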
[jira] [Updated] (FLINK-12447) Bump required Maven version to 3.1.1 (from 3.0.3)
[ https://issues.apache.org/jira/browse/FLINK-12447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-12447: - Fix Version/s: 1.9.0 > Bump required Maven version to 3.1.1 (from 3.0.3) > - > > Key: FLINK-12447 > URL: https://issues.apache.org/jira/browse/FLINK-12447 > Project: Flink > Issue Type: Task > Components: Build System >Reporter: Robert Metzger >Assignee: Robert Metzger >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > See > https://lists.apache.org/thread.html/57dec7c338eb95247b7a05ded371f4a78420a964045ea9557d501c3f@%3Cdev.flink.apache.org%3E > > The frontend-maven-plugin requires at least Maven 3.1.0. > I propose to bump the required Maven version to 3.1.1.
[jira] [Updated] (FLINK-12447) Bump required Maven version to 3.1.1 (from 3.0.3)
[ https://issues.apache.org/jira/browse/FLINK-12447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-12447: - Issue Type: Improvement (was: Task)
[jira] [Commented] (FLINK-12447) Bump required Maven version to 3.1.1 (from 3.0.3)
[ https://issues.apache.org/jira/browse/FLINK-12447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846077#comment-16846077 ] Chesnay Schepler commented on FLINK-12447: -- maven compatibility branch updated in 1a5fdbae00aafa8b7fbcab6da687f173db60797a master: f652bb5a6ff3855ccf0038a5c0469896f41df908
[jira] [Updated] (FLINK-12447) Bump required Maven version to 3.1.1 (from 3.0.3)
[ https://issues.apache.org/jira/browse/FLINK-12447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-12447: - Affects Version/s: 1.9.0
[jira] [Assigned] (FLINK-12576) inputQueueLength metric does not work for LocalInputChannels
[ https://issues.apache.org/jira/browse/FLINK-12576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] aitozi reassigned FLINK-12576: -- Assignee: aitozi > inputQueueLength metric does not work for LocalInputChannels > > > Key: FLINK-12576 > URL: https://issues.apache.org/jira/browse/FLINK-12576 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics >Affects Versions: 1.6.4, 1.7.2, 1.8.0 >Reporter: Piotr Nowojski >Assignee: aitozi >Priority: Major > > Currently {{inputQueueLength}} ignores LocalInputChannels > ({{SingleInputGate#getNumberOfQueuedBuffers}}). This can cause mistakes > when looking for causes of back pressure (if a task is back pressuring the whole > Flink job, but there is a data skew and only local input channels are being > used).
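The FLINK-12576 bug can be reproduced in miniature (the types below are illustrative, not Flink's actual classes): a queued-buffers gauge that counts only remote channels under-reports whenever skewed traffic flows through local channels.

```java
import java.util.List;

interface InputChannel {
    int queuedBuffers();
    boolean isLocal();
}

record Channel(int queuedBuffers, boolean isLocal) implements InputChannel {}

final class QueueLengthGauge {
    private QueueLengthGauge() {}

    // Buggy variant: skips local channels, mirroring the reported behavior.
    static int remoteOnly(List<InputChannel> channels) {
        return channels.stream()
                .filter(c -> !c.isLocal())
                .mapToInt(InputChannel::queuedBuffers)
                .sum();
    }

    // Fixed variant: counts every channel.
    static int allChannels(List<InputChannel> channels) {
        return channels.stream()
                .mapToInt(InputChannel::queuedBuffers)
                .sum();
    }
}
```

With skewed data routed entirely through local channels, the buggy gauge reads zero even while the task is back pressuring the job.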
[GitHub] [flink] c4emmmm opened a new pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib
c4emmmm opened a new pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib URL: https://github.com/apache/flink/pull/8402 ## What is the purpose of the change This pull request introduces the major interfaces for ML pipeline and ML lib. ## Brief change log - Create flink-ml module under flink root - Add flink-ml-api module, and introduce the major API interfaces, including PipelineStage, Estimator, Transformer, Model, Pipeline, etc. ## Verifying this change This change is pure interface design without any test coverage. This PR only adds the ML pipeline & ML lib interfaces. The implementations of these interfaces will be added in the next PR (please refer to the “implementation plan” section of the [FLIP-39 doc](https://docs.google.com/document/d/1StObo1DLp8iiy0rbukx8kwAJb0BwDZrQrMWub3DzsEo)), where we will add the test cases. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? - JavaDocs - Flink document (will submit via a separate PR)
[GitHub] [flink] shaoxuan-wang commented on issue #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib
shaoxuan-wang commented on issue #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib URL: https://github.com/apache/flink/pull/8402#issuecomment-494873716 Thanks for the contribution, @c4emmmm. The entire design looks good to me. I have gone over all the comments; it seems most of them have been addressed. There is one major comment about the way to persist and reload the model. I noticed that you have responded to the reviewers, while they have not yet confirmed whether they are happy with your solution. In general I am +1 on this PR. Will merge it in the next 1-2 days if no further comments/concerns come up.
[GitHub] [flink] shaoxuan-wang closed pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib
shaoxuan-wang closed pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib URL: https://github.com/apache/flink/pull/8402
[GitHub] [flink] shaoxuan-wang commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib
shaoxuan-wang commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib URL: https://github.com/apache/flink/pull/8402#discussion_r286569876 ## File path: flink-ml/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.api.core; + +import org.apache.flink.ml.api.misc.param.ParamInfo; +import org.apache.flink.ml.api.misc.param.WithParams; +import org.apache.flink.ml.api.misc.persist.Persistable; +import org.apache.flink.ml.util.param.ExtractParamInfosUtil; +import org.apache.flink.ml.util.persist.MLStageFactory; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Base class for a stage in a pipeline. The interface is only a concept, and does not have any + * actual functionality. Its subclasses must be either Estimator or Transformer. No other classes + * should inherit this interface directly. + * + * Each pipeline stage is with parameters and meanwhile persistable. 
+ * + * @param <T> The class type of the PipelineStage implementation itself, used by {@link + *org.apache.flink.ml.api.misc.param.WithParams} + * @see WithParams + */ +interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable, + Persistable { + + default String toJson() { Review comment: Thanks for all the comments. They triggered broad and deep thinking about this interface and the future plan to export/load the model in different formats. I have discussed this offline with @c4emmmm. The final plan looks good to me. @blublinsky, @walterddr, @ex00, please let us know if you have any further questions on the solution proposed by @c4emmmm.
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r286541806 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ## @@ -614,34 +610,22 @@ private void stopTaskExecutorServices() throws Exception { if (task != null) { for (final PartitionInfo partitionInfo: partitionInfos) { - IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID(); - - final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID); - - if (singleInputGate != null) { - // Run asynchronously because it might be blocking - getRpcService().execute( - () -> { - try { - singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor()); - } catch (IOException | InterruptedException e) { - log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e); - - try { - task.failExternally(e); - } catch (RuntimeException re) { - // TODO: Check whether we need this or make exception in failExtenally checked - log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re); - } - } - }); - } else { - return FutureUtils.completedExceptionally( - new PartitionException("No reader with ID " + intermediateResultPartitionID + - " for task " + executionAttemptID + " was found.")); - } + // Run asynchronously because it might be blocking + getRpcService().execute( + () -> { + try { + networkEnvironment.updatePartitionInfo(partitionInfo); + } catch (Throwable t) { Review comment: I think we should not catch `Throwable` here. If an unchecked exception occurs, it should simply bubble up and cause the component to fail.
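The distinction the reviewer draws — handle the expected, recoverable I/O failure, but let programming errors propagate and fail the component — can be sketched generically. All names below are illustrative, not Flink's actual code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

final class UpdateRunner {
    private UpdateRunner() {}

    static void runUpdate(Runnable update, Consumer<IOException> failTask) {
        try {
            update.run();
        } catch (UncheckedIOException e) {
            // Expected, recoverable failure: fail just the task.
            failTask.accept(e.getCause());
        }
        // Deliberately no catch (Throwable): unchecked errors bubble up
        // and fail the whole component, as the review suggests.
    }
}
```

Catching `Throwable` here would silently convert programming bugs into per-task failures, hiding them from the component's own failure handling.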
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r286540299

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java

```diff
@@ -441,6 +444,7 @@ public void close() throws IOException {
 	finally {
 		isReleased = true;
```

Review comment: I think we could remove this field because we now have the `closeFuture`.
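The suggestion above — deriving the released state from a future instead of keeping a separate boolean field — can be sketched as follows. This is a minimal illustration with stand-in names, not Flink's actual `SingleInputGate` code:

```java
import java.util.concurrent.CompletableFuture;

// Sketch: the close/release state is tracked by a CompletableFuture.
// The future doubles as the "isReleased" check and also lets other
// components react asynchronously when the gate is closed.
class Gate implements AutoCloseable {
	private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

	@Override
	public void close() {
		// Completing the future replaces setting `isReleased = true`.
		closeFuture.complete(null);
	}

	boolean isReleased() {
		// The former boolean flag is now derived from the future's state.
		return closeFuture.isDone();
	}

	CompletableFuture<Void> getCloseFuture() {
		return closeFuture;
	}
}

public class GateDemo {
	public static void main(String[] args) {
		Gate gate = new Gate();
		System.out.println(gate.isReleased()); // false
		gate.close();
		System.out.println(gate.isReleased()); // true
	}
}
```

The design benefit is that there is a single source of truth for the release state, so the flag and the future can never disagree.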
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r286563093

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java

```diff
@@ -1221,53 +1214,46 @@ public void run() {
  */
 @VisibleForTesting
 void onPartitionStateUpdate(
-		IntermediateDataSetID intermediateDataSetId,
 		ResultPartitionID resultPartitionId,
-		ExecutionState producerState) throws IOException, InterruptedException {
+		ExecutionState producerState,
+		CompletableFuture<Boolean> producerReadyFuture) {
```

Review comment: Instead of passing in this future, I would return a boolean indicating whether the producer is ready or not.
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r286560412

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java

```diff
@@ -1102,11 +1093,13 @@ public void triggerPartitionProducerStateCheck(
 			} else {
 				failExternally(throwable);
 			}
-		} catch (IOException | InterruptedException e) {
-			failExternally(e);
+		} catch (Throwable t) {
+			failExternally(stripCompletionException(t));
 		}
 	}, executor);
+
+	return producerReadyFuture;
```

Review comment: What about the following alternative:

```
final CompletableFuture<Boolean> producerReadyFuture = new CompletableFuture<>();

FutureUtils.assertNoException(
	futurePartitionState.whenCompleteAsync(
		(ExecutionState executionState, Throwable throwable) -> {
			if (executionState != null || throwable instanceof TimeoutException) {
				final boolean producerReady = onPartitionStateUpdate(
					resultPartitionId,
					executionState != null ? executionState : ExecutionState.RUNNING);
				producerReadyFuture.complete(producerReady);
			} else {
				if (throwable instanceof PartitionProducerDisposedException) {
					String msg = String.format("Producer %s of partition %s disposed. Cancelling execution.",
						resultPartitionId.getProducerId(), resultPartitionId.getPartitionId());
					LOG.info(msg, throwable);
					cancelExecution();
				} else {
					failExternally(throwable);
				}
				producerReadyFuture.complete(false);
			}
		},
		executor));

return producerReadyFuture;
```
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r286551026

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java

```diff
@@ -1102,11 +1093,13 @@ public void triggerPartitionProducerStateCheck(
 			} else {
 				failExternally(throwable);
 			}
-		} catch (IOException | InterruptedException e) {
-			failExternally(e);
+		} catch (Throwable t) {
```

Review comment: We should not catch `Throwable` because it could be a legitimate Flink problem which should cause the process to fail.
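The concern above — that a blanket `catch (Throwable)` swallows errors that should terminate the component — can be illustrated with a minimal sketch. The names below are illustrative stand-ins, not Flink's actual code:

```java
import java.io.IOException;

// Sketch: catch only the checked exceptions the operation declares.
// Errors (e.g. OutOfMemoryError) and unexpected unchecked exceptions
// are NOT caught here, so they propagate and fail the component.
public class SelectiveCatchDemo {

	// Hypothetical update operation that may fail in an expected way.
	static void updatePartitionInfo(boolean ok) throws IOException {
		if (!ok) {
			throw new IOException("expected, recoverable failure");
		}
	}

	// Only the declared, recoverable failure is routed to external
	// failure handling; everything else bubbles up to the caller.
	static String tryUpdate(boolean ok) {
		try {
			updatePartitionInfo(ok);
			return "updated";
		} catch (IOException e) {
			return "failed externally: " + e.getMessage();
		}
	}

	public static void main(String[] args) {
		System.out.println(tryUpdate(true));  // updated
		System.out.println(tryUpdate(false)); // failed externally: expected, recoverable failure
	}
}
```

Catching `Throwable` would instead route genuine bugs and fatal JVM errors into the "recoverable" path, hiding problems that should surface loudly.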
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r286565922

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java

```diff
@@ -1221,53 +1214,46 @@ public void run() {
  */
 @VisibleForTesting
 void onPartitionStateUpdate(
```

Review comment: Maybe rename it to `isProducerReadyAndHandlePartitionStateUpdate`.
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r286546519

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java

```diff
@@ -614,34 +610,22 @@ private void stopTaskExecutorServices() throws Exception {
 	if (task != null) {
 		for (final PartitionInfo partitionInfo: partitionInfos) {
-			IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
-
-			final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID);
-
-			if (singleInputGate != null) {
-				// Run asynchronously because it might be blocking
-				getRpcService().execute(
-					() -> {
-						try {
-							singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
-						} catch (IOException | InterruptedException e) {
-							log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
-
-							try {
-								task.failExternally(e);
-							} catch (RuntimeException re) {
-								// TODO: Check whether we need this or make exception in failExtenally checked
-								log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re);
-							}
-						}
-					});
-			} else {
-				return FutureUtils.completedExceptionally(
-					new PartitionException("No reader with ID " + intermediateResultPartitionID +
-						" for task " + executionAttemptID + " was found."));
-			}
+			// Run asynchronously because it might be blocking
+			getRpcService().execute(
+				() -> {
+					try {
+						networkEnvironment.updatePartitionInfo(partitionInfo);
+					} catch (Throwable t) {
+						log.error(
+							"Could not update input data location for task {}. Trying to fail task.",
+							task.getTaskInfo().getTaskName(),
+							t);
+						FutureUtils.assertNoException(CompletableFuture.runAsync(
+							() -> task.failExternally(t),
+							getRpcService().getExecutor()));
+					}
+				});
```

Review comment: I would suggest exchanging this block with:

```
FutureUtils.assertNoException(
	CompletableFuture.runAsync(
		() -> {
			try {
				networkEnvironment.updatePartitionInfo(partitionInfo);
			} catch (IOException | InterruptedException e) {
				log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
				task.failExternally(e);
			}
		},
		getRpcService().getExecutor()));
```
[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#discussion_r286568353

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java

```diff
@@ -97,13 +104,15 @@ private NetworkEnvironment(
 		NetworkBufferPool networkBufferPool,
 		ConnectionManager connectionManager,
 		ResultPartitionManager resultPartitionManager,
+		Map<IntermediateDataSetID, SingleInputGate> inputGatesById,
```

Review comment: Passing the map for storing the input gates into `NetworkEnvironment` exposes, in my opinion, a bit too many implementation details of this structure. It basically says that the object needs to store the `SingleInputGate`s in this structure; otherwise the test will fail. I think it would be better to have a `getInputGate(IntermediateDataSetID)` method and let the way the gates are stored internally be an implementation detail.
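The encapsulation suggested above — letting the environment own its registry and exposing only a lookup method — could look roughly like this. The types below are simplified stand-ins for illustration, not Flink's actual classes:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in types for the sketch.
class IntermediateDataSetID {}
class SingleInputGate {}

// The environment creates and owns its map; callers can only register
// and look gates up, so the storage strategy stays an implementation
// detail that tests cannot (and need not) depend on.
class Environment {
	private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById =
		new ConcurrentHashMap<>();

	void registerInputGate(IntermediateDataSetID id, SingleInputGate gate) {
		inputGatesById.put(id, gate);
	}

	Optional<SingleInputGate> getInputGate(IntermediateDataSetID id) {
		return Optional.ofNullable(inputGatesById.get(id));
	}
}

public class EnvironmentDemo {
	public static void main(String[] args) {
		Environment env = new Environment();
		IntermediateDataSetID id = new IntermediateDataSetID();
		System.out.println(env.getInputGate(id).isPresent()); // false
		env.registerInputGate(id, new SingleInputGate());
		System.out.println(env.getInputGate(id).isPresent()); // true
	}
}
```

Compared with injecting the map through the constructor, this keeps the class free to swap the map for another structure later without breaking any caller or test.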
[GitHub] [flink] shaoxuan-wang commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib
shaoxuan-wang commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib URL: https://github.com/apache/flink/pull/8402#discussion_r286564448

## File path: flink-ml/pom.xml

```diff
@@ -0,0 +1,39 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-ml</artifactId>
```

Review comment: I started a [discussion](http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/SURVEY-Usage-of-flink-ml-and-DISCUSS-Delete-flink-ml-td29057.html) in dev/user-ml to collect feedback. @zentol pointed out that we can just remove the legacy flink-ml package, since we do not expect any further active development on it. I think he is right. I will create another PR and delete it.