[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-22 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286778051
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the checkpoint failure manager.
+ */
+public class CheckpointFailureManagerTest extends TestLogger {
+
+   @Test
+   public void testContinuousFailure() {
 
 Review comment:
   I know what you mean; please see this [discussion 
section](https://github.com/apache/flink/pull/8322#discussion_r280328078). 
Currently, the counting mechanism is not based on checkpoint order but on 
execution order (the execution phase). It's a temporary solution; we will 
provide a better one in the future. The JIRA issue id is FLINK-12514.
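   To make the execution-order counting concrete, here is a minimal sketch of such a failure counter; the class and callback names are illustrative assumptions, not the actual `CheckpointFailureManager` code:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: counts consecutive checkpoint failures in the order results arrive. */
class FailureCounterSketch {
    private final int tolerableFailures;
    private final AtomicInteger continuousFailures = new AtomicInteger(0);

    FailureCounterSketch(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    void onCheckpointFailure(Runnable failJobCallback) {
        // Counting follows execution order (the order results arrive),
        // not checkpoint id order; see FLINK-12514.
        if (continuousFailures.incrementAndGet() > tolerableFailures) {
            failJobCallback.run();
        }
    }

    void onCheckpointSuccess() {
        continuousFailures.set(0); // any success resets the streak
    }
}
```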


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services



[jira] [Commented] (FLINK-12586) Stderr and stdout are reversed in OptimizerPlanEnvironment

2019-05-22 Thread Kazunori Shinhira (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-12586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846425#comment-16846425 ]

Kazunori Shinhira commented on FLINK-12586:
---

Thank you for your code [~fan_li_ya].

It is exactly what I expected.

It looks good to me.

> Stderr and stdout are reversed in OptimizerPlanEnvironment
> --
>
> Key: FLINK-12586
> URL: https://issues.apache.org/jira/browse/FLINK-12586
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Kazunori Shinhira
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the OptimizerPlanEnvironment#getOptimizedPlan method, it looks like
> stdout is output as System.err and stderr is output as System.out.
> [https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L107-L108]
>  
> I think it should be as below.
> {code:java}
> throw new ProgramInvocationException(
> "The program plan could not be fetched - the program aborted pre-maturely."
> + "\n\nSystem.err: " + (stderr.length() == 0 ? "(none)" : stderr)
> + "\n\nSystem.out: " + (stdout.length() == 0 ? "(none)" : stdout));
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-22 Thread GitBox
klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286776553
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the checkpoint failure manager.
+ */
+public class CheckpointFailureManagerTest extends TestLogger {
+
+   @Test
+   public void testContinuousFailure() {
 
 Review comment:
   For concurrent checkpoints, such as chk-1 and chk-2, we may see chk-2 
succeed and then receive a chk-1 failure (we can add a test now and change the 
expected result after FLINK-12514 lands). In my opinion, we should add a test 
to guarantee the result is what we want.
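   A minimal sketch of such a test under the current execution-order counting (names are illustrative; the expectation may change once FLINK-12514 lands):

```java
import static org.junit.Assert.assertEquals;

import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

/** Sketch: a late chk-1 failure arriving after a chk-2 success still counts. */
public class OutOfOrderCountingTest {

    private final AtomicInteger continuousFailures = new AtomicInteger(0);

    @Test
    public void testLateFailureAfterEarlierSuccess() {
        continuousFailures.set(0);            // chk-2 success arrives first
        continuousFailures.incrementAndGet(); // chk-1 failure arrives late
        assertEquals(1, continuousFailures.get());
    }
}
```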


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-22 Thread GitBox
klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286775875
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -202,31 +201,33 @@ public CheckpointCoordinator(
StateBackend checkpointStateBackend,
Executor executor,
SharedStateRegistryFactory sharedStateRegistryFactory,
-   boolean isPreferCheckpointForRecovery) {
+   CheckpointFailureManager failureManager) {
 
// sanity checks
checkNotNull(checkpointStateBackend);
-   checkArgument(baseInterval > 0, "Checkpoint base interval must be larger than zero");
-   checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
-   checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
-   checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
+   checkArgument(chkConfig.getCheckpointInterval() > 0, "Checkpoint base interval must be larger than zero");
 
 Review comment:
   The check here can also be merged into `CheckpointCoordinatorConfiguration` 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer

2019-05-22 Thread GitBox
flinkbot commented on issue #8517: [FLINK-10921] [kinesis] Shard watermark 
synchronization in Kinesis consumer
URL: https://github.com/apache/flink/pull/8517#issuecomment-495066880
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-22 Thread GitBox
yanghua commented on issue #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#issuecomment-495066930
 
 
   @klion26 Thanks for your review suggestion. What do you think about the new 
change?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10921) Prioritize shard consumers in Kinesis Consumer by event time

2019-05-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10921:
---
Labels: pull-request-available  (was: )

> Prioritize shard consumers in Kinesis Consumer by event time 
> -
>
> Key: FLINK-10921
> URL: https://issues.apache.org/jira/browse/FLINK-10921
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kinesis
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> Shard consumer threads currently emit records directly. In order to align 
> shards by event time, decouple shard consumer threads and emitter with a 
> queue, as described in [1].
> [1] 
> https://lists.apache.org/thread.html/ac41718246ad8f6098efaf7dbf5f7182d60abdc473e8bf3c96ef5968@%3Cdev.flink.apache.org%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tweise opened a new pull request #8517: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer

2019-05-22 Thread GitBox
tweise opened a new pull request #8517: [FLINK-10921] [kinesis] Shard watermark 
synchronization in Kinesis consumer
URL: https://github.com/apache/flink/pull/8517
 
 
   ## What is the purpose of the change
   
   * This pull request adds support for source synchronization to the 
Kinesis consumer. Source synchronization aligns shard consumption based on the 
per-shard watermark.
   
   ## Brief change log
   
   - Adds global watermark aggregation to track the current low watermark 
across subtasks (a simplified sketch follows this list)
   - Adds record emitter to prioritize per-shard queues and align the shard 
consumers through back pressure
   - Above components are designed to be reusable for the upcoming support in 
the Kafka consumer.
   - Integration into KinesisDataFetcher 
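
   A simplified sketch of the per-shard watermark tracking described above (all names are assumptions, not the PR's actual reusable watermark tracker and record emitter classes):

```java
import java.util.concurrent.ConcurrentHashMap;

/** Sketch: track per-shard watermarks and derive the global low watermark. */
class ShardWatermarkTrackerSketch {
    private final ConcurrentHashMap<String, Long> shardWatermarks = new ConcurrentHashMap<>();

    /** Record the latest event-time watermark seen for a shard. */
    void updateWatermark(String shardId, long watermark) {
        shardWatermarks.merge(shardId, watermark, Math::max);
    }

    /** The global low watermark: the minimum across all tracked shards. */
    long lowWatermark() {
        return shardWatermarks.values().stream()
            .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
    }

    /** A shard running ahead of the low watermark by more than the allowed
     *  lookahead should be paused, which back-pressures its consumer. */
    boolean shouldPause(String shardId, long maxLookahead) {
        return shardWatermarks.getOrDefault(shardId, Long.MIN_VALUE)
            > lowWatermark() + maxLookahead;
    }
}
```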
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
  - This is a back port from Lyft's internal codebase, where it has been tested 
extensively and at scale.
  - The PR includes tests for the consumer and basic tests for the emitter and 
watermark tracker.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (yes)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (user documentation will be added 
separately)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-22 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286774079
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the checkpoint failure manager.
+ */
+public class CheckpointFailureManagerTest extends TestLogger {
+
+   @Test
+   public void testContinuousFailure() {
 
 Review comment:
   I think it's not necessary. The counter is an `AtomicInteger`, so we 
tolerate concurrent checkpoints and concurrent counting.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-22 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286773309
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -512,24 +513,19 @@ public boolean isArchived() {
}
 
public void enableCheckpointing(
-   long interval,
-   long checkpointTimeout,
-   long minPauseBetweenCheckpoints,
-   int maxConcurrentCheckpoints,
-   CheckpointRetentionPolicy retentionPolicy,
+   CheckpointCoordinatorConfiguration chkConfig,
List verticesToTrigger,
List verticesToWaitFor,
List verticesToCommitTo,
List> masterHooks,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
StateBackend checkpointStateBackend,
-   CheckpointStatsTracker statsTracker,
-   boolean isPreferCheckpointForRecovery) {
+   CheckpointStatsTracker statsTracker) {
 
// simple sanity checks
-   checkArgument(interval >= 10, "checkpoint interval must not be below 10ms");
-   checkArgument(checkpointTimeout >= 10, "checkpoint timeout must not be below 10ms");
+   checkArgument(chkConfig.getCheckpointInterval() >= 10, "checkpoint interval must not be below 10ms");
 
 Review comment:
   Actually, the interval in many places is not united. In CheckpointConfig it 
used -1 as a disabled flag and the value which large than 0 is legal. So in 
CheckpointCoordinatorConfiguration the sanity check based on 1. It's better to 
unify them. Either -1 or larger than or equal to 10. So agree with you!
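   As a sketch of the unified check being discussed (method name and message are illustrative, assuming -1 stays the "disabled" flag):

```java
import static org.apache.flink.util.Preconditions.checkArgument;

class IntervalCheckSketch {
    static void validateInterval(long checkpointInterval) {
        // -1 keeps its meaning as the "checkpointing disabled" flag;
        // any enabled interval must be at least 10ms.
        checkArgument(checkpointInterval == -1 || checkpointInterval >= 10,
            "checkpoint interval must be -1 (disabled) or at least 10ms");
    }
}
```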


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] liyafan82 commented on issue #8511: [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive

2019-05-22 Thread GitBox
liyafan82 commented on issue #8511: [FLINK-12319][Library/CEP]Change the logic 
of releasing node from recursive to non-recursive
URL: https://github.com/apache/flink/pull/8511#issuecomment-495061840
 
 
   > Thank you for fixing this bug.
   > Does it make sense to add the job triggering this bug as a test, so that 
we won't introduce it in the future again?
   
   @rmetzger thanks a lot for the good suggestion. We have added a test case 
for that. Please help take a look.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-22 Thread GitBox
klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286768669
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -512,24 +513,19 @@ public boolean isArchived() {
}
 
public void enableCheckpointing(
-   long interval,
-   long checkpointTimeout,
-   long minPauseBetweenCheckpoints,
-   int maxConcurrentCheckpoints,
-   CheckpointRetentionPolicy retentionPolicy,
+   CheckpointCoordinatorConfiguration chkConfig,
List verticesToTrigger,
List verticesToWaitFor,
List verticesToCommitTo,
List> masterHooks,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
StateBackend checkpointStateBackend,
-   CheckpointStatsTracker statsTracker,
-   boolean isPreferCheckpointForRecovery) {
+   CheckpointStatsTracker statsTracker) {
 
// simple sanity checks
-   checkArgument(interval >= 10, "checkpoint interval must not be below 10ms");
-   checkArgument(checkpointTimeout >= 10, "checkpoint timeout must not be below 10ms");
+   checkArgument(chkConfig.getCheckpointInterval() >= 10, "checkpoint interval must not be below 10ms");
 
 Review comment:
   As we have `CheckpointCoordinatorConfiguration`, could we just place all the 
checks in the constructor of `CheckpointCoordinatorConfiguration`?
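   A minimal sketch of what centralizing the checks could look like (constructor body only, field assignments omitted; the messages are taken from the diff above):

```java
import static org.apache.flink.util.Preconditions.checkArgument;

public class CheckpointCoordinatorConfigurationSketch {
    public CheckpointCoordinatorConfigurationSketch(
            long checkpointInterval,
            long checkpointTimeout,
            long minPauseBetweenCheckpoints,
            int maxConcurrentCheckpointAttempts) {
        // All sanity checks live in one place; callers no longer re-check.
        checkArgument(checkpointInterval > 0, "Checkpoint base interval must be larger than zero");
        checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
        checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
        checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
    }
}
```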


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-22 Thread GitBox
klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286770672
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the checkpoint failure manager.
+ */
+public class CheckpointFailureManagerTest extends TestLogger {
+
+   @Test
+   public void testContinuousFailure() {
 
 Review comment:
   Do we need to add a test case for concurrent checkpoints?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12551) elasticsearch6 connector print log error

2019-05-22 Thread sunxiongkun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunxiongkun closed FLINK-12551.
---
Resolution: Not A Bug

> elasticsearch6 connector print log error
> 
>
> Key: FLINK-12551
> URL: https://issues.apache.org/jira/browse/FLINK-12551
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.6.3
>Reporter: sunxiongkun
>Priority: Minor
>
> When I use the Elasticsearch connector and my project is running, I find that 
> some data is not inserted into Elasticsearch. I wanted the log to help me, but 
> the log does not contain the important message, so I read the source code 
> (org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase) 
> and found an error in how the ERROR log is written.
>  
> {code:java}
> @Override
> public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
>     if (response.hasFailures()) {
>         BulkItemResponse itemResponse;
>         Throwable failure;
>         RestStatus restStatus;
>         try {
>             for (int i = 0; i < response.getItems().length; i++) {
>                 itemResponse = response.getItems()[i];
>                 failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
>                 if (failure != null) {
>                     LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
>                     restStatus = itemResponse.getFailure().getStatus();
>                     if (restStatus == null) {
>                         failureHandler.onFailure(request.requests().get(i), failure, -1, requestIndexer);
>                     } else {
>                         failureHandler.onFailure(request.requests().get(i), failure, restStatus.getStatus(), requestIndexer);
>                     }
>                 }
>             }
>         } catch (Throwable t) {
>             // fail the sink and skip the rest of the items
>             // if the failure handler decides to throw an exception
>             failureThrowable.compareAndSet(null, t);
>         }
>     }
>     if (flushOnCheckpoint) {
>         numPendingRequests.getAndAdd(-request.numberOfActions());
>     }
> }
> {code}
> {code:java}
> @Override
> public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
>     LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
>     try {
>         for (ActionRequest action : request.requests()) {
>             failureHandler.onFailure(action, failure, -1, requestIndexer);
>         }
>     } catch (Throwable t) {
>         // fail the sink and skip the rest of the items
>         // if the failure handler decides to throw an exception
>         failureThrowable.compareAndSet(null, t);
>     }
>     if (flushOnCheckpoint) {
>         numPendingRequests.getAndAdd(-request.numberOfActions());
>     }
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-22 Thread GitBox
klion26 commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r286768901
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 ##
 @@ -63,11 +65,13 @@ public CheckpointCoordinatorConfiguration(
int maxConcurrentCheckpoints,
CheckpointRetentionPolicy checkpointRetentionPolicy,
boolean isExactlyOnce,
-   boolean isPerfetCheckpointForRecovery) {
+   boolean isPerfetCheckpointForRecovery,
+   int tolerableCpFailureNumber) {
 
// sanity checks
if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-   minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
+   minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1 ||
 
 Review comment:
   Why do we need `tolerableCpFailureNumber > Integer.MAX_VALUE`? An int can 
never be bigger than `Integer.MAX_VALUE`.
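   A short illustration (the variable name follows the diff; the suggested replacement check is only an assumption):

```java
import static org.apache.flink.util.Preconditions.checkArgument;

class TolerableFailureCheckSketch {
    static void validate(int tolerableCpFailureNumber) {
        // (tolerableCpFailureNumber > Integer.MAX_VALUE) is always false for
        // an int, so it can never reject anything. A useful sanity check
        // would reject negative values instead:
        checkArgument(tolerableCpFailureNumber >= 0,
            "tolerableCpFailureNumber must be >= 0");
    }
}
```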


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog

2019-05-22 Thread GitBox
bowenli86 commented on issue #8449: [FLINK-12235][hive] Support Hive partition 
in HiveCatalog
URL: https://github.com/apache/flink/pull/8449#issuecomment-495060568
 
 
   OK, let's leave that part as is for now


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12582) Alteration APIs in catalogs should check existing object and new object are of the same class

2019-05-22 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12582.

Resolution: Fixed

merged in 1.9.0: aca9c018a4b26e50333c33426e77323cbb9b7960

> Alteration APIs in catalogs should check existing object and new object are 
> of the same class
> -
>
> Key: FLINK-12582
> URL: https://issues.apache.org/jira/browse/FLINK-12582
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Alterations in catalogs should check existing object and new object are of 
> the same class. 
> Most of them currently don't, e.g. you can alter an existing generic table 
> with a new hive table in GenericInMemoryCatalog.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class

2019-05-22 Thread GitBox
asfgit closed pull request #8514: [FLINK-12582][table][hive] Alteration APIs in 
catalogs should check existing object and new object are of the same class
URL: https://github.com/apache/flink/pull/8514
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class

2019-05-22 Thread GitBox
bowenli86 commented on issue #8514: [FLINK-12582][table][hive] Alteration APIs 
in catalogs should check existing object and new object are of the same class
URL: https://github.com/apache/flink/pull/8514#issuecomment-495058160
 
 
   thanks @xuefuz , merging


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12593) Revise the document for CEP

2019-05-22 Thread Liya Fan (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-12593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846399#comment-16846399 ]

Liya Fan commented on FLINK-12593:
--

Hi [~jark], thank you so much for sharing the good news and the documents.

I will take a closer look at the documents. However, I guess I also need to 
learn more about CEP before I can give useful comments about it :)

> Revise the document for CEP
> ---
>
> Key: FLINK-12593
> URL: https://issues.apache.org/jira/browse/FLINK-12593
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Liya Fan
>Priority: Minor
>
> The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to 
> understand and follow, especially for beginners.
> I suggest revising from the following aspects:
> 1. Give more detailed descriptions of existing examples.
> 2. More examples are required to illustrate the features.
> 3. More explanations are required for some concepts, like contiguity.
> 4. We can add more references to better understand the concepts. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type

2019-05-22 Thread GitBox
zhijiangW commented on a change in pull request #8346: [FLINK-12405] [DataSet] 
Introduce BLOCKING_PERSISTENT result partition type
URL: https://github.com/apache/flink/pull/8346#discussion_r286761364
 
 

 ##
 File path: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 ##
 @@ -480,6 +482,32 @@ public void postVisit(PlanNode node) {
if (node instanceof SourcePlanNode || node instanceof 
NAryUnionPlanNode || node instanceof SolutionSetPlanNode) {
return;
}
+
+   // if this is a blocking shuffle vertex, we add one IntermediateDataSetID to its predecessor and return
+   if (node instanceof SinkPlanNode) {
+   Object userCodeObject = node.getProgramOperator().getUserCodeWrapper().getUserCodeObject();
+   if (userCodeObject instanceof BlockingShuffleOutputFormat) {
+   PlanNode precedentNode = node.getInputs().iterator().next().getSource();
+   JobVertex precedentVertex;
+   if (this.vertices.containsKey(precedentNode)) {
 
 Review comment:
   Yes, we have some historical inconsistency issues, but we should not carry 
them into new code. For the historical code, we might solve it in a separate 
hotfix or just keep it as is.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on issue #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog

2019-05-22 Thread GitBox
lirui-apache commented on issue #8449: [FLINK-12235][hive] Support Hive 
partition in HiveCatalog
URL: https://github.com/apache/flink/pull/8449#issuecomment-495051154
 
 
   > @lirui-apache Thank you very much for the update!
   > 
   > I just spotted that several API impls currently work like this: 1) get a 
raw hive table, 2) parse part of the raw table. The latter step actually 
duplicates logic in `instantiateHiveCatalogTable()`. E.g. 
`ensureTableAndPartitionMatch()` parses `FLINK_PROPERTY_IS_GENERIC`, 
`instantiateHivePartition()` parses partition keys, `ensurePartitionedTable()` 
parses the raw table's partition key size, all of which we can get by just 
parsing the raw table to a `CatalogTable` through `instantiateHiveCatalogTable()` 
in advance. The current duplication also means that if we change some general 
logic in parsing a hive table, we need to change two places. Thus I wonder if 
it makes sense to just parse the raw table as a whole at the beginning rather 
than having scattered places each parsing only part of it. We could then 
remove util methods such as `getFieldNames()`, which is only used to get the 
partition keys that are already available in `CatalogTable`.
   > 
   > For example, change
   > 
   > ```
   > public void createPartition(...) {
   >   Table hiveTable = getHiveTable(tablePath);
   >   ensureTableAndPartitionMatch(hiveTable, partition);
   >   ensurePartitionedTable(tablePath, hiveTable);
   >   try {
   > client.add_partition(instantiateHivePartition(hiveTable, 
partitionSpec, partition));
   >   } ...
   > }
   > ```
   > 
   > to something like:
   > 
   > ```
   > public void createPartition(...) {
   >   Table hiveTable = getHiveTable(tablePath);
   >   CatalogBaseTable catalogTable = instantiateHiveCatalogTable(hiveTable);
   >   ... check whether catalogTabe and catalogPartition type matches would be 
much easier here ...
   >   ... check whether catalogTable is partitioned would be easier here ...
   >   try {
   > client.add_partition(
   >   instantiateHivePartition(catalogTable, partitionSpec, partition, 
hiveTable.getSd()));
   >   } ...
   > }
   > ```
   
   I'm not sure how much benefit this would bring us. It might make 
`ensureTableAndPartitionMatch` a little easier -- we could check the type of 
the CatalogBaseTable instead of parsing a property. But I don't think we should 
take the same approach for `ensurePartitionedTable`, which is already simple 
enough. And for APIs like `listPartitions`, creating a CatalogBaseTable just to 
get the number of partition columns seems like overkill to me. The same applies 
to `getFieldNames`; e.g., I think it would be overkill to create a 
CatalogBaseTable just to get partition column names in `dropPartition`.
   Maybe an alternative approach to the problem you mentioned is to have more 
util methods, in order to avoid duplication. For example, we could have a util 
method to decide whether a Hive table is generic, and all the APIs needing this 
logic could call it (a sketch follows). What do you think?
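   A minimal sketch of such a util method, assuming the marker property is the `FLINK_PROPERTY_IS_GENERIC` key discussed above (the exact key string and class layout are assumptions):

```java
import org.apache.hadoop.hive.metastore.api.Table;

/** Sketch: one place to decide whether a raw Hive table is a generic
 *  (Flink-managed) table, so each API need not parse the property itself. */
public final class HiveTableUtilSketch {
    // Assumed key; the real constant lives in the catalog code.
    private static final String FLINK_PROPERTY_IS_GENERIC = "is_generic";

    private HiveTableUtilSketch() {}

    public static boolean isGenericTable(Table hiveTable) {
        return Boolean.parseBoolean(
            hiveTable.getParameters().getOrDefault(FLINK_PROPERTY_IS_GENERIC, "false"));
    }
}
```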


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-12593) Revise the document for CEP

2019-05-22 Thread Jark Wu (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-12593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846393#comment-16846393 ]

Jark Wu edited comment on FLINK-12593 at 5/23/19 3:02 AM:
--

Hi [~fan_li_ya], thanks a lot for the ideas. The community is also working on 
improving documentation and started a discussion[1] about this two days ago. 

It would be great if you could leave some thoughts and comments regarding the 
CEP restructure in the design doc[2], so that we can work together to improve 
the documentation.

[1]. 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-to-Restructure-Update-amp-Rework-Apache-Flink-s-Documentation-tt29014.html
[2]. 
https://docs.google.com/document/d/1pPM4vTWUUiJb73pd8OqHA1EWVl9cv6kXoCB5F7_J6gM/edit


was (Author: jark):
Hi [~fan_li_ya] thanks a lot for the ideas. The community is also working on 
improve documentation and started a discussion[1] about this two days ago. 

It would be great if you can leave some thoughts and comments regarding to the 
CEP restructure in the design doc[2] there. So that we can work together to 
improve documentation.

[1]. 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-to-Restructure-Update-amp-Rework-Apache-Flink-s-Documentation-tt29014.html
[2]. 
https://docs.google.com/document/d/1pPM4vTWUUiJb73pd8OqHA1EWVl9cv6kXoCB5F7_J6gM/edit

> Revise the document for CEP
> ---
>
> Key: FLINK-12593
> URL: https://issues.apache.org/jira/browse/FLINK-12593
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Liya Fan
>Priority: Minor
>
> The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to 
> understand and follow, especially for beginners.
> I suggest revising from the following aspects:
> 1. Give more detailed descriptions of existing examples.
> 2. More examples are required to illustrate the features.
> 3. More explanations are required for some concepts, like contiguity.
> 4. We can add more references to better understand the concepts. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12593) Revise the document for CEP

2019-05-22 Thread Jark Wu (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-12593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846393#comment-16846393 ]

Jark Wu commented on FLINK-12593:
-

Hi [~fan_li_ya], thanks a lot for the ideas. The community is also working on 
improving documentation and started a discussion[1] about this two days ago. 

It would be great if you could leave some thoughts and comments regarding the 
CEP restructure in the design doc[2], so that we can work together to improve 
the documentation.

[1]. 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-to-Restructure-Update-amp-Rework-Apache-Flink-s-Documentation-tt29014.html
[2]. 
https://docs.google.com/document/d/1pPM4vTWUUiJb73pd8OqHA1EWVl9cv6kXoCB5F7_J6gM/edit

> Revise the document for CEP
> ---
>
> Key: FLINK-12593
> URL: https://issues.apache.org/jira/browse/FLINK-12593
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Liya Fan
>Priority: Minor
>
> The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to 
> understand and follow, especially for beginners.
> I suggest revising from the following aspects:
> 1. Give more detailed descriptions of existing examples.
> 2. More examples are required to illustrate the features.
> 3. More explanations are required for some concepts, like contiguity.
> 4. We can add more references to better understand the concepts. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12593) Revise the document for CEP

2019-05-22 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-12593:
---

Assignee: (was: Jark Wu)

> Revise the document for CEP
> ---
>
> Key: FLINK-12593
> URL: https://issues.apache.org/jira/browse/FLINK-12593
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Liya Fan
>Priority: Minor
>
> The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to 
> understand and follow, especially for beginners.
> I suggest revising from the following aspects:
> 1. Give more detailed descriptions of existing examples.
> 2. More examples are required to illustrate the features.
> 3. More explanations are required for some concepts, like contiguity.
> 4. We can add more references to better understand the concepts. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12593) Revise the document for CEP

2019-05-22 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-12593:
---

Assignee: Jark Wu

> Revise the document for CEP
> ---
>
> Key: FLINK-12593
> URL: https://issues.apache.org/jira/browse/FLINK-12593
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Liya Fan
>Assignee: Jark Wu
>Priority: Minor
>
> The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to 
> understand and follow, especially for beginners.
> I suggest revising from the following aspects:
> 1. Give more detailed descriptions of existing examples.
> 2. More examples are required to illustrate the features.
> 3. More explanations are required for some concepts, like contiguity.
> 4. We can add more references to better understand the concepts. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12335) Improve the code and performance of class SegmentsUtil

2019-05-22 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-12335:

Summary: Improve the code and performance of class SegmentsUtil  (was: 
Remove useless code in SegmentsUtil)

> Improve the code and performance of class SegmentsUtil
> --
>
> Key: FLINK-12335
> URL: https://issues.apache.org/jira/browse/FLINK-12335
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Improve the performance of class SegmentsUtil:
> To evaluate the offset, an integer is bit-anded with a mask to clear the low 
> bits and then shifted right. The bit-and is useless:
> ((index & BIT_BYTE_POSITION_MASK) >>> 3)
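A quick sketch of why the mask is redundant before the unsigned right shift, assuming BIT_BYTE_POSITION_MASK clears the low 3 bits (i.e. ~7):

{code:java}
public class MaskShiftSketch {
    private static final int BIT_BYTE_POSITION_MASK = ~7; // assumed value

    public static void main(String[] args) {
        for (int index : new int[] {0, 5, 8, 1023, Integer.MAX_VALUE}) {
            // >>> 3 already discards the low 3 bits, so the bit-and is a no-op.
            int masked = (index & BIT_BYTE_POSITION_MASK) >>> 3;
            int direct = index >>> 3;
            System.out.println(index + ": " + masked + " == " + direct);
        }
    }
}
{code}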



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12335) Remove useless code in SegmentsUtil

2019-05-22 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-12335.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in 1.9.0: 11a96fdf213467595dad73cffd9b05134a4d0d75

> Remove useless code in SegmentsUtil
> ---
>
> Key: FLINK-12335
> URL: https://issues.apache.org/jira/browse/FLINK-12335
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Improve the performance of class SegmentsUtil:
> To evaluate the offset, an integer is bit-anded with a mask to clear the low 
> bits and then shifted right. The bit-and is useless:
> ((index & BIT_BYTE_POSITION_MASK) >>> 3)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12586) Stderr and stdout are reversed in OptimizerPlanEnvironment

2019-05-22 Thread Liya Fan (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-12586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846381#comment-16846381 ]

Liya Fan commented on FLINK-12586:
--

Hi [~shinhira_kazunori], thanks for finding this problem. I have provided a 
fix. Please take a look.

> Stderr and stdout are reversed in OptimizerPlanEnvironment
> --
>
> Key: FLINK-12586
> URL: https://issues.apache.org/jira/browse/FLINK-12586
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Kazunori Shinhira
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the OptimizerPlanEnvironment#getOptimizedPlan method, it looks like
> stdout is output as System.err and stderr is output as System.out.
> [https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L107-L108]
>  
> I think it should be as below.
> {code:java}
> throw new ProgramInvocationException(
> "The program plan could not be fetched - the program aborted pre-maturely."
> + "\n\nSystem.err: " + (stderr.length() == 0 ? "(none)" : stderr)
> + "\n\nSystem.out: " + (stdout.length() == 0 ? "(none)" : stdout));
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12296) Data loss silently in RocksDBStateBackend when more than one operator(has states) chained in a single task

2019-05-22 Thread Congxian Qiu(klion26) (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-12296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846382#comment-16846382 ]

Congxian Qiu(klion26) commented on FLINK-12296:
---

merged
 * master         ee60846dc588b1a832a497ff9522d7a3a282c350
 * release-1.8  531d727f9b32c310d8d63b253019b8cc4a23a3eb
 * release-1.7  1ce2efd7a38d091fc004a8dba034ece0bcc42385
 * release-1.6  0dda6fe9dff4f667b110cda39bfe9738ba615b24

> Data loss silently in RocksDBStateBackend when more than one operator(has 
> states) chained in a single task 
> ---
>
> Key: FLINK-12296
> URL: https://issues.apache.org/jira/browse/FLINK-12296
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> As the mailing list said[1], there may be a problem when more than one operator
> is chained in a single task and all the operators have state: we'll silently
> encounter a data loss problem.
> Currently, the local directory we use looks like
> ../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state).
> If more than one operator is chained in a single task and all the operators
> have state, then all the operators will share the same local directory
> (because the vertex_id is the same), and this will lead to a data loss
> problem.
>  
> The path generation logic is below:
> {code:java}
> // LocalRecoveryDirectoryProviderImpl.java
> @Override
> public File subtaskSpecificCheckpointDirectory(long checkpointId) {
>return new File(subtaskBaseDirectory(checkpointId), 
> checkpointDirString(checkpointId));
> }
> @VisibleForTesting
> String subtaskDirString() {
>return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + 
> subtaskIndex).toString();
> }
> @VisibleForTesting
> String checkpointDirString(long checkpointId) {
>return "chk_" + checkpointId;
> }
> {code}
> [1] 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201904.mbox/%3cm2ef5tpfwy.wl-nings...@gmail.com%3E]
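To make the collision concrete, a minimal sketch with made-up ids; both chained operators in the same task resolve to an identical directory:

{code:java}
import java.nio.file.Paths;

public class PathCollisionSketch {
    public static void main(String[] args) {
        String jobID = "jid0", jobVertexID = "vtx0"; // made-up ids
        int subtaskIndex = 0;
        long checkpointId = 1;
        // Nothing in the path distinguishes one chained operator from another,
        // so both operators compute the same directory.
        String opA = Paths.get("jid_" + jobID,
            "vtx_" + jobVertexID + "_sti_" + subtaskIndex, "chk_" + checkpointId).toString();
        String opB = Paths.get("jid_" + jobID,
            "vtx_" + jobVertexID + "_sti_" + subtaskIndex, "chk_" + checkpointId).toString();
        System.out.println(opA.equals(opB)); // true -> states can overwrite each other
    }
}
{code}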



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8516: [FLINK-12586][Command Line Client]Stderr and stdout are reversed in OptimizerPlanEnvironment

2019-05-22 Thread GitBox
flinkbot commented on issue #8516: [FLINK-12586][Command Line Client]Stderr and 
stdout are reversed in OptimizerPlanEnvironment
URL: https://github.com/apache/flink/pull/8516#issuecomment-495045139
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12586) Stderr and stdout are reversed in OptimizerPlanEnvironment

2019-05-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12586:
---
Labels: pull-request-available  (was: )

> Stderr and stdout are reversed in OptimizerPlanEnvironment
> --
>
> Key: FLINK-12586
> URL: https://issues.apache.org/jira/browse/FLINK-12586
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Kazunori Shinhira
>Priority: Minor
>  Labels: pull-request-available
>
> In the OptimizerPlanEnvironment#getOptimizedPlan method, it looks like
> stdout is output as System.err and stderr is output as System.out.
> [https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L107-L108]
>  
> I think it should be as below.
> {code:java}
> throw new ProgramInvocationException(
> "The program plan could not be fetched - the program aborted pre-maturely."
> + "\n\nSystem.err: " + (stderr.length() == 0 ? "(none)" : stderr)
> + "\n\nSystem.out: " + (stdout.length() == 0 ? "(none)" : stdout));
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] liyafan82 opened a new pull request #8516: [FLINK-12586][Command Line Client]Stderr and stdout are reversed in OptimizerPlanEnvironment

2019-05-22 Thread GitBox
liyafan82 opened a new pull request #8516: [FLINK-12586][Command Line 
Client]Stderr and stdout are reversed in OptimizerPlanEnvironment
URL: https://github.com/apache/flink/pull/8516
 
 
   
   
   
   ## What is the purpose of the change
   
   Fix FLINK-12586: in the OptimizerPlanEnvironment#getOptimizedPlan method, 
stdout is output as System.err and stderr is output as System.out.
   
   ## Brief change log
   
 - Change the method OptimizerPlanEnvironment#getOptimizedPlan
 
   
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12594) Make Job ID Configurable for a Job Cluster

2019-05-22 Thread Chaoran Yu (JIRA)
Chaoran Yu created FLINK-12594:
--

 Summary: Make Job ID Configurable for a Job Cluster
 Key: FLINK-12594
 URL: https://issues.apache.org/jira/browse/FLINK-12594
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.8.0
Reporter: Chaoran Yu


Currently the job ID is only configurable for a job in a session cluster. If a 
job is launched as a job cluster, the job ID is always the default value (all 
zeros) without an option to manually set it. This 
[thread|http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAKiyyaE+wx72gAwgDbLuThZKJR8VDPUid0Q1S=qx91pDf3=z...@mail.gmail.com%3E]
 in the mailing list also discussed the same issue. The job ID should be made 
configurable in the job cluster case as well.

In the 
[documentation|https://github.com/apache/flink/tree/master/flink-container/kubernetes]
 for running a job cluster on Kubernetes, it's mentioned that the job ID is 
configurable via the `--job-id` flag, which is false because it's not 
implemented anywhere. The documentation should be corrected until this feature 
is implemented.
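
A minimal sketch of the wiring this issue asks for (the command-line handling is 
hypothetical; {{JobID.fromHexString}} and the zero-valued default are existing 
flink-core API):

{code:java}
// hypothetical option parsing inside a job-cluster entry point
JobID jobId = commandLine.hasOption("job-id")
    ? JobID.fromHexString(commandLine.getOptionValue("job-id"))
    : new JobID(0, 0); // today's hard-coded all-zeros default
{code}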

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil

2019-05-22 Thread GitBox
asfgit closed pull request #8278: [FLINK-12335][Table-runtime]Remove useless 
code in class SegmentsUtil
URL: https://github.com/apache/flink/pull/8278
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on issue #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil

2019-05-22 Thread GitBox
wuchong commented on issue #8278: [FLINK-12335][Table-runtime]Remove useless 
code in class SegmentsUtil
URL: https://github.com/apache/flink/pull/8278#issuecomment-495042553
 
 
   LGTM. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on issue #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil

2019-05-22 Thread GitBox
wuchong commented on issue #8278: [FLINK-12335][Table-runtime]Remove useless 
code in class SegmentsUtil
URL: https://github.com/apache/flink/pull/8278#issuecomment-495042540
 
 
   LGTM. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil

2019-05-22 Thread GitBox
flinkbot edited a comment on issue #8278: [FLINK-12335][Table-runtime]Remove 
useless code in class SegmentsUtil
URL: https://github.com/apache/flink/pull/8278#issuecomment-487004244
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❗ 3. Needs [attention] from.
   - Needs attention by @wuchong [committer]
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on issue #8278: [FLINK-12335][Table-runtime]Remove useless code in class SegmentsUtil

2019-05-22 Thread GitBox
JingsongLi commented on issue #8278: [FLINK-12335][Table-runtime]Remove useless 
code in class SegmentsUtil
URL: https://github.com/apache/flink/pull/8278#issuecomment-495041136
 
 
   @flinkbot attention @wuchong 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-5809) Add registerNextWatermarkCallback() to TimerService

2019-05-22 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846371#comment-16846371
 ] 

vinoyang commented on FLINK-5809:
-

It seems {{registerEventTimeTimer}} no longer exists in the current code 
base, so maybe we can close this issue now. cc [~aljoscha]

> Add registerNextWatermarkCallback() to TimerService
> ---
>
> Key: FLINK-5809
> URL: https://issues.apache.org/jira/browse/FLINK-5809
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Aljoscha Krettek
>Priority: Major
>
> This new method is similar to {{registerEventTimeTimer()}} but instead of 
> specifying a certain time where the timer should fire this will simply fire 
> whenever a new watermark arrives.
> This is, for example, useful when you want to buffer elements for sorting by 
> event time and always want to emit as soon as the operator receives a 
> watermark.
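
A hedged sketch of how the proposed method could sit next to the existing timer API 
(the callback signature below is an assumption; nothing here exists in the code base):

{code:java}
public interface TimerService {

    // existing style: fire once event time passes the given timestamp
    void registerEventTimeTimer(long time);

    // proposed: invoke the callback whenever a new watermark arrives
    void registerNextWatermarkCallback(Consumer<Watermark> callback);
}
{code}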



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12593) Revise the document for CEP

2019-05-22 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12593:


 Summary: Revise the document for CEP
 Key: FLINK-12593
 URL: https://issues.apache.org/jira/browse/FLINK-12593
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Liya Fan


The document for CEP (flink/docs/dev/libs/cep.md) can be difficult to 
understand and follow, especially for beginners.

I suggest revising it in the following aspects:

1. Give more detailed descriptions of the existing examples.

2. Add more examples to illustrate the features.

3. Add more explanations for some concepts, like contiguity (see the sketch below).

4. Add more references to help readers better understand the concepts. 
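
For point 3, a minimal sketch of the contiguity variants the document should spell out 
({{Event}} is a placeholder type):

{code:java}
Pattern<Event, ?> strict = Pattern.<Event>begin("a")
    .next("b");          // strict contiguity: "b" must immediately follow "a"

Pattern<Event, ?> relaxed = Pattern.<Event>begin("a")
    .followedBy("b");    // relaxed contiguity: unrelated events may occur in between

Pattern<Event, ?> nonDeterministic = Pattern.<Event>begin("a")
    .followedByAny("b"); // non-deterministic relaxed contiguity: also matches across already matched events
{code}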



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Xpray commented on issue #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type

2019-05-22 Thread GitBox
Xpray commented on issue #8346: [FLINK-12405] [DataSet] Introduce 
BLOCKING_PERSISTENT result partition type
URL: https://github.com/apache/flink/pull/8346#issuecomment-495040043
 
 
   @zhijiangW Thanks for your review and advice, I've updated the PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8969) Move TimerService into state backend

2019-05-22 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846368#comment-16846368
 ] 

vinoyang commented on FLINK-8969:
-

Hi [~phoenixjiangnan] Does this proposal still make sense based on the current 
code base?

> Move TimerService into state backend
> 
>
> Key: FLINK-8969
> URL: https://issues.apache.org/jira/browse/FLINK-8969
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Bowen Li
>Priority: Major
>
> upon discussion with [~aljoscha]. More details need to be added here



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Xpray commented on a change in pull request #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type

2019-05-22 Thread GitBox
Xpray commented on a change in pull request #8346: [FLINK-12405] [DataSet] 
Introduce BLOCKING_PERSISTENT result partition type
URL: https://github.com/apache/flink/pull/8346#discussion_r286751650
 
 

 ##
 File path: 
flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
 ##
 @@ -265,6 +274,55 @@ public void testArtifactCompression() throws IOException {
assertState(nonExecutableDirEntry, false, true);
}
 
+   @Test
+   public void testGeneratingJobGraphWithUnconsumedResultPartition() {
+
+   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+   DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<>(1L, 2L))
+      .setParallelism(1);
+
+   DataSet ds = input.map((MapFunction<Tuple2<Long, Long>, Object>) value -> new Tuple2<>(value.f0 + 1, value.f1))
+      .setParallelism(3);
+
+   AbstractID intermediateDataSetID = new AbstractID();
+
+   // this output branch will be excluded.
+   ds.output(BlockingShuffleOutputFormat.createOutputFormat(intermediateDataSetID))
+      .setParallelism(1);
+
+   // this is the normal output branch.
+   ds.output(new DiscardingOutputFormat())
+      .setParallelism(1);
+
+   Plan plan = env.createProgramPlan();
+   Optimizer pc = new Optimizer(new Configuration());
+   OptimizedPlan op = pc.compile(plan);
+
+   JobGraphGenerator jgg = new JobGraphGenerator();
+   JobGraph jobGraph = jgg.compileJobGraph(op);
+
+   Assert.assertEquals(3, jobGraph.getVerticesSortedTopologicallyFromSources().size());
+
+   JobVertex inputVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
+   JobVertex mapVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
+   JobVertex outputVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(2);
+
+   Assert.assertTrue(outputVertex instanceof OutputFormatVertex);
 
 Review comment:
   > It is better to use `hamcrest` to verify `instanceof` here.
   
   Great, it seems `hamcrest` is more readable here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12401) Support incremental emit for non-window streaming FlatAggregate on Table API

2019-05-22 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846359#comment-16846359
 ] 

Jark Wu commented on FLINK-12401:
-

Do you have a design for the incremental emit, [~hequn8128]?  For example, what 
does the API look like, when will it be invoked, and what is it used for?

> Support incremental emit for non-window streaming FlatAggregate on Table API
> 
>
> Key: FLINK-12401
> URL: https://issues.apache.org/jira/browse/FLINK-12401
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As described in 
> [Flip-29|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739],
>  there are two output modes for non-window streaming flatAggregate. One is 
> emitting with full values, the other is emitting with incremental values. 
> [FLINK-10977|https://issues.apache.org/jira/browse/FLINK-10977] supports the 
> former one, this jira is going to support the latter one. 
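
A hedged sketch of the two emit modes in TableAggregateFunction style (the incremental 
method name follows the FLIP-29 discussion and is not final API; {{MyAccumulator}} is a 
placeholder):

{code:java}
// full-value mode (FLINK-10977): re-emit the complete result on every update
public void emitValue(MyAccumulator acc, Collector<Row> out) {
    // write the full current result to "out"
}

// incremental mode (this issue): emit only the changed rows and retract the old ones
public void emitUpdateWithRetract(MyAccumulator acc, RetractableCollector<Row> out) {
    // out.retract(oldRow); out.collect(newRow);
}
{code}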



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases

2019-05-22 Thread Thomas Weise (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-12539:
-
Fix Version/s: 1.8.1

> StreamingFileSink: Make the class extendable to customize for different 
> usecases
> 
>
> Key: FLINK-12539
> URL: https://issues.apache.org/jira/browse/FLINK-12539
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Reporter: Kailash Hassan Dayanand
>Assignee: Kailash Hassan Dayanand
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently the StreamingFileSink has Builder pattern and the actual 
> constructor of StreamingFileSink is private. This makes it hard to extend the 
> class to built on top of this and customize the sink. (Example: Adding new 
> metrics). Proposing to make this protected as well as protected for the 
> Builder interface.
>  
> Discussion is here: 
> [http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAC27z=phl8+gw-ugmjkxbriseky9zimi2crpqvlpcnyupt8...@mail.gmail.com%3E]
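
A hedged sketch of what the proposed visibility change enables (the subclass and metric 
logic are illustrative, and the builder generics are simplified):

{code:java}
public class InstrumentedFileSink<IN> extends StreamingFileSink<IN> {

    protected InstrumentedFileSink(StreamingFileSink.BucketsBuilder<IN, ?> builder, long bucketCheckInterval) {
        super(builder, bucketCheckInterval); // only possible once the constructor is protected
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // register custom metrics here, e.g. via getRuntimeContext().getMetricGroup()
    }
}
{code}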



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tweise merged pull request #8515: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)

2019-05-22 Thread GitBox
tweise merged pull request #8515: [FLINK-12539] [fs-connector] Make 
StreamingFileSink customizable (#8469)
URL: https://github.com/apache/flink/pull/8515
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces

2019-05-22 Thread GitBox
sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] 
Introduce Input and NetworkInput interfaces
URL: https://github.com/apache/flink/pull/8476#discussion_r286742814
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 ##
 @@ -156,9 +129,41 @@ public StreamInputProcessor(
}
 
public boolean processInput() throws Exception {
-   if (isFinished) {
-   return false;
+   initializeNumRecordsIn();
+
+   StreamElement recordOrMark = input.pollNext();
+   if (recordOrMark == null) {
+   input.isAvailable().get();
+   return input.isFinished();
}
+
+   processElement(recordOrMark);
+   return true;
+   }
+
+   private void processElement(StreamElement recordOrMark) throws Exception {
+      if (recordOrMark.isWatermark()) {
+         // handle watermark
+         statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
 
 Review comment:
   In fact, my concern is that the input channel index is exposed to input 
processors. As long as it's not exposed to the input processor (for example, by 
encapsulating it in a class as you said), it's fine with me.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces

2019-05-22 Thread GitBox
sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] 
Introduce Input and NetworkInput interfaces
URL: https://github.com/apache/flink/pull/8476#discussion_r286741019
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/NetworkInput.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation of taking {@link InputGate} as {@link Input}.
+ */
+@Internal
+public final class NetworkInput implements Input {
+
+   private final int numberOfInputChannels;
+
+   private final CheckpointBarrierHandler barrierHandler;
+
+   private final DeserializationDelegate<StreamElement> deserializationDelegate;
+
+   private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
+
+   /**
+* The channel from which a buffer came, tracked so that we can 
appropriately map
+* the watermarks and watermark statuses to the correct channel index 
of the correct valve.
+*/
+   private int currentChannel = -1;
+
+   private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer = null;
+
+   private boolean isFinished = false;
+
+   @SuppressWarnings("unchecked")
+   public NetworkInput(
+   CheckpointBarrierHandler barrierHandler,
+   TypeSerializer inputSerializer,
+   IOManager ioManager) {
+   this.barrierHandler = barrierHandler;
+   this.numberOfInputChannels = 
barrierHandler.getNumberOfInputChannels();
+   this.deserializationDelegate = new 
NonReusingDeserializationDelegate<>(
+   new StreamElementSerializer<>(inputSerializer));
+
+   // Initialize one deserializer per input channel
+   this.recordDeserializers = new 
SpillingAdaptiveSpanningRecordDeserializer[numberOfInputChannels];
+   for (int i = 0; i < recordDeserializers.length; i++) {
+   recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<>(
+   ioManager.getSpillingDirectoriesPaths());
+   }
+   }
+
+   @Override
+   public StreamElement pollNext() throws Exception {
+
+   while (true) {
+   // get the stream element from the deserializer
+   if (currentRecordDeserializer != null) {
+   DeserializationResult result = 
currentRecordDeserializer.getNextRecord(deserializationDelegate);
+   if (result.isBufferConsumed()) {
+   
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
+   currentRecordDeserializer = null;
+   }
+
+   if (result.isFullRecord()) {
+   return 

[GitHub] [flink] sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces

2019-05-22 Thread GitBox
sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] 
Introduce Input and NetworkInput interfaces
URL: https://github.com/apache/flink/pull/8476#discussion_r286740477
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Input.java
 ##
 @@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.AsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+
+import java.io.Closeable;
+
+/**
+ * Basic interface for inputs of stream operators.
+ */
+@Internal
+public interface Input extends AsyncDataInput<StreamElement>, Closeable {
 
 Review comment:
   > Generally speaking Closeable is better and preferred - it extends 
Autocloseable and provide stronger/easier to use contract of close() being 
idempotent.
   
   You are right.
   
   > Is there a reason why Input#close() can not be idempotent?
   
   No. 
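   
   For reference, a minimal sketch of an idempotent `close()` under that contract (the 
   `closed` field is illustrative):
   
   ```java
   private boolean closed;
   
   @Override
   public void close() throws IOException {
       if (closed) {
           return; // second and later calls have no effect
       }
       closed = true;
       // release buffers / deserializers here
   }
   ```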


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] Introduce Input and NetworkInput interfaces

2019-05-22 Thread GitBox
sunhaibotb commented on a change in pull request #8476: [FLINK-12490][network] 
Introduce Input and NetworkInput interfaces
URL: https://github.com/apache/flink/pull/8476#discussion_r286740477
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Input.java
 ##
 @@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.AsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+
+import java.io.Closeable;
+
+/**
+ * Basic interface for inputs of stream operators.
+ */
+@Internal
+public interface Input extends AsyncDataInput<StreamElement>, Closeable {
 
 Review comment:
   No. You are right.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8420: [FLINK-12408][python] Allow to define the data types in Python

2019-05-22 Thread GitBox
flinkbot edited a comment on issue #8420: [FLINK-12408][python] Allow to define 
the data types in Python
URL: https://github.com/apache/flink/pull/8420#issuecomment-491604302
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @sunjincheng121 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @sunjincheng121 [committer]
   * ❗ 3. Needs [attention] from.
   - Needs attention by @twalthr [PMC]
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @sunjincheng121 [committer]
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8420: [FLINK-12408][python] Allow to define the data types in Python

2019-05-22 Thread GitBox
sunjincheng121 commented on issue #8420: [FLINK-12408][python] Allow to define 
the data types in Python
URL: https://github.com/apache/flink/pull/8420#issuecomment-495021535
 
 
   @flinkbot approve-until architecture


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12536) Make BufferOrEventSequence#getNext() non-blocking

2019-05-22 Thread Congxian Qiu(klion26) (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) reassigned FLINK-12536:
-

Assignee: Congxian Qiu(klion26)

> Make BufferOrEventSequence#getNext() non-blocking
> -
>
> Key: FLINK-12536
> URL: https://issues.apache.org/jira/browse/FLINK-12536
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Piotr Nowojski
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>
> Currently it is non-blocking in the case of credit-based flow control (the default); 
> however, for {{SpilledBufferOrEventSequence}} it is blocking on reading from 
> file. We might want to consider reimplementing it to be non-blocking with a 
> {{CompletableFuture isAvailable()}} method.
>  
> Otherwise we will block mailbox processing for the duration of reading from 
> file - for example we will block processing time timers and potentially in 
> the future network flushes.
>  
> This is not a high priority change, since it affects non-default 
> configuration option AND at the moment only processing time timers are 
> planned to be moved to the mailbox for 1.9.
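
A hedged sketch of the non-blocking contract described above (the interface shape is an 
assumption for illustration):

{code:java}
interface BufferOrEventSequence {

    // returns null instead of blocking when no data is ready yet
    @Nullable
    BufferOrEvent getNext() throws IOException;

    // completes once getNext() can make progress again
    CompletableFuture<?> isAvailable();
}
{code}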



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on issue #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class

2019-05-22 Thread GitBox
bowenli86 commented on issue #8514: [FLINK-12582][table][hive] Alteration APIs 
in catalogs should check existing object and new object are of the same class
URL: https://github.com/apache/flink/pull/8514#issuecomment-494985129
 
 
   cc @xuefuz @lirui-apache @zjuwangg


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8515: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)

2019-05-22 Thread GitBox
flinkbot commented on issue #8515: [FLINK-12539] [fs-connector] Make 
StreamingFileSink customizable (#8469)
URL: https://github.com/apache/flink/pull/8515#issuecomment-494984545
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-7167) Job can't set checkpoint meta directory to cover the system setting state.checkpoints.dir

2019-05-22 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-7167:
---

Assignee: (was: Bowen Li)

> Job can't set checkpoint meta directory to cover the system setting 
> state.checkpoints.dir 
> --
>
> Key: FLINK-7167
> URL: https://issues.apache.org/jira/browse/FLINK-7167
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.3.1
>Reporter: yuqi
>Priority: Major
>
> If we want to recover a failed job using a checkpoint, then as all job 
> checkpoint metadata currently lives in the same directory without any 
> job-specific identification, we have to traverse every file in the directory 
> to find the data of this job. This is rather troublesome, so setting this 
> configuration at the job level is preferable.
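
A hedged sketch of the job-level override being asked for (today only the cluster-wide 
{{state.checkpoints.dir}} setting exists; the per-job directory below goes through the 
state backend URI):

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// point this job's checkpoint data at its own directory instead of the shared default
env.setStateBackend(new FsStateBackend("hdfs:///checkpoints/my-job"));
{code}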



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-5809) Add registerNextWatermarkCallback() to TimerService

2019-05-22 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-5809:
---

Assignee: (was: Bowen Li)

> Add registerNextWatermarkCallback() to TimerService
> ---
>
> Key: FLINK-5809
> URL: https://issues.apache.org/jira/browse/FLINK-5809
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Aljoscha Krettek
>Priority: Major
>
> This new method is similar to {{registerEventTimeTimer()}} but instead of 
> specifying a certain time where the timer should fire this will simply fire 
> whenever a new watermark arrives.
> This is, for example, useful when you want to buffer elements for sorting by 
> event time and always want to emit as soon as the operator receives a 
> watermark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-7954) sideoutput in async function

2019-05-22 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-7954:
---

Assignee: (was: Bowen Li)

> sideoutput in async function
> 
>
> Key: FLINK-7954
> URL: https://issues.apache.org/jira/browse/FLINK-7954
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.3.2
> Environment: similar to FLINK-7635, adding support of side output to 
> AsyncFunction 
>Reporter: Chen Qin
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8969) Move TimerService into state backend

2019-05-22 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-8969:
---

Assignee: (was: Bowen Li)

> Move TimerService into state backend
> 
>
> Key: FLINK-8969
> URL: https://issues.apache.org/jira/browse/FLINK-8969
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Bowen Li
>Priority: Major
>
> upon discussion with [~aljoscha]. More details need to be added here



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12243) Support generic table stats related operations in HiveCatalog

2019-05-22 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12243:
-
Summary: Support generic table stats related operations in HiveCatalog  
(was: Support table stats related operations in GenericHiveMetastoreCatalog)

> Support generic table stats related operations in HiveCatalog
> -
>
> Key: FLINK-12243
> URL: https://issues.apache.org/jira/browse/FLINK-12243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> Support  table stats related operations in GenericHiveMetastoreCatalog, which 
> implements ReadableWritableCatalog API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tweise opened a new pull request #8515: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)

2019-05-22 Thread GitBox
tweise opened a new pull request #8515: [FLINK-12539] [fs-connector] Make 
StreamingFileSink customizable (#8469)
URL: https://github.com/apache/flink/pull/8515
 
 
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12243) Support generic table stats related operations in HiveCatalog

2019-05-22 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-12243:


Assignee: (was: Bowen Li)

> Support generic table stats related operations in HiveCatalog
> -
>
> Key: FLINK-12243
> URL: https://issues.apache.org/jira/browse/FLINK-12243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> Support  table stats related operations in GenericHiveMetastoreCatalog, which 
> implements ReadableWritableCatalog API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12237) Support Hive table stats related operations in HiveCatalog

2019-05-22 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12237:
-
Summary: Support Hive table stats related operations in HiveCatalog  (was: 
Support table stats related operations in HiveCatalog)

> Support Hive table stats related operations in HiveCatalog
> --
>
> Key: FLINK-12237
> URL: https://issues.apache.org/jira/browse/FLINK-12237
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> Support table stats related operations in HiveCatalog, which implements 
> ReadableWritableCatalog API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12237) Support table stats related operations in HiveCatalog

2019-05-22 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-12237:


Assignee: (was: Bowen Li)

> Support table stats related operations in HiveCatalog
> -
>
> Key: FLINK-12237
> URL: https://issues.apache.org/jira/browse/FLINK-12237
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> Support table stats related operations in HiveCatalog, which implements 
> ReadableWritableCatalog API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class

2019-05-22 Thread GitBox
flinkbot commented on issue #8514: [FLINK-12582][table][hive] Alteration APIs 
in catalogs should check existing object and new object are of the same class
URL: https://github.com/apache/flink/pull/8514#issuecomment-494983478
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 opened a new pull request #8514: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class

2019-05-22 Thread GitBox
bowenli86 opened a new pull request #8514: [FLINK-12582][table][hive] 
Alteration APIs in catalogs should check existing object and new object are of 
the same class
URL: https://github.com/apache/flink/pull/8514
 
 
   ## What is the purpose of the change
   
   This PR makes the alteration APIs in catalogs check that the existing object 
and the new object are of the same class. 
   
   Most of them currently don't, e.g. you can alter an existing generic table 
with a new Hive table in GenericInMemoryCatalog. A sketch of the check follows 
below.
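   
   A minimal sketch of the kind of check added (the exact message wording and exception 
   type follow the PR; shown here only for illustration):
   
   ```java
   if (existingTable.getClass() != newTable.getClass()) {
       throw new CatalogException(
           String.format("Table types don't match. Existing table is '%s' and new table is '%s'",
               existingTable.getClass().getName(), newTable.getClass().getName()));
   }
   ```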
   
   ## Brief change log
   
 - changed the following alteration API to check existing object and new 
object are of the same class
   - HiveCatalog: alterDatabase(), alterTable(), alterFunction()
   - GenericInMemoryCatalog: alterDatabase(), alterTable(), 
alterFunction(),  alterPartition()
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   added unit tests `testAlterDb_differentTypedDb()`, 
`testAlterTable_differentTypedTable()`, 
`testAlterFunction_differentTypedFunction()`  in `CatalogTestBase`, and 
`testAlterPartition_differentTypedPartition()` in `GenericInMemoryCatalogTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xccui commented on issue #8372: [FLINK-12354] [table] Add REVERSE function for table/sql API

2019-05-22 Thread GitBox
xccui commented on issue #8372: [FLINK-12354] [table] Add REVERSE function for 
table/sql API
URL: https://github.com/apache/flink/pull/8372#issuecomment-494961184
 
 
   Hi @zzchun, sorry for the late reply. I had a quick look at the PR. 
Overall, it looks good and I think the `REVERSE` function is eligible enough to 
be added. However, since the flink-table layer has been undergoing some major 
changes recently, maybe it's better to add the function after those settle.
   
   Best, Xingcan


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8507: [FLINK-12236][hive] Support Hive function in HiveCatalog

2019-05-22 Thread GitBox
bowenli86 commented on issue #8507: [FLINK-12236][hive] Support Hive function 
in HiveCatalog
URL: https://github.com/apache/flink/pull/8507#issuecomment-494911693
 
 
   cc @xuefuz @lirui-apache @zjuwangg


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog

2019-05-22 Thread GitBox
bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] 
Support Hive partition in HiveCatalog
URL: https://github.com/apache/flink/pull/8449#discussion_r286602063
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -740,10 +745,13 @@ public void alterPartition(ObjectPath tablePath, 
CatalogPartitionSpec partitionS
checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be 
null");
checkNotNull(newPartition, "New partition cannot be null");
 
+   checkArgument(newPartition instanceof HiveCatalogPartition, 
"Currently only supports HiveCatalogPartition");
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog

2019-05-22 Thread GitBox
bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] 
Support Hive partition in HiveCatalog
URL: https://github.com/apache/flink/pull/8449#discussion_r286601953
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -639,8 +640,12 @@ public void createPartition(ObjectPath tablePath, 
CatalogPartitionSpec partition
checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be 
null");
checkNotNull(partition, "Partition cannot be null");
 
+   checkArgument(partition instanceof HiveCatalogPartition, 
"Currently only supports HiveCatalogPartition");
 
 Review comment:
   We currently throw `CatalogException` if the type doesn't match. 
`checkArgument()` will throw `IllegalArgumentException`. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog

2019-05-22 Thread GitBox
bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] 
Support Hive partition in HiveCatalog
URL: https://github.com/apache/flink/pull/8449#discussion_r286608694
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -616,44 +619,279 @@ private  static Table instantiateHiveTable(ObjectPath 
tablePath, CatalogBaseTabl
// -- partitions --
 
@Override
-   public void createPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
-   throws TableNotExistException, 
TableNotPartitionedException, PartitionSpecInvalidException, 
PartitionAlreadyExistsException, CatalogException {
-   throw new UnsupportedOperationException();
+   public boolean partitionExists(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+   throws CatalogException {
+   checkNotNull(tablePath, "Table path cannot be null");
+   checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be 
null");
+   try {
+   return getHivePartition(tablePath, partitionSpec) != 
null;
+   } catch (NoSuchObjectException | TableNotExistException | 
PartitionSpecInvalidException e) {
+   return false;
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to get partition %s of 
table %s", partitionSpec, tablePath), e);
+   }
}
 
@Override
-   public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, boolean ignoreIfNotExists)
-   throws PartitionNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   public void createPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+   throws TableNotExistException, 
TableNotPartitionedException, PartitionSpecInvalidException, 
PartitionAlreadyExistsException, CatalogException {
+   checkNotNull(tablePath, "Table path cannot be null");
+   checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be 
null");
+   checkNotNull(partition, "Partition cannot be null");
+
+   checkArgument(partition instanceof HiveCatalogPartition, 
"Currently only supports HiveCatalogPartition");
+
+   Table hiveTable = getHiveTable(tablePath);
+
+   ensureTableAndPartitionMatch(hiveTable, partition);
+
+   ensurePartitionedTable(tablePath, hiveTable);
+
+   try {
+   
client.add_partition(instantiateHivePartition(hiveTable, partitionSpec, 
partition));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to create partition %s of 
table %s", partitionSpec, tablePath));
+   }
}
 
@Override
-   public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
+   public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   checkNotNull(tablePath, "Table path cannot be null");
+   checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be 
null");
+   try {
+   Table hiveTable = getHiveTable(tablePath);
+   client.dropPartition(tablePath.getDatabaseName(), 
tablePath.getObjectName(),
+   getOrderedFullPartitionValues(partitionSpec, 
getFieldNames(hiveTable.getPartitionKeys()), tablePath), true);
+   } catch (NoSuchObjectException e) {
+   if (!ignoreIfNotExists) {
+   throw new 
PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+   }
+   } catch (MetaException | TableNotExistException | 
PartitionSpecInvalidException e) {
+   throw new PartitionNotExistException(catalogName, 
tablePath, partitionSpec, e);
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to drop partition %s of 
table %s", partitionSpec, tablePath));
+ 

[GitHub] [flink] bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] Support Hive partition in HiveCatalog

2019-05-22 Thread GitBox
bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive] 
Support Hive partition in HiveCatalog
URL: https://github.com/apache/flink/pull/8449#discussion_r286602271
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -770,6 +778,16 @@ public void alterPartition(ObjectPath tablePath, 
CatalogPartitionSpec partitionS
}
}
 
+   // make sure both table and partition are generic, or neither is
+   private void ensureTableAndPartitionMatch(Table hiveTable, 
CatalogPartition catalogPartition) {
+   boolean isGeneric = 
Boolean.valueOf(hiveTable.getParameters().get(FLINK_PROPERTY_IS_GENERIC));
+   if ((isGeneric && catalogPartition instanceof 
HiveCatalogPartition) ||
+   (!isGeneric && catalogPartition instanceof 
GenericCatalogPartition)) {
+   throw new 
IllegalArgumentException(String.format("Cannot handle %s partition for %s 
table",
 
 Review comment:
   throw `CatalogException`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] theoDiefenthal commented on issue #8513: [FLINK-12147] [metrics] Error for monitor kafka metrics when Use influxDB metr…

2019-05-22 Thread GitBox
theoDiefenthal commented on issue #8513:  [FLINK-12147] [metrics] Error for 
monitor kafka metrics when Use influxDB metr…
URL: https://github.com/apache/flink/pull/8513#issuecomment-494907147
 
 
   IMHO the correct place for this is here, as InfluxDB is most widely used for 
monitoring, but not exclusively. I personally worked with InfluxDB for another 
purpose where I wanted those errors to arise. But one could of course avoid the 
roundtrip by letting the InfluxDB client directly throw such an exception... 
   
   Further, this is still a workaround, as the InfluxDB dev team already tracks an 
issue to support infinity values in the future. [ 
https://github.com/influxdata/influxdb/issues/4089 , 
https://github.com/influxdata/flux/issues/1040 ]. The InfluxDB client should 
thus not be permanently limited to never sending these numbers. 
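   
   For illustration, the workaround boils down to skipping non-finite values before 
   building InfluxDB points (a sketch with illustrative names, not the reporter's exact 
   code):
   
   ```java
   double value = gauge.getValue();
   if (Double.isNaN(value) || Double.isInfinite(value)) {
       return; // the InfluxDB line protocol cannot represent NaN/Infinity yet
   }
   pointBuilder.addField("value", value);
   ```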


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tweise merged pull request #8469: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable

2019-05-22 Thread GitBox
tweise merged pull request #8469: [FLINK-12539] [fs-connector] Make 
StreamingFileSink customizable
URL: https://github.com/apache/flink/pull/8469
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib

2019-05-22 Thread GitBox
walterddr commented on a change in pull request #8402: [FLINK-12473][ml] Add 
the interface of ML pipeline and ML lib
URL: https://github.com/apache/flink/pull/8402#discussion_r286601919
 
 

 ##
 File path: flink-ml/pom.xml
 ##
 @@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-ml</artifactId>
 
 Review comment:
   +1 to removing it if there is no response on the user ML. Thanks for taking care of this, @shaoxuan-wang. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib

2019-05-22 Thread GitBox
walterddr commented on a change in pull request #8402: [FLINK-12473][ml] Add 
the interface of ML pipeline and ML lib
URL: https://github.com/apache/flink/pull/8402#discussion_r286601213
 
 

 ##
 File path: 
flink-ml/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.api.misc.persist.Persistable;
+import org.apache.flink.ml.util.param.ExtractParamInfosUtil;
+import org.apache.flink.ml.util.persist.MLStageFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for a stage in a pipeline. The interface is only a concept, and does not have any
+ * actual functionality. Its subclasses must be either Estimator or Transformer. No other classes
+ * should inherit this interface directly.
+ *
+ * Each pipeline stage is with parameters and meanwhile persistable.
+ *
+ * @param <T> The class type of the PipelineStage implementation itself, used by {@link
+ *            org.apache.flink.ml.api.misc.param.WithParams}
+ * @see WithParams
+ */
+interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable,
+	Persistable {
+
+   default String toJson() {
 
 Review comment:
   +1 to exposing a more extendable interface in the next PR. 
   
   I am still not fully convinced that "toJson" addresses all of the concerns that were raised, especially the ones @blublinsky raised, but I also have a strong feeling that this can be addressed in a future PR. 
   One fundamental factor keeping me in favor of the "toJson" interface is that a readable/editable format, into and out of which a model can be bi-directionally converted, is 100% a need in multiple FlinkML scenarios. For this PR's purpose I don't think we need to focus on addressing "all" scenarios. What do you guys think?
   
   IMO the follow-up can be on "export", "serving", or even "standardizing" the model.
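   
   A self-contained Java illustration of that bi-directional property; `StageParams` is a hypothetical stand-in for a stage's parameters, not one of this PR's classes (Gson is already imported by the PR):
   
   ```
   import com.google.gson.Gson;
   
   public class JsonRoundTrip {
   	// Hypothetical parameter holder for a pipeline stage.
   	static class StageParams {
   		String inputCol;
   		int maxIter;
   	}
   
   	public static void main(String[] args) {
   		StageParams params = new StageParams();
   		params.inputCol = "features";
   		params.maxIter = 100;
   
   		Gson gson = new Gson();
   		String json = gson.toJson(params);                             // export: readable/editable
   		StageParams restored = gson.fromJson(json, StageParams.class); // import back
   
   		System.out.println(json + " -> maxIter=" + restored.maxIter);
   	}
   }
   ```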


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol merged pull request #8414: [FLINK-12447][Build] Enforce maven version 3.1.1

2019-05-22 Thread GitBox
zentol merged pull request #8414: [FLINK-12447][Build] Enforce maven version 
3.1.1
URL: https://github.com/apache/flink/pull/8414
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol merged pull request #8512: [FLINK-12447] [cron-master-maven_compat] Bump required Maven version to 3.1.1 (from 3.0.3)

2019-05-22 Thread GitBox
zentol merged pull request #8512: [FLINK-12447] [cron-master-maven_compat] Bump 
required Maven version to 3.1.1 (from 3.0.3)
URL: https://github.com/apache/flink/pull/8512
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8501: [FLINK-12254][table] Update TableSchema to new type system

2019-05-22 Thread GitBox
bowenli86 commented on issue #8501: [FLINK-12254][table] Update TableSchema to 
new type system
URL: https://github.com/apache/flink/pull/8501#issuecomment-494892462
 
 
   Hive part LGTM. I'll have a follow-up PR to switch Hive type conversions to the new Flink type system.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-12539) StreamingFileSink: Make the class extendable to customize for different usecases

2019-05-22 Thread Thomas Weise (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise resolved FLINK-12539.
--
   Resolution: Fixed
Fix Version/s: 1.9.0

> StreamingFileSink: Make the class extendable to customize for different 
> usecases
> 
>
> Key: FLINK-12539
> URL: https://issues.apache.org/jira/browse/FLINK-12539
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Reporter: Kailash Hassan Dayanand
>Assignee: Kailash Hassan Dayanand
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the StreamingFileSink has a Builder pattern and the actual 
> constructor of StreamingFileSink is private. This makes it hard to extend 
> the class to build on top of it and customize the sink (example: adding new 
> metrics). Proposing to make the constructor protected, as well as the 
> Builder interface.
>  
> Discussion is here: 
> [http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAC27z=phl8+gw-ugmjkxbriseky9zimi2crpqvlpcnyupt8...@mail.gmail.com%3E]
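
A sketch of the extension this enables once the constructor is protected (class name, metric hook, and exact signatures below are illustrative assumptions, not code from the PR):

```
// Hypothetical subclass: possible only because the constructor and
// BucketsBuilder are no longer private.
public class InstrumentedFileSink<IN> extends StreamingFileSink<IN> {

	protected InstrumentedFileSink(BucketsBuilder<IN, ?> bucketsBuilder, long bucketCheckInterval) {
		super(bucketsBuilder, bucketCheckInterval);
	}

	@Override
	public void invoke(IN value, Context context) throws Exception {
		super.invoke(value, context);
		// e.g. update a custom records-written metric here (registration omitted)
	}
}
```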



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12447) Bump required Maven version to 3.1.1 (from 3.0.3)

2019-05-22 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-12447:
-
Fix Version/s: 1.9.0

> Bump required Maven version to 3.1.1 (from 3.0.3)
> -
>
> Key: FLINK-12447
> URL: https://issues.apache.org/jira/browse/FLINK-12447
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> See 
> https://lists.apache.org/thread.html/57dec7c338eb95247b7a05ded371f4a78420a964045ea9557d501c3f@%3Cdev.flink.apache.org%3E
>  
> The frontend-maven-plugin requires at least Maven 3.1.0.
> I propose to bump the required Maven version to 3.1.1.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12447) Bump required Maven version to 3.1.1 (from 3.0.3)

2019-05-22 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-12447:
-
Issue Type: Improvement  (was: Task)

> Bump required Maven version to 3.1.1 (from 3.0.3)
> -
>
> Key: FLINK-12447
> URL: https://issues.apache.org/jira/browse/FLINK-12447
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.9.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> See 
> https://lists.apache.org/thread.html/57dec7c338eb95247b7a05ded371f4a78420a964045ea9557d501c3f@%3Cdev.flink.apache.org%3E
>  
> The frontend-maven-plugin requires at least Maven 3.1.0.
> I propose to bump the required Maven version to 3.1.1.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12447) Bump required Maven version to 3.1.1 (from 3.0.3)

2019-05-22 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846077#comment-16846077
 ] 

Chesnay Schepler commented on FLINK-12447:
--

maven compatibility branch updated in 1a5fdbae00aafa8b7fbcab6da687f173db60797a

master: f652bb5a6ff3855ccf0038a5c0469896f41df908

> Bump required Maven version to 3.1.1 (from 3.0.3)
> -
>
> Key: FLINK-12447
> URL: https://issues.apache.org/jira/browse/FLINK-12447
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.9.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> See 
> https://lists.apache.org/thread.html/57dec7c338eb95247b7a05ded371f4a78420a964045ea9557d501c3f@%3Cdev.flink.apache.org%3E
>  
> The frontend-maven-plugin requires at least Maven 3.1.0.
> I propose to bump the required Maven version to 3.1.1.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12447) Bump required Maven version to 3.1.1 (from 3.0.3)

2019-05-22 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-12447:
-
Affects Version/s: 1.9.0

> Bump required Maven version to 3.1.1 (from 3.0.3)
> -
>
> Key: FLINK-12447
> URL: https://issues.apache.org/jira/browse/FLINK-12447
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Affects Versions: 1.9.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> See 
> https://lists.apache.org/thread.html/57dec7c338eb95247b7a05ded371f4a78420a964045ea9557d501c3f@%3Cdev.flink.apache.org%3E
>  
> The frontend-maven-plugin requires at least Maven 3.1.0.
> I propose to bump the required Maven version to 3.1.1.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12576) inputQueueLength metric does not work for LocalInputChannels

2019-05-22 Thread aitozi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aitozi reassigned FLINK-12576:
--

Assignee: aitozi

> inputQueueLength metric does not work for LocalInputChannels
> 
>
> Key: FLINK-12576
> URL: https://issues.apache.org/jira/browse/FLINK-12576
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Piotr Nowojski
>Assignee: aitozi
>Priority: Major
>
> Currently {{inputQueueLength}} ignores LocalInputChannels 
> ({{SingleInputGate#getNumberOfQueuedBuffers}}). This can cause mistakes 
> when looking for causes of back pressure (if a task is back-pressuring the 
> whole Flink job, but there is data skew and only local input channels are 
> being used).
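
A sketch of the gap (simplified from the current SingleInputGate#getNumberOfQueuedBuffers; channel iteration details are approximate):

```
public int getNumberOfQueuedBuffers() {
	int total = 0;
	for (InputChannel channel : inputChannels.values()) {
		// Only remote channels are counted today; LocalInputChannel
		// buffers are skipped, so mostly-local inputs report ~0.
		if (channel instanceof RemoteInputChannel) {
			total += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
		}
	}
	return total;
}
```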



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] c4emmmm opened a new pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib

2019-05-22 Thread GitBox
c4emmmm opened a new pull request #8402: [FLINK-12473][ml] Add the interface of 
ML pipeline and ML lib
URL: https://github.com/apache/flink/pull/8402
 
 
   
   
   ## What is the purpose of the change
   
   This pull request introduces the major interfaces for ML pipeline and ML lib.
   
   ## Brief change log
   
 - Create flink-ml module under flink root
 - Add flink-ml-api module, and introduce the major API interfaces, 
including PipelineStage, Estimator, Transformer, Model, Pipeline, etc.
   
   ## Verifying this change
   
   This change is pure interface design without any test coverage.
   
   This PR only adds the ML pipeline & ML lib interfaces. The implementations of these interfaces will be added in the next PR (please refer to the “implementation plan” section of the [FLIP-39 doc](https://docs.google.com/document/d/1StObo1DLp8iiy0rbukx8kwAJb0BwDZrQrMWub3DzsEo)), where we will add the test cases. 
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? 
   - JavaDocs

   - Flink document (will submit via a separate PR)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] shaoxuan-wang commented on issue #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib

2019-05-22 Thread GitBox
shaoxuan-wang commented on issue #8402: [FLINK-12473][ml] Add the interface of 
ML pipeline and ML lib
URL: https://github.com/apache/flink/pull/8402#issuecomment-494873716
 
 
   Thanks for the contribution, @c4emmmm. The entire design looks good to me. I have gone over all the comments, and it seems most of them have been addressed. There is one major comment about the way to persist and reload the model. I noticed that you have responded to the reviewers, while they have not yet confirmed whether they are happy with your solution. 
   In general I am +1 on this PR. I will merge it in the next 1-2 days if no further comments/concerns come up.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] shaoxuan-wang closed pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib

2019-05-22 Thread GitBox
shaoxuan-wang closed pull request #8402: [FLINK-12473][ml] Add the interface of 
ML pipeline and ML lib
URL: https://github.com/apache/flink/pull/8402
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] shaoxuan-wang commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib

2019-05-22 Thread GitBox
shaoxuan-wang commented on a change in pull request #8402: [FLINK-12473][ml] 
Add the interface of ML pipeline and ML lib
URL: https://github.com/apache/flink/pull/8402#discussion_r286569876
 
 

 ##
 File path: 
flink-ml/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.api.misc.persist.Persistable;
+import org.apache.flink.ml.util.param.ExtractParamInfosUtil;
+import org.apache.flink.ml.util.persist.MLStageFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for a stage in a pipeline. The interface is only a concept, and does not have any
+ * actual functionality. Its subclasses must be either Estimator or Transformer. No other classes
+ * should inherit this interface directly.
+ *
+ * Each pipeline stage is with parameters and meanwhile persistable.
+ *
+ * @param <T> The class type of the PipelineStage implementation itself, used by {@link
+ *            org.apache.flink.ml.api.misc.param.WithParams}
+ * @see WithParams
+ */
+interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable,
+	Persistable {
+
+   default String toJson() {
 
 Review comment:
   Thanks for all the comments. They triggered broad and deep thinking on this interface and the future plan to export/load the model in different formats. I have discussed this offline with @c4emmmm. The final plan looks good to me. @blublinsky, @walterddr, @ex00, please let us know if you have any further question on the solution proposed by @c4emmmm. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286541806
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##
 @@ -614,34 +610,22 @@ private void stopTaskExecutorServices() throws Exception {
 
		if (task != null) {
			for (final PartitionInfo partitionInfo: partitionInfos) {
-				IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
-
-				final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID);
-
-				if (singleInputGate != null) {
-					// Run asynchronously because it might be blocking
-					getRpcService().execute(
-						() -> {
-							try {
-								singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
-							} catch (IOException | InterruptedException e) {
-								log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
-
-								try {
-									task.failExternally(e);
-								} catch (RuntimeException re) {
-									// TODO: Check whether we need this or make exception in failExtenally checked
-									log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re);
-								}
-							}
-						});
-				} else {
-					return FutureUtils.completedExceptionally(
-						new PartitionException("No reader with ID " + intermediateResultPartitionID +
-							" for task " + executionAttemptID + " was found."));
-				}
+				// Run asynchronously because it might be blocking
+				getRpcService().execute(
+					() -> {
+						try {
+							networkEnvironment.updatePartitionInfo(partitionInfo);
+						} catch (Throwable t) {
 
 Review comment:
   I think we should not catch `Throwable` here. If an unchecked exception 
occurs, it should simply bubble up and cause the component to fail.
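   
   A sketch of the narrower handling (assuming `updatePartitionInfo` declares `IOException` and `InterruptedException`, as the removed code path suggests):
   
   ```
   getRpcService().execute(
   	() -> {
   		try {
   			networkEnvironment.updatePartitionInfo(partitionInfo);
   		} catch (IOException | InterruptedException e) {
   			// Handle only the expected failure modes; let unchecked
   			// exceptions bubble up and fail the component.
   			log.error("Could not update input data location for task {}. Trying to fail task.",
   				task.getTaskInfo().getTaskName(), e);
   			task.failExternally(e);
   		}
   	});
   ```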


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286540299
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##
 @@ -441,6 +444,7 @@ public void close() throws IOException {
finally {
isReleased = true;
 
 Review comment:
   I think we could remove this field because we now have the `closeFuture`.
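   
   A sketch of what that could look like, assuming the `closeFuture` is completed inside `close()`:
   
   ```
   // Derive the released state from the future instead of a separate flag.
   public boolean isReleased() {
   	return closeFuture.isDone();
   }
   ```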


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286563093
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1221,53 +1214,46 @@ public void run() {
 */
@VisibleForTesting
	void onPartitionStateUpdate(
-			IntermediateDataSetID intermediateDataSetId,
			ResultPartitionID resultPartitionId,
-			ExecutionState producerState) throws IOException, InterruptedException {
+			ExecutionState producerState,
+			CompletableFuture producerReadyFuture) {
 
 Review comment:
   Instead of passing in this future, I would return a boolean indicating 
whether the producer is ready or not


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286560412
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1102,11 +1093,13 @@ public void triggerPartitionProducerStateCheck(
			} else {
				failExternally(throwable);
			}
-		} catch (IOException | InterruptedException e) {
-			failExternally(e);
+		} catch (Throwable t) {
+			failExternally(stripCompletionException(t));
		}
	},
	executor);
+
+	return producerReadyFuture;
 
 Review comment:
   What about the following alternative:
   ```
   final CompletableFuture<Boolean> producerReadyFuture = new CompletableFuture<>();
   
   FutureUtils.assertNoException(
   	futurePartitionState.whenCompleteAsync(
   		(ExecutionState executionState, Throwable throwable) -> {
   			if (executionState != null || throwable instanceof TimeoutException) {
   				final boolean producerReady = onPartitionStateUpdate(
   					resultPartitionId,
   					executionState != null ? executionState : ExecutionState.RUNNING);
   
   				producerReadyFuture.complete(producerReady);
   			} else {
   				if (throwable instanceof PartitionProducerDisposedException) {
   					String msg = String.format("Producer %s of partition %s disposed. Cancelling execution.",
   						resultPartitionId.getProducerId(), resultPartitionId.getPartitionId());
   					LOG.info(msg, throwable);
   					cancelExecution();
   				} else {
   					failExternally(throwable);
   				}
   
   				producerReadyFuture.complete(false);
   			}
   		},
   		executor));
   
   return producerReadyFuture;
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286551026
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1102,11 +1093,13 @@ public void triggerPartitionProducerStateCheck(
			} else {
				failExternally(throwable);
			}
-		} catch (IOException | InterruptedException e) {
-			failExternally(e);
+		} catch (Throwable t) {
 
 Review comment:
   We should not catch `Throwable` because it could be a legitimate Flink problem which should make us fail the process.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286565922
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1221,53 +1214,46 @@ public void run() {
 */
@VisibleForTesting
void onPartitionStateUpdate(
 
 Review comment:
   Maybe rename it to `isProducerReadyAndHandlePartitionStateUpdate`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286546519
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##
 @@ -614,34 +610,22 @@ private void stopTaskExecutorServices() throws Exception {
 
		if (task != null) {
			for (final PartitionInfo partitionInfo: partitionInfos) {
-				IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
-
-				final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID);
-
-				if (singleInputGate != null) {
-					// Run asynchronously because it might be blocking
-					getRpcService().execute(
-						() -> {
-							try {
-								singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
-							} catch (IOException | InterruptedException e) {
-								log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
-
-								try {
-									task.failExternally(e);
-								} catch (RuntimeException re) {
-									// TODO: Check whether we need this or make exception in failExtenally checked
-									log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re);
-								}
-							}
-						});
-				} else {
-					return FutureUtils.completedExceptionally(
-						new PartitionException("No reader with ID " + intermediateResultPartitionID +
-							" for task " + executionAttemptID + " was found."));
-				}
+				// Run asynchronously because it might be blocking
+				getRpcService().execute(
+					() -> {
+						try {
+							networkEnvironment.updatePartitionInfo(partitionInfo);
+						} catch (Throwable t) {
+							log.error(
+								"Could not update input data location for task {}. Trying to fail task.",
+								task.getTaskInfo().getTaskName(),
+								t);
+							FutureUtils.assertNoException(CompletableFuture.runAsync(
+								() -> task.failExternally(t),
+								getRpcService().getExecutor()));
+						}
+					});
 
 Review comment:
   I would suggest exchanging this block with:
   ```
   FutureUtils.assertNoException(
   	CompletableFuture.runAsync(
   		() -> {
   			try {
   				networkEnvironment.updatePartitionInfo(partitionInfo);
   			} catch (IOException | InterruptedException e) {
   				log.error("Could not update input data location for task {}. Trying to fail task.",
   					task.getTaskInfo().getTaskName(), e);
   				task.failExternally(e);
   			}
   		},
   		getRpcService().getExecutor()));
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-22 Thread GitBox
tillrohrmann commented on a change in pull request #8463: 
[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286568353
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -97,13 +104,15 @@ private NetworkEnvironment(
NetworkBufferPool networkBufferPool,
ConnectionManager connectionManager,
ResultPartitionManager resultPartitionManager,
+	Map<IntermediateDataSetID, SingleInputGate> inputGatesById,
 
 Review comment:
   Passing the map for storing the input gates into `NetworkEnvironment` exposes, in my opinion, a bit too many implementation details of this structure. It basically says that the object needs to store the `SingleInputGates` in this exact structure, otherwise the test will fail. I think it would be better to have a `getInputGate(IntermediateDataSetID)` method and let the way the gates are stored internally be an implementation detail.
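   
   A sketch of that accessor (field name assumed; `java.util.Optional` used to make the miss case explicit):
   
   ```
   // NetworkEnvironment keeps its storage private and exposes a lookup.
   public Optional<SingleInputGate> getInputGate(IntermediateDataSetID id) {
   	return Optional.ofNullable(inputGatesById.get(id));
   }
   ```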


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] shaoxuan-wang commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib

2019-05-22 Thread GitBox
shaoxuan-wang commented on a change in pull request #8402: [FLINK-12473][ml] 
Add the interface of ML pipeline and ML lib
URL: https://github.com/apache/flink/pull/8402#discussion_r286564448
 
 

 ##
 File path: flink-ml/pom.xml
 ##
 @@ -0,0 +1,39 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-parent
+   1.9-SNAPSHOT
+   ..
+   
+
+   flink-ml
 
 Review comment:
   I started a [discussion](http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/SURVEY-Usage-of-flink-ml-and-DISCUSS-Delete-flink-ml-td29057.html) in the dev/user ML to collect feedback. @zentol pointed out that we can just remove the legacy flink-ml package, since we do not expect any further active development on it. I think he is right. I will create another PR to delete it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


  1   2   3   4   >