[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-25 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297494125
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
 ##
 @@ -154,24 +157,40 @@ private static void registerOutputMetrics(
 
 	public static void registerInputMetrics(
 			boolean isDetailedMetrics,
+			boolean isCreditBased,
 			MetricGroup inputGroup,
 			SingleInputGate[] inputGates) {
 		registerInputMetrics(
 			isDetailedMetrics,
+			isCreditBased,
 			inputGroup,
 			inputGroup.addGroup(METRIC_GROUP_BUFFERS),
 			inputGates);
 	}
 
 	private static void registerInputMetrics(
 			boolean isDetailedMetrics,
+			boolean isCreditBased,
 			MetricGroup inputGroup,
 			MetricGroup buffersGroup,
 			SingleInputGate[] inputGates) {
 		if (isDetailedMetrics) {
 			InputGateMetrics.registerQueueLengthMetrics(inputGroup, inputGates);
 		}
+
 		buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new InputBuffersGauge(inputGates));
+
+		if (isCreditBased) {
+			FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+			ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+
+			buffersGroup.gauge(METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE, exclusiveBuffersUsageGauge);
+			buffersGroup.gauge(METRIC_INPUT_FLOATING_BUFFERS_USAGE, floatingBuffersUsageGauge);
+			buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new CreditBasedInputBuffersUsageGauge(
+				floatingBuffersUsageGauge, exclusiveBuffersUsageGauge, inputGates));
+		} else {
+			buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));
+		}
 		buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));
 
 Review comment:
   should remove this; it is already done in the `else` branch above.
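   Applying the suggestion is a one-line removal, e.g.:

 		} else {
 			buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));
 		}
-		buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));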


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-25 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297493392
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
 ##
 @@ -66,6 +66,8 @@
 
 	private static final String METRIC_INPUT_QUEUE_LENGTH = "inputQueueLength";
 	private static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage";
+	private static final String METRIC_INPUT_FLOATING_BUFFERS_USAGE = "floatingBuffersUsage";
 
 Review comment:
   floatingBuffersUsage -> inputFloatingBuffersUsage; also add the `input` 
prefix to the following `exclusiveBuffersUsage`. Then it is easy to distinguish 
the metrics by name, and it stays consistent with the others.
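   The suggested renaming, sketched (string values assumed to mirror the existing `input`-prefixed names such as `inputQueueLength`):

	private static final String METRIC_INPUT_FLOATING_BUFFERS_USAGE = "inputFloatingBuffersUsage";
	private static final String METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE = "inputExclusiveBuffersUsage";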


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-25 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297491959
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBuffersUsageGauge.java
 ##
 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gauge metric measuring the input buffer pool usage gauge for {@link SingleInputGate}s under credit based mode.
 
 Review comment:
   remove `pool`, `measuring the input buffers usage for `
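   With the suggestion applied, the javadoc would read:

/**
 * Gauge metric measuring the input buffers usage for {@link SingleInputGate}s under credit-based mode.
 */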


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297465153
 
 

 ##
 File path: flink-tests/src/test/java/org/apache/flink/test/runtime/RegionFailoverITCase.java
 ##
 @@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT case for testing region failover strategy.
+ */
+public class RegionFailoverITCase extends TestLogger {
 
 Review comment:
   I take it as an E2E test that ensures the job can succeed even when task 
failures happen while using the region failover strategy.
   IT cases do blackbox verification, so it does not verify internal 
states (like which tasks are re-scheduled).
   It just gives us confidence that the Flink runtime works well in this 
scenario.
   WDYT?
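   For instance, such a blackbox check might look like the following sketch (`createJobGraph()` is a hypothetical helper whose tasks fail once and then recover; the strategy name "region" and the MiniCluster calls are assumptions based on the imports in the test above):

	@Test
	public void jobSucceedsDespiteTaskFailures() throws Exception {
		Configuration configuration = new Configuration();
		// assumption: the region failover strategy is registered under "region"
		configuration.setString(EXECUTION_FAILOVER_STRATEGY.key(), "region");

		MiniClusterConfiguration clusterConfiguration = new MiniClusterConfiguration.Builder()
			.setConfiguration(configuration)
			.setNumTaskManagers(1)
			.setNumSlotsPerTaskManager(4)
			.build();

		try (MiniCluster miniCluster = new MiniCluster(clusterConfiguration)) {
			miniCluster.start();
			// Blackbox assertion: executeJobBlocking throws if the job does not finish.
			miniCluster.executeJobBlocking(createJobGraph());
		}
	}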


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297491370
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Records modifications of
+ * {@link org.apache.flink.runtime.executiongraph.ExecutionVertex ExecutionVertices}, and allows
+ * for checking whether a vertex was modified.
+ *
+ * <p>Examples for modifications include:
+ * <ul>
+ *     <li>cancellation of the underlying execution</li>
+ *     <li>deployment of the execution vertex</li>
+ * </ul>
+ *
+ * @see DefaultScheduler
+ */
+public class ExecutionVertexVersioner {
 
 Review comment:
   ExecutionVertexVersioner is from @GJL. He has also implemented tests for it: 
https://github.com/GJL/flink/blob/FLINK-12433/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersionerTest.java
   
   I've ported these tests and made some minor changes.
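   For context, the intended usage pattern of the versioner, sketched (`executionVertexId` is a placeholder, and the method names are assumptions based on the class javadoc and the linked tests):

	// Record a vertex's version before an asynchronous restart ...
	ExecutionVertexVersioner versioner = new ExecutionVertexVersioner();
	ExecutionVertexVersion versionBeforeRestart = versioner.recordModification(executionVertexId);

	// ... later, in the restart callback: act only if nothing else
	// (e.g. a concurrent global failover) modified the vertex in between.
	if (!versioner.isModified(versionBeforeRestart)) {
		// safe to restart/deploy this execution vertex
	}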


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-25 Thread GitBox
Aitozi commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297491110
 
 

 ##
 File path: docs/_includes/generated/blob_server_configuration.html
 ##
 @@ -7,16 +7,16 @@
 
 
 
-
-blob.client.socket.timeout
-30
-The socket timeout in milliseconds for the blob client.
-
 
 blob.client.connect.timeout
 0
The connection timeout in milliseconds for the blob client.
 
+
+blob.client.socket.timeout
 
 Review comment:
   I also noticed this, but I have not changed this part. Does this come from 
some commit that did not regenerate the HTML?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-25 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297491040
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/AbstractBuffersUsageGauge.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Abstract gauge implementation for calculating the buffer usage percent.
+ */
+public abstract class AbstractBuffersUsageGauge implements Gauge<Float> {
+
+   protected SingleInputGate[] inputGates;
+
+   @VisibleForTesting
+   public abstract int calculateUsedBuffers(SingleInputGate inputGate);
+
+   @VisibleForTesting
+   public abstract int calculateBufferPoolSize(SingleInputGate inputGate);
+
+   AbstractBuffersUsageGauge(SingleInputGate[] inputGates) {
+   this.inputGates = inputGates;
+   }
+
+   @Override
+   public Float getValue() {
+   int usedBuffers = 0;
+   int bufferPoolSize = 0;
+
+   for (SingleInputGate inputGate : inputGates) {
 
 Review comment:
   agree above
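   The archive truncates the method here; a plausible completion of `getValue()`, consistent with the abstract methods above (a sketch, not necessarily the PR's exact body):

		for (SingleInputGate inputGate : inputGates) {
			usedBuffers += calculateUsedBuffers(inputGate);
			bufferPoolSize += calculateBufferPoolSize(inputGate);
		}

		// Guard against division by zero when no buffers are assigned yet.
		return bufferPoolSize == 0 ? 0.0f : ((float) usedBuffers) / bufferPoolSize;
	}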


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-25 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297491040
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/AbstractBuffersUsageGauge.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Abstract gauge implementation for calculating the buffer usage percent.
+ */
+public abstract class AbstractBuffersUsageGauge implements Gauge<Float> {
+
+   protected SingleInputGate[] inputGates;
+
+   @VisibleForTesting
+   public abstract int calculateUsedBuffers(SingleInputGate inputGate);
+
+   @VisibleForTesting
+   public abstract int calculateBufferPoolSize(SingleInputGate inputGate);
+
+   AbstractBuffersUsageGauge(SingleInputGate[] inputGates) {
+   this.inputGates = inputGates;
+   }
+
+   @Override
+   public Float getValue() {
+   int usedBuffers = 0;
+   int bufferPoolSize = 0;
+
+   for (SingleInputGate inputGate : inputGates) {
 
 Review comment:
   agree


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-25 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297490492
 
 

 ##
 File path: docs/_includes/generated/netty_shuffle_environment_configuration.html
 ##
 @@ -0,0 +1,61 @@
+
 
 Review comment:
   ditto: this file should not be changed either.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-25 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297490427
 
 

 ##
 File path: docs/_includes/generated/blob_server_configuration.html
 ##
 @@ -7,16 +7,16 @@
 
 
 
-
-blob.client.socket.timeout
-30
-The socket timeout in milliseconds for the blob client.
-
 
 blob.client.connect.timeout
 0
The connection timeout in milliseconds for the blob client.
 
+
+blob.client.socket.timeout
 
 Review comment:
   Why was this changed? I think it should not be changed; could you 
double-check?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-12889) Job keeps in FAILING state

2019-06-25 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16872933#comment-16872933
 ] 

zhijiang edited comment on FLINK-12889 at 6/26/19 5:33 AM:
---

My previous analysis in the ML was as follows:
 
 Actually, none of the five "Source: ServiceLog" tasks are in a terminal state 
from the JM's view. The relevant process is shown below:
 * The checkpoint in the task causes an OOM issue, which calls 
`Task#failExternally` as a result; we can see the log "Attempting to fail 
task externally" in the TM.
 * The source task transitions from RUNNING to FAILED and then starts a 
canceler thread for canceling the task; we can see the log "Triggering 
cancellation of task" in the TM.
 * When the JM starts to cancel the source tasks, the RPC call 
`Task#cancelExecution` finds the task already in FAILED state from step 2 
above; we can see the log "Attempting to cancel task" in the TM.
 
 In the end, the five source tasks never reach terminal states in the JM log. I 
guess step 2 might not have created the canceler thread successfully: the root 
failover was caused by an OOM while creating a native thread in step 1, so 
creating the canceler thread may fail as well in this unstable OOM situation. 
If so, the source task would never be interrupted and thus would never report 
back to the JM, even though its state had already changed to FAILED.
 
 For the other vertex tasks, `Task#failExternally` is not triggered in step 1; 
they only receive the cancel RPC from the JM in step 3. I guess that by this 
later time the canceler thread could be created successfully after some GCs, so 
these tasks could be canceled and reported to the JM side.
 
 I think the key problem is that under OOM some behaviors are not within 
expectations, which might bring problems. Maybe we should handle OOM errors in 
an extreme way, such as making the TM exit, to solve the potential issue.
 
 [~till.rohrmann] do you think it is worth fixing or have other concerns?
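For illustration only, a minimal sketch of the "make the TM exit" idea (class and method names are hypothetical, not Flink's actual API):

// Hypothetical guard: OutOfMemoryError is a VirtualMachineError, i.e. an error
// the JVM cannot reliably recover from, so halt the process instead of starting
// an in-process cancellation that may itself fail to allocate a native thread.
public final class FatalErrorGuard {

	private FatalErrorGuard() {}

	public static boolean isJvmFatalError(Throwable t) {
		return t instanceof VirtualMachineError;
	}

	public static void haltProcessIfFatal(Throwable t) {
		if (isJvmFatalError(t)) {
			// halt(), not exit(): shutdown hooks may also fail under OOM
			Runtime.getRuntime().halt(-1);
		}
	}
}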



> Job keeps in FAILING state
> --
>
> Key: FLINK-12889
> URL: https://issues.apache.org/jira/browse/FLINK-12889
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Fan Xinpu
>Priority: Minor
> Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, 
> taskmanager.log
>
>
> There is a topology of 3 operators: source, parser, and persist. 
> Occasionally, 5 subtasks of the source encounter exceptions and turn to 
> FAILED; at the same time, one subtask of the parser runs into an exception and 
> turns to FAILED too. The jobmaster gets a message of the parser's failure. The 
> jobmaster then tries to cancel all the subtasks; most of the subtasks of the 
> three operators turn to CANCELED except the 5 subtasks of the source, because 
> the state of those 5 is already FAILED before the jobmaster tries to cancel 
> them. Then the jobmaster cannot reach a final state but keeps in FAILING state 
> while the subtasks of the source keep in CANCELING state.

[jira] [Comment Edited] (FLINK-12889) Job keeps in FAILING state

2019-06-25 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16872933#comment-16872933
 ] 

zhijiang edited comment on FLINK-12889 at 6/26/19 5:32 AM:
---

My previous analysis in the ML was as follows:
 
 Actually, none of the five "Source: ServiceLog" tasks are in a terminal state 
from the JM's view. The relevant process is shown below:
 * The checkpoint in the task causes an OOM issue, which calls 
`Task#failExternally` as a result; we can see the log "Attempting to fail 
task externally" in the TM.
 * The source task transitions from RUNNING to FAILED and then starts a 
canceler thread for canceling the task; we can see the log "Triggering 
cancellation of task" in the TM.
 * When the JM starts to cancel the source tasks, the RPC call 
`Task#cancelExecution` finds the task already in FAILED state from step 2 
above; we can see the log "Attempting to cancel task" in the TM.
 
 In the end, the five source tasks never reach terminal states in the JM log. I 
guess step 2 might not have created the canceler thread successfully: the root 
failover was caused by an OOM while creating a native thread in step 1, so 
creating the canceler thread may fail as well in this unstable OOM situation. 
If so, the source task would never be interrupted and thus would never report 
back to the JM, even though its state had already changed to FAILED.
 
 For the other vertex tasks, `Task#failExternally` is not triggered in step 1; 
they only receive the cancel RPC from the JM in step 3. I guess that by this 
later time the canceler thread could be created successfully after some GCs, so 
these tasks could be canceled and reported to the JM side.
 
 I think the key problem is that under OOM some behaviors are not within 
expectations, which might bring problems. Maybe we should handle OOM errors in 
an extreme way, such as making the TM exit, to solve the potential issue.
 
 [~till.rohrmann] do you think it is worth fixing or have other concerns?



> Job keeps in FAILING state
> --
>
> Key: FLINK-12889
> URL: https://issues.apache.org/jira/browse/FLINK-12889
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Fan Xinpu
>Priority: Minor
> Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, 
> taskmanager.log
>
>
> There is a topology of 3 operators: source, parser, and persist. 
> Occasionally, 5 subtasks of the source encounter exceptions and turn to 
> FAILED; at the same time, one subtask of the parser runs into an exception and 
> turns to FAILED too. The jobmaster gets a message of the parser's failure. The 
> jobmaster then tries to cancel all the subtasks; most of the subtasks of the 
> three operators turn to CANCELED except the 5 subtasks of the source, because 
> the state of those 5 is already FAILED before the jobmaster tries to cancel 
> them. Then the jobmaster cannot reach a final state but keeps in FAILING state 
> while the subtasks of the source keep in CANCELING state.

[jira] [Updated] (FLINK-12889) Job keeps in FAILING state

2019-06-25 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12889:
-
Priority: Minor  (was: Blocker)

> Job keeps in FAILING state
> --
>
> Key: FLINK-12889
> URL: https://issues.apache.org/jira/browse/FLINK-12889
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Fan Xinpu
>Priority: Minor
> Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, 
> taskmanager.log
>
>
> There is a topology of 3 operators: source, parser, and persist. 
> Occasionally, 5 subtasks of the source encounter exceptions and turn to 
> FAILED; at the same time, one subtask of the parser runs into an exception and 
> turns to FAILED too. The jobmaster gets a message of the parser's failure. The 
> jobmaster then tries to cancel all the subtasks; most of the subtasks of the 
> three operators turn to CANCELED except the 5 subtasks of the source, because 
> the state of those 5 is already FAILED before the jobmaster tries to cancel 
> them. Then the jobmaster cannot reach a final state but keeps in FAILING state 
> while the subtasks of the source keep in CANCELING state. 
>  
> The job runs on a Flink 1.7 cluster on YARN, and there is only one TM with 10 
> slots.
>  
> The attached files contain a JM log, a TM log and the UI picture.
>  
> The exception timestamp is about 2019-06-16 13:42:28.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] dianfu commented on issue #8847: [FLINK-12609][python] Align the Python data types with Java

2019-06-25 Thread GitBox
dianfu commented on issue #8847: [FLINK-12609][python] Align the Python data 
types with Java
URL: https://github.com/apache/flink/pull/8847#issuecomment-505724940
 
 
   Hi @sunjincheng121, makes sense to me. I have created #8892 and #8893 for 
the hotfix changes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12889) Job keeps in FAILING state

2019-06-25 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12889:
-
Affects Version/s: (was: 1.7.2)

> Job keeps in FAILING state
> --
>
> Key: FLINK-12889
> URL: https://issues.apache.org/jira/browse/FLINK-12889
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Fan Xinpu
>Priority: Blocker
> Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, 
> taskmanager.log
>
>
> There is a topology of 3 operators: source, parser, and persist. 
> Occasionally, 5 subtasks of the source encounter exceptions and turn to 
> FAILED; at the same time, one subtask of the parser runs into an exception and 
> turns to FAILED too. The jobmaster gets a message of the parser's failure. The 
> jobmaster then tries to cancel all the subtasks; most of the subtasks of the 
> three operators turn to CANCELED except the 5 subtasks of the source, because 
> the state of those 5 is already FAILED before the jobmaster tries to cancel 
> them. Then the jobmaster cannot reach a final state but keeps in FAILING state 
> while the subtasks of the source keep in CANCELING state. 
>  
> The job runs on a Flink 1.7 cluster on YARN, and there is only one TM with 10 
> slots.
>  
> The attached files contain a JM log, a TM log and the UI picture.
>  
> The exception timestamp is about 2019-06-16 13:42:28.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12889) Job keeps in FAILING state

2019-06-25 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16872933#comment-16872933
 ] 

zhijiang commented on FLINK-12889:
--

My previous analysis in the ML was as follows:
 
Actually, none of the five "Source: ServiceLog" tasks are in a terminal state 
from the JM's view. The relevant process is shown below:
 * The checkpoint in the task causes an OOM issue, which calls 
`Task#failExternally` as a result; we can see the log "Attempting to fail 
task externally" in the TM.
 * The source task transitions from RUNNING to FAILED and then starts a 
canceler thread for canceling the task; we can see the log "Triggering 
cancellation of task" in the TM.
 * When the JM starts to cancel the source tasks, the RPC call 
`Task#cancelExecution` finds the task already in FAILED state from step 2 
above; we can see the log "Attempting to cancel task" in the TM.
 
In the end, the five source tasks never reach terminal states in the JM log. I 
guess step 2 might not have created the canceler thread successfully: the root 
failover was caused by an OOM while creating a native thread in step 1, so 
creating the canceler thread may fail as well in this unstable OOM situation. 
If so, the source task would never be interrupted and thus would never report 
back to the JM, even though its state had already changed to FAILED.
 
For the other vertex tasks, `Task#failExternally` is not triggered in step 1; 
they only receive the cancel RPC from the JM in step 3. I guess that by this 
later time the canceler thread could be created successfully after some GCs, so 
these tasks could be canceled and reported to the JM side.
 
I think the key problem is that under OOM some behaviors are not within 
expectations, which might bring problems. Maybe we should handle OOM errors in 
an extreme way, such as making the TM exit, to solve the potential issue.
 
[~till.rohrmann] do you think it is worth fixing or have other concerns?

> Job keeps in FAILING state
> --
>
> Key: FLINK-12889
> URL: https://issues.apache.org/jira/browse/FLINK-12889
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
>Reporter: Fan Xinpu
>Priority: Blocker
> Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, 
> taskmanager.log
>
>
> There is a topology of 3 operators: source, parser, and persist. 
> Occasionally, 5 subtasks of the source encounter exceptions and turn to 
> FAILED; at the same time, one subtask of the parser runs into an exception and 
> turns to FAILED too. The jobmaster gets a message of the parser's failure. The 
> jobmaster then tries to cancel all the subtasks; most of the subtasks of the 
> three operators turn to CANCELED except the 5 subtasks of the source, because 
> the state of those 5 is already FAILED before the jobmaster tries to cancel 
> them. Then the jobmaster cannot reach a final state but keeps in FAILING state 
> while the subtasks of the source keep in CANCELING state. 
>  
> The job runs on a Flink 1.7 cluster on YARN, and there is only one TM with 10 
> slots.
>  
> The attached files contain a JM log, a TM log and the UI picture.
>  
> The exception timestamp is about 2019-06-16 13:42:28.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12889) Job keeps in FAILING state

2019-06-25 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12889:
-
Component/s: (was: API / DataStream)
 Runtime / Coordination

> Job keeps in FAILING state
> --
>
> Key: FLINK-12889
> URL: https://issues.apache.org/jira/browse/FLINK-12889
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2
>Reporter: Fan Xinpu
>Priority: Blocker
> Attachments: 20190618104945417.jpg, jobmanager.log.2019-06-16.0, 
> taskmanager.log
>
>
> There is a topology of 3 operators: source, parser, and persist. 
> Occasionally, 5 subtasks of the source encounter exceptions and turn to 
> FAILED; at the same time, one subtask of the parser runs into an exception and 
> turns to FAILED too. The jobmaster gets a message of the parser's failure. The 
> jobmaster then tries to cancel all the subtasks; most of the subtasks of the 
> three operators turn to CANCELED except the 5 subtasks of the source, because 
> the state of those 5 is already FAILED before the jobmaster tries to cancel 
> them. Then the jobmaster cannot reach a final state but keeps in FAILING state 
> while the subtasks of the source keep in CANCELING state. 
>  
> The job runs on a Flink 1.7 cluster on YARN, and there is only one TM with 10 
> slots.
>  
> The attached files contain a JM log, a TM log and the UI picture.
>  
> The exception timestamp is about 2019-06-16 13:42:28.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] lirui-apache commented on issue #8875: [FLINK-12970][hive] Support writing Hive complex types

2019-06-25 Thread GitBox
lirui-apache commented on issue #8875: [FLINK-12970][hive] Support writing Hive 
complex types
URL: https://github.com/apache/flink/pull/8875#issuecomment-505721128
 
 
   Thanks @bowenli86 for the review. Please take another look.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8882: [FLINK-12965][table][hive] unify catalog view implementations

2019-06-25 Thread GitBox
xuefuz commented on a change in pull request #8882: [FLINK-12965][table][hive] 
unify catalog view implementations
URL: https://github.com/apache/flink/pull/8882#discussion_r297481016
 
 

 ##
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java
 ##
 @@ -28,7 +28,7 @@
  * A view created from a {@link QueryOperation} via operations on {@link org.apache.flink.table.api.Table}.
  */
 @Internal
-public class QueryOperationCatalogView extends AbstractCatalogView {
+public class QueryOperationCatalogView extends CatalogViewImpl {
 
 Review comment:
   Previously QueryOperationCatalogView, HiveCatalogView, and 
GenericCatalogView were children of AbstractCatalogView. Now we combine the 
latter two and make the first one a child of the combined class, which might be 
a concern. I'm not sure whether this poses any issue, but maybe we can keep the 
relationship unchanged?
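   Schematically, the hierarchy change under discussion (declarations are skeletal, for illustration only):

interface CatalogView { /* ... */ }

abstract class AbstractCatalogView implements CatalogView { /* ... */ }

// Previously: GenericCatalogView, HiveCatalogView and QueryOperationCatalogView
// each extended AbstractCatalogView directly as siblings.

// After this PR: the first two merge into CatalogViewImpl, and
// QueryOperationCatalogView becomes a child of the merged class.
class CatalogViewImpl extends AbstractCatalogView { /* ... */ }

class QueryOperationCatalogView extends CatalogViewImpl { /* ... */ }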


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8893: [hotfix][python] Align the signature of type utility methods with Java

2019-06-25 Thread GitBox
flinkbot commented on issue #8893: [hotfix][python] Align the signature of type 
utility methods with Java
URL: https://github.com/apache/flink/pull/8893#issuecomment-505713502
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu opened a new pull request #8893: [hotfix][python] Align the signature of type utility methods with Java

2019-06-25 Thread GitBox
dianfu opened a new pull request #8893: [hotfix][python] Align the signature of 
type utility methods with Java
URL: https://github.com/apache/flink/pull/8893
 
 
   ## What is the purpose of the change
   
   *Currently, the Python method `DataTypes.VARCHAR(length)` assumes a default 
value of `1` for the argument `length`. We should align with Java and not 
provide a default value: the length is currently not respected in the planner, 
so users can easily write programs that run in the current version but will not 
run in future versions once the argument `length` is respected.*
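   For comparison, a minimal illustration of the Java side, where the length is mandatory (using `org.apache.flink.table.api.DataTypes`; the printed form is an assumption):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class VarCharLengthExample {
	public static void main(String[] args) {
		// The length argument is mandatory on the Java side; there is no
		// zero-argument VARCHAR() overload to fall back on.
		DataType varchar10 = DataTypes.VARCHAR(10);
		System.out.println(varchar10); // expected: VARCHAR(10)
	}
}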
   
   ## Brief change log
   
 - *Remove the default value from the utility methods of DataTypes*
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8892: [FLINK-FLINK-12990][python] Date type doesn't consider the local TimeZone

2019-06-25 Thread GitBox
flinkbot commented on issue #8892: [FLINK-FLINK-12990][python] Date type 
doesn't consider the local TimeZone
URL: https://github.com/apache/flink/pull/8892#issuecomment-505711190
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12990) Date type doesn't consider the local TimeZone

2019-06-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12990:
---
Labels: pull-request-available  (was: )

> Date type doesn't consider the local TimeZone
> -
>
> Key: FLINK-12990
> URL: https://issues.apache.org/jira/browse/FLINK-12990
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the python DateType is converted to an `int` which indicates the 
> days passed since 1970-1-1, and then the Java side will create a Java Date by 
> calling `new Date(days * 86400)`. As we know, the Date constructor expects 
> milliseconds since 1970-1-1 00:00:00 GMT, so we should convert the `days * 
> 86400` seconds to GMT milliseconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] dianfu opened a new pull request #8892: [FLINK-FLINK-12990][python] Date type doesn't consider the local TimeZone

2019-06-25 Thread GitBox
dianfu opened a new pull request #8892: [FLINK-FLINK-12990][python] Date type 
doesn't consider the local TimeZone
URL: https://github.com/apache/flink/pull/8892
 
 
   ## What is the purpose of the change
   
   *Currently, the Python DateType is converted to an `int` which indicates the 
days passed since 1970-01-01, and the Java side then creates a Java Date by 
calling `new Date(days * 86400)`. The Date constructor expects milliseconds 
since 1970-01-01 00:00:00 GMT, so we should convert the `days * 86400` seconds 
to GMT milliseconds. This PR fixes this issue.*
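   A minimal sketch of the correction (the helper name follows the change log below, but this body is a simplified illustration, not the PR's exact code):

import java.sql.Date;
import java.util.TimeZone;

public final class DateConversionSketch {

	private DateConversionSketch() {}

	// Offset of the JVM's default time zone around the given local millis.
	// Note: getOffset expects a UTC instant; passing localMillis here is an
	// approximation that holds away from DST transitions.
	static int getOffsetFromLocalMillis(long localMillis) {
		return TimeZone.getDefault().getOffset(localMillis);
	}

	// days since 1970-01-01 (local calendar) -> java.sql.Date (epoch millis, GMT)
	public static Date internalToDate(int days) {
		long localMillis = days * 86400000L;
		return new Date(localMillis - getOffsetFromLocalMillis(localMillis));
	}
}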
   
   ## Brief change log
   
 - *Add a method getOffsetFromLocalMillis that handles the offset calculation 
for the local timezone, and use the calculated offset when creating the Date 
object.*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8891: [hotfix][util] Use OneTimeLeaderListenerFuture in LeaderRetrievalUtils

2019-06-25 Thread GitBox
flinkbot commented on issue #8891:  [hotfix][util] Use 
OneTimeLeaderListenerFuture in LeaderRetrievalUtils
URL: https://github.com/apache/flink/pull/8891#issuecomment-505709075
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun opened a new pull request #8891: [hotfix][util] Use OneTimeLeaderListenerFuture in LeaderRetrievalUtils

2019-06-25 Thread GitBox
TisonKun opened a new pull request #8891:  [hotfix][util] Use 
OneTimeLeaderListenerFuture in LeaderRetrievalUtils
URL: https://github.com/apache/flink/pull/8891
 
 
   ## What is the purpose of the change
   
   Refactor LeaderRetrievalUtils by removing redundant code.
   
   ## Brief change log
   
   1. Use OneTimeLeaderListenerFuture in LeaderRetrievalUtils and remove the 
redundant LeaderConnectionInfoListener (whose implementation relies on the 
Scala Future); a sketch follows below.
   2. Remove the redundant class LeaderAddressAndId, which is fully covered by 
LeaderConnectionInfo.
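   For orientation, a minimal sketch of the one-time listener pattern (the listener interface mirrors Flink's LeaderRetrievalListener; the body is illustrative, not this PR's exact code):

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Mirrors org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener.
interface LeaderRetrievalListener {
	void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
	void handleError(Exception exception);
}

// Completes a plain Java CompletableFuture on the first leader notification,
// replacing the Scala-Future-based LeaderConnectionInfoListener.
final class OneTimeLeaderListenerFuture implements LeaderRetrievalListener {

	private final CompletableFuture<String> future = new CompletableFuture<>();

	CompletableFuture<String> getFuture() {
		return future;
	}

	@Override
	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
		future.complete(leaderAddress); // a CompletableFuture completes at most once
	}

	@Override
	public void handleError(Exception exception) {
		future.completeExceptionally(exception);
	}
}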
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297475345
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+	/** The log object used for debugging. */
+	private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+	/** The execution graph on which this FailoverStrategy works. */
+	private final ExecutionGraph executionGraph;
+
+	/** The underlying new generation region failover strategy. */
+	private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+	/** The versioner helps to maintain execution vertex versions. */
+	private final ExecutionVertexVersioner executionVertexVersioner;
+
+	public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) {
+		this.executionGraph = checkNotNull(executionGraph);
+		this.executionVertexVersioner = new ExecutionVertexVersioner();
+	}
+
+	@Override
+	public void onTaskFailure(final Execution taskExecution, final Throwable cause) {
+		// skip the failover if global restart strategy suppresses restarts
+		if (!executionGraph.getRestartStrategy().canRestart()) {
+			// delegate the failure to a global fail that will check the restart strategy and not restart
+			LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global.");
+			executionGraph.failGlobal(cause);
+			return;
+		}
+
+		// skip local failover if is in global failover
+		if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+			LOG.info("Skip current region failover as a global failover is ongoing.");
+			return;
+

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297476171
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make 
task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+   /** The execution graph on which this FailoverStrategy works. */
+   private final ExecutionGraph executionGraph;
+
+   /** The underlying new generation region failover strategy. */
+   private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+   /** The versioner helps to maintain execution vertex versions. */
+   private final ExecutionVertexVersioner executionVertexVersioner;
+
+   public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph 
executionGraph) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executionVertexVersioner = new ExecutionVertexVersioner();
+   }
+
+   @Override
+   public void onTaskFailure(final Execution taskExecution, final 
Throwable cause) {
+   // skip the failover if global restart strategy suppresses 
restarts
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   LOG.info("Fail to pass the restart strategy validation 
in region failover. Fallback to fail global.");
+   executionGraph.failGlobal(cause);
+   return;
+   }
+
+   // skip local failover if is in global failover
+   if 
(!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+   LOG.info("Skip current region failover as a global 
failover is ongoing.");
+   return;
+

[jira] [Comment Edited] (FLINK-12976) Bump Kafka client version to 2.3.0 for universal Kafka connector

2019-06-25 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16872883#comment-16872883
 ] 

vinoyang edited comment on FLINK-12976 at 6/26/19 3:53 AM:
---

I can confirm that there are compatibility issues when bumping the Kafka 
client version from 2.2.0 to 2.3.0.

log details: [https://api.travis-ci.org/v3/job/550192082/log.txt]

One of them is that {{FlinkKafkaInternalProducer}} uses reflection to get the 
direct inner {{nextSequence}} field of {{TransactionManager}}.

Before 2.3.0, the type of this field was:
{code:java}
private final Map<TopicPartition, Integer> nextSequence;
{code}
However, in 2.3.0, {{TransactionManager}} changed its implementation (see 
[here|https://github.com/apache/kafka/commit/1db46673661cf6dcb7f2fa2565262b04cf580367]
 and KAFKA-7736): it introduced two inner classes, named 
{{TopicPartitionBookkeeper}} and {{TopicPartitionEntry}}, and 
{{nextSequence}} was moved into {{TopicPartitionEntry}}.
{code:java}
Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.getValue(FlinkKafkaInternalProducer.java:262)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.getValue(FlinkKafkaInternalProducer.java:253)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.resumeTransaction(FlinkKafkaInternalProducer.java:156)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.recoverAndCommit(FlinkKafkaProducer.java:752)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.recoverAndCommit(FlinkKafkaProducer.java:98)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:393)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:351)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:870)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:856)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:365)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:685)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:515)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchFieldException: nextSequence
at java.lang.Class.getDeclaredField(Class.java:2070)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.getValue(FlinkKafkaInternalProducer.java:258)
... 16 more
{code}
So we need to change the way of getting this field. I will try to figure it 
out. cc [~aljoscha] and [~pnowojski].
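
A minimal sketch of one possible adaptation, assuming (hypothetically -- the 
actual 2.3.0 field names must be verified against the Kafka source) that 
{{TransactionManager}} now holds a {{topicPartitionBookkeeper}} field whose 
{{topicPartitions}} map contains the per-partition entries carrying 
{{nextSequence}}:
{code:java}
import java.lang.reflect.Field;

final class TransactionManagerReflection {

    // Reflectively reads a declared field from the target's own class.
    private static Object getField(Object target, String name) throws Exception {
        Field field = target.getClass().getDeclaredField(name);
        field.setAccessible(true);
        return field.get(target);
    }

    // Pre-2.3.0: "nextSequence" lives directly on the TransactionManager.
    // 2.3.0+: walk transactionManager -> topicPartitionBookkeeper -> topicPartitions
    // (both 2.3.0 field names are assumptions for illustration).
    static Object getSequenceState(Object transactionManager) throws Exception {
        try {
            return getField(transactionManager, "nextSequence");
        } catch (NoSuchFieldException e) {
            Object bookkeeper = getField(transactionManager, "topicPartitionBookkeeper");
            return getField(bookkeeper, "topicPartitions");
        }
    }
}
{code}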


was (Author: yanghua):
I can confirm that there are compatibility issues when bumping the Kafka 
client version from 2.2.0 to 2.3.0.

log details: [https://api.travis-ci.org/v3/job/550192082/log.txt]

One of them is that {{FlinkKafkaInternalProducer}} uses reflection to get the 
direct inner {{nextSequence}} field of {{TransactionManager}}.

Before 2.3.0, the type of this field was:
{code:java}
private final Map<TopicPartition, Integer> nextSequence;
{code}
However, in 2.3.0, {{TransactionManager}} changed its implementation: it 
introduced two inner classes, named {{TopicPartitionBookkeeper}} and 
{{TopicPartitionEntry}}, and {{nextSequence}} was moved into 
{{TopicPartitionEntry}}.
{code:java}
Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.getValue(FlinkKafkaInternalProducer.java:262)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.getValue(FlinkKafkaInternalProducer.java:253)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.resumeTransaction(FlinkKafkaInternalProducer.java:156)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.recoverAndCommit(FlinkKafkaProducer.java:752)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.recoverAndCommit(FlinkKafkaProducer.java:98)
at 

[GitHub] [flink] xintongsong commented on a change in pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool

2019-06-25 Thread GitBox
xintongsong commented on a change in pull request #8841: 
[FLINK-12765][coordinator] Bookkeeping of available resources of allocated 
slots in SlotPool
URL: https://github.com/apache/flink/pull/8841#discussion_r297465087
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
 ##
 @@ -73,12 +73,35 @@ public Resource merge(Resource other) {
 
case AGGREGATE_TYPE_SUM:
default:
-   aggregatedValue = this.value + other.value;
+   aggregatedValue = value + other.value;
}
 
return create(aggregatedValue, resourceAggregateType);
}
 
+   public Resource subtract(Resource other) {
+   Preconditions.checkArgument(getClass() == other.getClass(), 
"Subtract with different resource resourceAggregateType");
 
 Review comment:
   Confusing error message. Maybe "different resource resourceAggregateType" -> 
"different resource class"


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool

2019-06-25 Thread GitBox
xintongsong commented on a change in pull request #8841: 
[FLINK-12765][coordinator] Bookkeeping of available resources of allocated 
slots in SlotPool
URL: https://github.com/apache/flink/pull/8841#discussion_r297465713
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
 ##
 @@ -73,12 +73,35 @@ public Resource merge(Resource other) {
 
case AGGREGATE_TYPE_SUM:
default:
-   aggregatedValue = this.value + other.value;
+   aggregatedValue = value + other.value;
}
 
return create(aggregatedValue, resourceAggregateType);
}
 
+   public Resource subtract(Resource other) {
+   Preconditions.checkArgument(getClass() == other.getClass(), 
"Subtract with different resource resourceAggregateType");
+   Preconditions.checkArgument(this.name.equals(other.name), 
"Subtract with different resource name");
+   Preconditions.checkArgument(this.resourceAggregateType == 
other.resourceAggregateType, "Subtract with different aggregate 
resourceAggregateType");
+   Preconditions.checkArgument(this.value >= other.value, "Try to 
subtract a larger resource from this one. ");
+
+   final double subtractedValue;
+   switch (resourceAggregateType) {
+   case AGGREGATE_TYPE_MAX:
+   // From the perspective of resource matching, 
if aggregation type is max,
+   // subtracting a value less than the current 
value should not decrease the
+   // ability of the original resource.
+   subtractedValue = value;
+   break;
 
 Review comment:
   I think we need a TODO or a warning here for the case where the subtrahend 
equals the minuend.
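
   For illustration, a self-contained sketch of the MAX branch with such a 
warning added (the logger and method shape are assumptions, not the PR's 
actual code):
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class MaxSubtraction {

    private static final Logger LOG = LoggerFactory.getLogger(MaxSubtraction.class);

    // Under MAX aggregation, subtracting a value no larger than the current
    // one does not reduce the capability of the original resource, so the
    // minuend is kept as-is.
    static double subtractMax(double minuend, double subtrahend) {
        if (minuend == subtrahend) {
            // The case flagged in review: keeping the full value when the
            // subtrahend equals the minuend may over-report what remains.
            LOG.warn("MAX-aggregated subtrahend equals minuend; result keeps {}.", minuend);
        }
        return minuend;
    }
}
{code}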


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12990) Date type doesn't consider the local TimeZone

2019-06-25 Thread Dian Fu (JIRA)
Dian Fu created FLINK-12990:
---

 Summary: Date type doesn't consider the local TimeZone
 Key: FLINK-12990
 URL: https://issues.apache.org/jira/browse/FLINK-12990
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu


Currently, the Python DateType is converted to an `int` that indicates the 
days passed since 1970-01-01, and the Java side then creates a Java Date by 
calling `new Date(days * 86400)`. As we know, the Date constructor expects 
milliseconds since 1970-01-01 00:00:00 GMT, so we should convert `days * 
86400` to GMT milliseconds.
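
A minimal sketch of the intended conversion, assuming `days` is the 
Python-side int (days since the epoch) and that the local time zone offset has 
to be removed to obtain GMT milliseconds (the method name and exact adjustment 
are illustrative, not the actual fix):
{code:java}
import java.util.Date;
import java.util.TimeZone;

final class DateConversion {

    private static final long MILLIS_PER_DAY = 86400L * 1000L;

    // new Date(millis) expects milliseconds since 1970-01-01 00:00:00 GMT,
    // so the day count is first scaled to milliseconds and then shifted by
    // the local time zone offset to land on the intended calendar date.
    static Date fromDaysSinceEpoch(int days) {
        long localMillis = days * MILLIS_PER_DAY;
        return new Date(localMillis - TimeZone.getDefault().getOffset(localMillis));
    }
}
{code}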



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-12957) Fix thrift and protobuf dependency examples in documentation

2019-06-25 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-12957.
-
   Resolution: Fixed
Fix Version/s: 1.9.0

Merged for 1.9.0: 46f94c11945676814c3ecc66e32b4eb895c0404e

> Fix thrift and protobuf dependency examples in documentation
> 
>
> Key: FLINK-12957
> URL: https://issues.apache.org/jira/browse/FLINK-12957
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.7.2, 1.8.0, 1.9.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The examples in the docs are not up-to-date anymore and should be updated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xintongsong commented on a change in pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool

2019-06-25 Thread GitBox
xintongsong commented on a change in pull request #8841: 
[FLINK-12765][coordinator] Bookkeeping of available resources of allocated 
slots in SlotPool
URL: https://github.com/apache/flink/pull/8841#discussion_r297472482
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ##
 @@ -347,20 +365,65 @@ private MultiTaskSlot(
CompletableFuture 
slotContextFuture,
@Nullable SlotRequestId allocatedSlotRequestId) 
{
super(slotRequestId, groupId);
+   Preconditions.checkNotNull(slotContextFuture);
 
this.parent = parent;
-   this.slotContextFuture = 
Preconditions.checkNotNull(slotContextFuture);
this.allocatedSlotRequestId = allocatedSlotRequestId;
 
this.children = new HashMap<>(16);
this.releasingChildren = false;
 
-   slotContextFuture.whenComplete(
-   (SlotContext ignored, Throwable throwable) -> {
-   if (throwable != null) {
-   release(throwable);
+   this.requestedResources = ResourceProfile.EMPTY;
+
+   this.slotContextFuture = 
slotContextFuture.handle((SlotContext slotContext, Throwable throwable) -> {
+   if (throwable != null) {
+   // If the underlying resource request 
fail, currently we fails all the requests to
+   // simplify the logic.
+   release(throwable);
+   throw new 
CompletionException(throwable);
+   }
+
+   if (parent == null) {
+   ResourceProfile allocated = 
ResourceProfile.EMPTY;
+   List childrenToEvict = new 
ArrayList<>();
+
+   for (TaskSlot slot : children.values()) 
{
+   ResourceProfile 
allocatedIfInclude = allocated.merge(slot.getRequestedResources());
+
+   if 
(slotContext.getResourceProfile().isMatching(allocatedIfInclude)) {
+   allocated = 
allocatedIfInclude;
+   } else {
+   
childrenToEvict.add(slot);
+   }
+   }
+
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Not all requests are 
fulfilled due to over-allocated, number of requests is {}, " +
+   
"number of evicted requests is {}, underlying allocated is {}, fulfilled is {}, 
" +
+   
"evicted requests is {},",
+   children.size(),
+   
childrenToEvict.size(),
+   
slotContext.getResourceProfile(),
+   allocated,
+   
childrenToEvict);
}
-   });
+
+   if (childrenToEvict.size() == 
children.size()) {
+   // This only happens when we 
request to RM using the resource profile of a task
+   // who is belonging to a 
CoLocationGroup. Similar to dealing with the fail of
 
 Review comment:
   It's not clear to me why this only happens for CoLocationGroup.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool

2019-06-25 Thread GitBox
xintongsong commented on a change in pull request #8841: 
[FLINK-12765][coordinator] Bookkeeping of available resources of allocated 
slots in SlotPool
URL: https://github.com/apache/flink/pull/8841#discussion_r297471061
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ##
 @@ -316,6 +331,9 @@ public boolean contains(AbstractID groupId) {
// true if we are currently releasing our children
private boolean releasingChildren;
 
+   // the total requested by all the descendants.
+   private ResourceProfile requestedResources;
 
 Review comment:
   I think this is a little confusing. Do you mean the resource requested by 
tasks that are already assigned to the slot? Maybe use "occupied" or 
"allocated" to avoid ambiguity?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool

2019-06-25 Thread GitBox
xintongsong commented on a change in pull request #8841: 
[FLINK-12765][coordinator] Bookkeeping of available resources of allocated 
slots in SlotPool
URL: https://github.com/apache/flink/pull/8841#discussion_r297470517
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ##
 @@ -177,14 +181,18 @@ MultiTaskSlot createRootSlot(
}
 
@Nonnull
-   public Collection listResolvedRootSlotInfo(@Nullable 
AbstractID groupId) {
+   public Collection 
listResolvedRootSlotInfo(@Nullable AbstractID groupId) {
return resolvedRootSlots
.values()
.stream()
-   .flatMap((Map map) -> 
map.values().stream())
-   .filter((MultiTaskSlot multiTaskSlot) -> 
!multiTaskSlot.contains(groupId))
-   .map((MultiTaskSlot multiTaskSlot) -> (SlotInfo) 
multiTaskSlot.getSlotContextFuture().join())
-   .collect(Collectors.toList());
+   .flatMap((Map map) 
-> map.values().stream())
+   .filter((MultiTaskSlot multiTaskSlot) -> 
!multiTaskSlot.contains(groupId))
 
 Review comment:
   indent


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-12732) Add savepoint reader for consuming partitioned operator state

2019-06-25 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-12732.
-
   Resolution: Fixed
Fix Version/s: 1.9.0

Thanks for the contribution Seth!

Merged for 1.9.0: 861bf73ada01e4d2d8c671e507974e5bfacd9218

> Add savepoint reader for consuming partitioned operator state
> -
>
> Key: FLINK-12732
> URL: https://issues.apache.org/jira/browse/FLINK-12732
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream, Runtime / State Backends
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-12729) Add savepoint reader for consuming non-partitioned operator state

2019-06-25 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-12729.
-
   Resolution: Fixed
Fix Version/s: 1.9.0

Merged for 1.9.0: 88d2e3cc5c31a7703d1deb90f79cd195f67c9686

> Add savepoint reader for consuming non-partitioned operator state
> -
>
> Key: FLINK-12729
> URL: https://issues.apache.org/jira/browse/FLINK-12729
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream, Runtime / State Backends
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8709: [hotfix][docs]Fix the typos in process_function.md and process_function.zh.md.

2019-06-25 Thread GitBox
asfgit closed pull request #8709: [hotfix][docs]Fix the typos in 
process_function.md and process_function.zh.md.
URL: https://github.com/apache/flink/pull/8709
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8849: [hotfix][connectors] fix shadowed NPE in elasticsearch sink connector

2019-06-25 Thread GitBox
asfgit closed pull request #8849: [hotfix][connectors] fix shadowed NPE in 
elasticsearch sink connector
URL: https://github.com/apache/flink/pull/8849
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8797: [Docks] Checkpoints: fix typo

2019-06-25 Thread GitBox
asfgit closed pull request #8797: [Docks] Checkpoints: fix typo
URL: https://github.com/apache/flink/pull/8797
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8640: [FLINK-12741][docs] Update Kafka producer fault tolerance guarantees

2019-06-25 Thread GitBox
asfgit closed pull request #8640: [FLINK-12741][docs] Update Kafka producer 
fault tolerance guarantees
URL: https://github.com/apache/flink/pull/8640
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8806: [FLINK-12870][doc] Add missing warnings to schema evolution documentation

2019-06-25 Thread GitBox
asfgit closed pull request #8806: [FLINK-12870][doc] Add missing warnings to 
schema evolution documentation
URL: https://github.com/apache/flink/pull/8806
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8848: [FLINK-12957][docs] fix thrift and protobuf dependency examples

2019-06-25 Thread GitBox
asfgit closed pull request #8848: [FLINK-12957][docs] fix thrift and protobuf 
dependency examples
URL: https://github.com/apache/flink/pull/8848
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-12870) Improve documentation of keys schema evolution

2019-06-25 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-12870.
-
   Resolution: Fixed
Fix Version/s: 1.9.0

Merged for 1.9.0: 641cbed1da37357b8c14a0cec5d33235bcf3a30f

> Improve documentation of keys schema evolution
> --
>
> Key: FLINK-12870
> URL: https://issues.apache.org/jira/browse/FLINK-12870
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> There are certain pitfalls when modifying classes used as keys.
> For instance, removing a field from a key's class would cause 
> nondeterministic behavior when using the RocksDB state backend, which relies 
> on binary object identity rather than the hashCode implementation.
> The documentation should mention that key evolution is not supported and, 
> specifically, that when using Kryo serializers the framework would not be 
> able to capture potential incompatibility with the previous state format.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state

2019-06-25 Thread GitBox
asfgit closed pull request #8618:  [FLINK-12732][state-processor-api] Add 
savepoint reader for consuming partitioned operator state
URL: https://github.com/apache/flink/pull/8618
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8174: [FLINK-12190] Fix IllegalArgumentException thrown by FlinkKinesisConsumerMigrationTest#writeSnapshot

2019-06-25 Thread GitBox
asfgit closed pull request #8174: [FLINK-12190] Fix IllegalArgumentException 
thrown by FlinkKinesisConsumerMigrationTest#writeSnapshot
URL: https://github.com/apache/flink/pull/8174
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool

2019-06-25 Thread GitBox
xintongsong commented on a change in pull request #8841: 
[FLINK-12765][coordinator] Bookkeeping of available resources of allocated 
slots in SlotPool
URL: https://github.com/apache/flink/pull/8841#discussion_r297467266
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ##
 @@ -260,6 +265,102 @@ public int compareTo(@Nonnull ResourceProfile other) {
return cmp;
}
 
+   /**
+* Calculates the sum of two resource profiles.
+*
+* @param other The other resource profile to add.
+* @return The merged resource profile.
+*/
+   @Nonnull
+   public ResourceProfile merge(@Nonnull ResourceProfile other) {
+   if (equals(ANY) || other.equals(ANY)) {
+   return ANY;
+   }
+
+   if (this.equals(UNKNOWN) || other.equals(UNKNOWN)) {
+   return UNKNOWN;
+   }
+
+   Map resultExtendedResource = new 
HashMap<>(extendedResources);
+
+   other.extendedResources.forEach((String name, Resource 
resource) -> {
+   resultExtendedResource.compute(name, (ignored, 
oldResource) ->
+   oldResource == null ? resource : 
oldResource.merge(resource));
+   });
+
+   return new ResourceProfile(
+   addDoublesConsideringOverflow(cpuCores, other.cpuCores),
+   addIntegersConsideringOverflow(heapMemoryInMB, 
other.heapMemoryInMB),
+   addIntegersConsideringOverflow(directMemoryInMB, 
other.directMemoryInMB),
+   addIntegersConsideringOverflow(nativeMemoryInMB, 
other.nativeMemoryInMB),
+   addIntegersConsideringOverflow(networkMemoryInMB, 
other.networkMemoryInMB),
+   resultExtendedResource);
+   }
+
+   /**
+* Subtracts another piece of resource profile from this one.
+*
+* @param other The other resource profile to subtract.
+* @return The subtracted resource profile.
+*/
+   public ResourceProfile subtract(ResourceProfile other) {
+   if (equals(ANY) || other.equals(ANY)) {
+   return ANY;
+   }
+
+   if (this.equals(UNKNOWN) || other.equals(UNKNOWN)) {
+   return UNKNOWN;
+   }
+
+   checkArgument(isMatching(other), "Try to subtract an unmatched 
resource profile from this one.");
+
+   Map resultExtendedResource = new 
HashMap<>(extendedResources);
+
+   other.extendedResources.forEach((String name, Resource 
resource) -> {
+   resultExtendedResource.compute(name, (ignored, 
oldResource) -> {
+   Resource resultResource = 
oldResource.subtract(resource);
+   return resultResource.getValue() == 0 ? null : 
resultResource;
+   });
+   });
+
+   return new ResourceProfile(
+   subtractDoublesConsideringInf(cpuCores, other.cpuCores),
+   subtractIntegersConsideringInf(heapMemoryInMB, 
other.heapMemoryInMB),
+   subtractIntegersConsideringInf(directMemoryInMB, 
other.directMemoryInMB),
+   subtractIntegersConsideringInf(nativeMemoryInMB, 
other.nativeMemoryInMB),
+   subtractIntegersConsideringInf(networkMemoryInMB, 
other.networkMemoryInMB),
+   resultExtendedResource
+   );
+   }
+
+   private double addDoublesConsideringOverflow(double first, double 
second) {
+   double result = first + second;
+
+   if (result == Double.POSITIVE_INFINITY) {
+   return Double.MAX_VALUE;
+   }
+
+   return result;
+   }
+
+   private int addIntegersConsideringOverflow(int first, int second) {
+   int result = first + second;
+
+   if (result < 0) {
+   return Integer.MAX_VALUE;
 
 Review comment:
   If we assume both integers to be added are non-negative, we should make this 
assumption explicit. Maybe change the method name to 
addNonNegativeIntegersConsideringOverflow?
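
   A self-contained sketch of the suggested variant (the name follows the 
review suggestion; the saturating logic mirrors the diff above):
{code:java}
final class OverflowSafeMath {

    // Adds two non-negative ints, saturating at Integer.MAX_VALUE on overflow.
    // With both operands non-negative, a negative sum can only be the result
    // of overflow, which is why the sign check below is sufficient.
    static int addNonNegativeIntegersConsideringOverflow(int first, int second) {
        if (first < 0 || second < 0) {
            throw new IllegalArgumentException("Operands must be non-negative.");
        }
        int result = first + second;
        return result < 0 ? Integer.MAX_VALUE : result;
    }
}
{code}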


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool

2019-06-25 Thread GitBox
xintongsong commented on a change in pull request #8841: 
[FLINK-12765][coordinator] Bookkeeping of available resources of allocated 
slots in SlotPool
URL: https://github.com/apache/flink/pull/8841#discussion_r297472941
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ##
 @@ -509,6 +608,22 @@ private void releaseChild(AbstractID childGroupId) {
}
}
 
+   private void onResourceRequested(ResourceProfile 
resourceProfile) {
 
 Review comment:
   requested -> allocated/occupied


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-12190) Fix IllegalArgumentException thrown by FlinkKinesisConsumerMigrationTest#writeSnapshot

2019-06-25 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-12190.
-
   Resolution: Fixed
Fix Version/s: 1.9.0

Merged for 1.9.0: f9681c0d1d12b9120c5f6e8cbc3e81265d4b403a

> Fix IllegalArgumentException thrown by 
> FlinkKinesisConsumerMigrationTest#writeSnapshot
> --
>
> Key: FLINK-12190
> URL: https://issues.apache.org/jira/browse/FLINK-12190
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis, Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, {{FlinkKinesisConsumerMigrationTest#writeSnapshot}} throws an 
> exception : 
> {code:java}
> java.lang.IllegalArgumentException: Cannot create enum from null value!
> at com.amazonaws.regions.Regions.fromName(Regions.java:79)
> at 
> org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.createKinesisClient(AWSUtil.java:93)
> at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.createKinesisClient(KinesisProxy.java:203)
> at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.(KinesisProxy.java:138)
> at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.create(KinesisProxy.java:213)
> at 
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.(KinesisDataFetcher.java:275)
> at 
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.(KinesisDataFetcher.java:237)
> at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumerMigrationTest$TestFetcher.(FlinkKinesisConsumerMigrationTest.java:422)
> at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumerMigrationTest.writeSnapshot(FlinkKinesisConsumerMigrationTest.java:332)
> at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumerMigrationTest.writeSnapshot(FlinkKinesisConsumerMigrationTest.java:113)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runners.Suite.runChild(Suite.java:128)
> at org.junit.runners.Suite.runChild(Suite.java:27)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
> at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> This exception may confuse upgraders.
> The exception occurs because the existing code did not initialize TestFetcher 
> correctly. For more details, see the comments under FLINK-10785: 
> https://issues.apache.org/jira/browse/FLINK-10785?focusedCommentId=16778899=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16778899



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state

2019-06-25 Thread GitBox
asfgit closed pull request #8615:  [FLINK-12729][state-processor-api] Add state 
reader for consuming non-partitioned operator state
URL: https://github.com/apache/flink/pull/8615
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-12741) Update docs about Kafka producer fault tolerance guarantees

2019-06-25 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-12741.
-
   Resolution: Fixed
Fix Version/s: 1.9.0

Merged for 1.9.0: bbe47ade83cefefd71f60e204ab43d39b61f9d1b

> Update docs about Kafka producer fault tolerance guarantees
> ---
>
> Key: FLINK-12741
> URL: https://issues.apache.org/jira/browse/FLINK-12741
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.9.0
>Reporter: Paul Lin
>Assignee: Paul Lin
>Priority: Trivial
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Since Flink 1.4.0, we have provided exactly-once semantics for Kafka 0.11+, 
> but the documentation is still not updated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12729) Add savepoint reader for consuming non-partitioned operator state

2019-06-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12729:
---
Labels: pull-request-available  (was: )

> Add savepoint reader for consuming non-partitioned operator state
> -
>
> Key: FLINK-12729
> URL: https://issues.apache.org/jira/browse/FLINK-12729
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream, Runtime / State Backends
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12976) Bump Kafka client version to 2.3.0 for universal Kafka connector

2019-06-25 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16872883#comment-16872883
 ] 

vinoyang edited comment on FLINK-12976 at 6/26/19 3:32 AM:
---

I can confirm that there are compatibility issues when bumping the Kafka 
client version from 2.2.0 to 2.3.0.

log details: [https://api.travis-ci.org/v3/job/550192082/log.txt]

One of them is that {{FlinkKafkaInternalProducer}} uses reflection to get the 
direct inner {{nextSequence}} field of {{TransactionManager}}.

Before 2.3.0, the type of this field was:
{code:java}
private final Map<TopicPartition, Integer> nextSequence;
{code}
However, in 2.3.0, {{TransactionManager}} changed its implementation: it 
introduced two inner classes, named {{TopicPartitionBookkeeper}} and 
{{TopicPartitionEntry}}, and {{nextSequence}} was moved into 
{{TopicPartitionEntry}}.
{code:java}
Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.getValue(FlinkKafkaInternalProducer.java:262)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.getValue(FlinkKafkaInternalProducer.java:253)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.resumeTransaction(FlinkKafkaInternalProducer.java:156)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.recoverAndCommit(FlinkKafkaProducer.java:752)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.recoverAndCommit(FlinkKafkaProducer.java:98)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:393)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:351)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:870)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:856)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:365)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:685)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:515)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchFieldException: nextSequence
at java.lang.Class.getDeclaredField(Class.java:2070)
at 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.getValue(FlinkKafkaInternalProducer.java:258)
... 16 more
{code}
So we need to change the way of getting this field. I will try to figure it 
out. cc [~aljoscha] and [~pnowojski].


was (Author: yanghua):
I can confirm that there are compatibility issues when bumping the Kafka 
client version from 2.2.0 to 2.3.0.

log details: [https://api.travis-ci.org/v3/job/550192082/log.txt]

One of them is that {{FlinkKafkaInternalProducer}} uses reflection to get the 
direct inner {{nextSequence}} field of {{TransactionManager}}.

Before 2.3.0, the type of this field was:
{code:java}
private final Map<TopicPartition, Integer> nextSequence;
{code}
However, in 2.3.0, {{TransactionManager}} changed its implementation: it 
introduced two inner classes, named {{TopicPartitionBookkeeper}} and 
{{TopicPartitionEntry}}, and {{nextSequence}} was moved into 
{{TopicPartitionEntry}}.

So we need to change the way of getting this field. I will try to figure it 
out. cc [~aljoscha] and [~pnowojski].

 

 

> Bump Kafka client version to 2.3.0 for universal Kafka connector
> 
>
> Key: FLINK-12976
> URL: https://issues.apache.org/jira/browse/FLINK-12976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Recently, Kafka 2.3.0 was released; see: 
> [https://www.apache.org/dist/kafka/2.3.0/RELEASE_NOTES.html]
> We should bump the Kafka client dependency version to 2.3.0 to track the 
> newest release.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tzulitai commented on issue #8174: [FLINK-12190] Fix IllegalArgumentException thrown by FlinkKinesisConsumerMigrationTest#writeSnapshot

2019-06-25 Thread GitBox
tzulitai commented on issue #8174: [FLINK-12190] Fix IllegalArgumentException 
thrown by FlinkKinesisConsumerMigrationTest#writeSnapshot
URL: https://github.com/apache/flink/pull/8174#issuecomment-505703526
 
 
   Merging ...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12976) Bump Kafka client version to 2.3.0 for universal Kafka connector

2019-06-25 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16872883#comment-16872883
 ] 

vinoyang commented on FLINK-12976:
--

I can confirm that there are compatibility issues when bumping the Kafka 
client version from 2.2.0 to 2.3.0.

log details: [https://api.travis-ci.org/v3/job/550192082/log.txt]

One of them is that {{FlinkKafkaInternalProducer}} uses reflection to get the 
direct inner {{nextSequence}} field of {{TransactionManager}}.

Before 2.3.0, the type of this field was:
{code:java}
private final Map<TopicPartition, Integer> nextSequence;
{code}
However, in 2.3.0, {{TransactionManager}} changed its implementation: it 
introduced two inner classes, named {{TopicPartitionBookkeeper}} and 
{{TopicPartitionEntry}}, and {{nextSequence}} was moved into 
{{TopicPartitionEntry}}.

So we need to change the way of getting this field. I will try to figure it 
out. cc [~aljoscha] and [~pnowojski].

 

 

> Bump Kafka client version to 2.3.0 for universal Kafka connector
> 
>
> Key: FLINK-12976
> URL: https://issues.apache.org/jira/browse/FLINK-12976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Recently, Kafka 2.3.0 was released; see: 
> [https://www.apache.org/dist/kafka/2.3.0/RELEASE_NOTES.html]
> We should bump the Kafka client dependency version to 2.3.0 to track the 
> newest release.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zjuwangg commented on issue #8890: Xuefu1

2019-06-25 Thread GitBox
zjuwangg commented on issue #8890: Xuefu1
URL: https://github.com/apache/flink/pull/8890#issuecomment-505702146
 
 
   Better to rename the PR title.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297466824
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+    * Tests that 2 concurrent region failovers can lead to a proper vertex state.
+    * <pre>
+    *     (v11) -+-> (v21)
+    *            x
+    *     (v12) -+-> (v22)
+    *
+    *            ^
+    *            |
+    *         (blocking)
+    * </pre>
+    */
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
 
 Review comment:
   The TestAdaptedRestartPipelinedRegionStrategyNG needs the EG as a constructor 
param, so we cannot build it before the EG is generated.
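
   To illustrate the ordering constraint with a self-contained sketch (hypothetical 
types, not the real Flink classes):

```java
// The strategy needs the graph in its constructor, so the graph builder
// receives a factory and invokes it only once the graph instance exists.
interface Strategy {}

interface StrategyFactory {
    Strategy create(Graph graph);
}

class Graph {
    final Strategy strategy;

    Graph(StrategyFactory factory) {
        // "this" only exists at this point, which is why a factory
        // (rather than a ready-made Strategy) has to be passed in.
        this.strategy = factory.create(this);
    }
}
```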


[GitHub] [flink] tzulitai commented on issue #8640: [FLINK-12741][docs] Update Kafka producer fault tolerance guarantees

2019-06-25 Thread GitBox
tzulitai commented on issue #8640: [FLINK-12741][docs] Update Kafka producer 
fault tolerance guarantees
URL: https://github.com/apache/flink/pull/8640#issuecomment-505701524
 
 
   Thanks for the fix and reviews.
   Merging ..


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297470182
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+    * Tests that 2 concurrent region failovers can lead to a proper vertex state.
+    * <pre>
+    *     (v11) -+-> (v21)
+    *            x
+    *     (v12) -+-> (v22)
+    *
+    *            ^
+    *            |
+    *         (blocking)
+    * </pre>
+    */
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new 

[GitHub] [flink] tzulitai commented on issue #8709: [hotfix][docs]Fix the typos in process_function.md and process_function.zh.md.

2019-06-25 Thread GitBox
tzulitai commented on issue #8709: [hotfix][docs]Fix the typos in 
process_function.md and process_function.zh.md.
URL: https://github.com/apache/flink/pull/8709#issuecomment-505701130
 
 
   Merging ...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on issue #8797: [Docks] Checkpoints: fix typo

2019-06-25 Thread GitBox
tzulitai commented on issue #8797: [Docks] Checkpoints: fix typo
URL: https://github.com/apache/flink/pull/8797#issuecomment-505700499
 
 
   Thanks for the fix and reviews, merging ..


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on issue #8806: [FLINK-12870][doc] Add missing warnings to schema evolution documentation

2019-06-25 Thread GitBox
tzulitai commented on issue #8806: [FLINK-12870][doc] Add missing warnings to 
schema evolution documentation
URL: https://github.com/apache/flink/pull/8806#issuecomment-505700140
 
 
   I think the extra warnings are already an improvement to the current state of 
this page.
   +1, will merge this ..


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297465424
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
 
 Review comment:
   Making `testMainThreadUtil` static will cause it to be `null` in tests.
   The Resource is static because it is a ClassRule.
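
   A minimal self-contained JUnit 4 sketch of why that happens (placeholder names, 
not the actual test classes):

```java
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.ExternalResource;

public class RuleScopeExample {

    static class SomeResource extends ExternalResource {
        private String handle; // stays null until before() runs

        @Override
        protected void before() {
            handle = "ready";
        }

        String getHandle() {
            return handle;
        }
    }

    // ClassRule fields must be static; JUnit runs before() once per class.
    @ClassRule
    public static final SomeResource RESOURCE = new SomeResource();

    // Instance field: initialized per test instance, i.e. after before().
    // If this field were static, it would be initialized during class
    // loading, before the rule ran, and would therefore be null.
    private final String handle = RESOURCE.getHandle();

    @Test
    public void handleIsReady() {
        System.out.println(handle); // prints "ready"
    }
}
```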


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8847: [FLINK-12609][python] Align the Python data types with Java

2019-06-25 Thread GitBox
sunjincheng121 commented on issue #8847: [FLINK-12609][python] Align the Python 
data types with Java
URL: https://github.com/apache/flink/pull/8847#issuecomment-505700051
 
 
   Thanks for the PR @dianfu !
   I would appreciate it if you could open an independent PR for the hotfix 
change. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Issue Comment Deleted] (FLINK-12976) Bump Kafka client version to 2.3.0 for universal Kafka connector

2019-06-25 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang updated FLINK-12976:
-
Comment: was deleted

(was: It seems the public Maven repository has not been updated with the newest 
dependency.)

> Bump Kafka client version to 2.3.0 for universal Kafka connector
> 
>
> Key: FLINK-12976
> URL: https://issues.apache.org/jira/browse/FLINK-12976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Recently, Kafka 2.3.0 has been released, see here: 
> [https://www.apache.org/dist/kafka/2.3.0/RELEASE_NOTES.html]
> We'd better bump the Kafka client dependency version to 2.3.0 to track the 
> newest version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tzulitai commented on issue #8848: [FLINK-12957][docs] fix thrift and protobuf dependency examples

2019-06-25 Thread GitBox
tzulitai commented on issue #8848: [FLINK-12957][docs] fix thrift and protobuf 
dependency examples
URL: https://github.com/apache/flink/pull/8848#issuecomment-505699571
 
 
   Thanks for the contribution @NicoK!
   The changes look good, will merge this ..


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz edited a comment on issue #8890: Xuefu1

2019-06-25 Thread GitBox
xuefuz edited a comment on issue #8890: Xuefu1
URL: https://github.com/apache/flink/pull/8890#issuecomment-505680111
 
 
   Please note that this PR includes the changes in PR #8883. cc: @bowenli86 
@lirui-apache 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8849: [hotfix][connectors] fix shadowed NPE in elasticsearch sink connector

2019-06-25 Thread GitBox
flinkbot edited a comment on issue #8849: [hotfix][connectors] fix shadowed NPE 
in elasticsearch sink connector
URL: https://github.com/apache/flink/pull/8849#issuecomment-504907532
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @tzulitai [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @tzulitai [PMC]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @tzulitai [PMC]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @tzulitai [PMC]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on issue #8849: [hotfix][connectors] fix shadowed NPE in elasticsearch sink connector

2019-06-25 Thread GitBox
tzulitai commented on issue #8849: [hotfix][connectors] fix shadowed NPE in 
elasticsearch sink connector
URL: https://github.com/apache/flink/pull/8849#issuecomment-505698528
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on issue #8849: [hotfix][connectors] fix shadowed NPE in elasticsearch sink connector

2019-06-25 Thread GitBox
tzulitai commented on issue #8849: [hotfix][connectors] fix shadowed NPE in 
elasticsearch sink connector
URL: https://github.com/apache/flink/pull/8849#issuecomment-505698475
 
 
   Thanks for this fix, merging ..


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state

2019-06-25 Thread GitBox
flinkbot edited a comment on issue #8618:  [FLINK-12732][state-processor-api] 
Add savepoint reader for consuming partitioned operator state
URL: https://github.com/apache/flink/pull/8618#issuecomment-498777998
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @tzulitai [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @tzulitai [PMC]
   * ❗ 3. Needs [attention] from.
   - Needs attention by @tzulitai [PMC]
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @tzulitai [PMC]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @tzulitai [PMC]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on issue #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state

2019-06-25 Thread GitBox
tzulitai commented on issue #8618:  [FLINK-12732][state-processor-api] Add 
savepoint reader for consuming partitioned operator state
URL: https://github.com/apache/flink/pull/8618#issuecomment-505697948
 
 
   Merging ...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on issue #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state

2019-06-25 Thread GitBox
tzulitai commented on issue #8618:  [FLINK-12732][state-processor-api] Add 
savepoint reader for consuming partitioned operator state
URL: https://github.com/apache/flink/pull/8618#issuecomment-505697913
 
 
   @flinkbot approve architecture
   @flinkbot approve quality


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on issue #8859: [FLINK-12905][table-planner] Enable querying CatalogViews in legacy planner

2019-06-25 Thread GitBox
lirui-apache commented on issue #8859: [FLINK-12905][table-planner] Enable 
querying CatalogViews in legacy planner
URL: https://github.com/apache/flink/pull/8859#issuecomment-505697061
 
 
   For Hive to access (non-generic) views created by Flink, we need to 1) 
remove the catalog part from the object paths and 2) make sure the expanded 
query is valid HiveQL syntax.
   To achieve the above in Blink, we used Hive's analyzer to analyze the 
original query in order to get a Hive-compatible expanded query. Blink's 
expanded query is stored as a property of the view, and will be restored if the 
view is accessed from the Blink side.
   
   If we're to do the same in Flink, I guess it can be done in a separate PR. 
And the prerequisite is that `CatalogView` preserves the original query when it 
reaches `HiveCatalog::createTable`.
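
   A purely illustrative sketch of the store/restore idea (the class and the 
property key are hypothetical, not the actual Blink or `HiveCatalog` code):

```java
import java.util.HashMap;
import java.util.Map;

// The Hive-compatible expanded query is the primary definition (so Hive can
// read the view), while the planner-specific expanded query is stashed in the
// properties and restored when the same planner reads the view back.
class ViewDefinition {
    static final String PROP_PLANNER_EXPANDED_QUERY = "planner.expanded-query"; // assumed key

    final String originalQuery;
    final String hiveCompatibleExpandedQuery;
    final Map<String, String> properties = new HashMap<>();

    ViewDefinition(String originalQuery, String hiveExpanded, String plannerExpanded) {
        this.originalQuery = originalQuery;
        this.hiveCompatibleExpandedQuery = hiveExpanded;
        this.properties.put(PROP_PLANNER_EXPANDED_QUERY, plannerExpanded);
    }

    String expandedQueryFor(boolean samePlanner) {
        return samePlanner
            ? properties.getOrDefault(PROP_PLANNER_EXPANDED_QUERY, hiveCompatibleExpandedQuery)
            : hiveCompatibleExpandedQuery;
    }
}
```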


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297466312
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Records modifications of
+ * {@link org.apache.flink.runtime.executiongraph.ExecutionVertex 
ExecutionVertices}, and allows
+ * for checking whether a vertex was modified.
+ *
+ * Examples for modifications include:
+ * 
+ * cancellation of the underlying execution
+ * deployment of the execution vertex
+ * 
+ *
+ * @see DefaultScheduler
+ */
+public class ExecutionVertexVersioner {
 
 Review comment:
   Yes. Will add tests for it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12972) Ensure calling open/close in join condition of generated functions for blink

2019-06-25 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-12972.
--
   Resolution: Fixed
Fix Version/s: 1.9.0

merged in 1.9.0: 57603790e7e5ed3b7530e26f28d8b6dfe20d6422

> Ensure calling open/close in join condition of generated functions for blink
> 
>
> Key: FLINK-12972
> URL: https://issues.apache.org/jira/browse/FLINK-12972
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Generated functions' *open* methods are not called in the join operators, e.g. 
> *AbstractStreamingJoinOperator*, *SortMergeJoinOperator*, *HashJoinOperator*, 
> *LongHashJoinGenerator*.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297466134
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+    * Tests that 2 concurrent region failovers can lead to a proper vertex state.
+    * <pre>
+    *     (v11) -+-> (v21)
+    *            x
+    *     (v12) -+-> (v22)
+    *
+    *            ^
+    *            |
+    *         (blocking)
+    * </pre>
+    */
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new 

[GitHub] [flink] KurtYoung merged pull request #8868: [FLINK-12972][table-planner-blink] Ensure calling open/close in join condition of generated functions for blink

2019-06-25 Thread GitBox
KurtYoung merged pull request #8868: [FLINK-12972][table-planner-blink] Ensure 
calling open/close in join condition of generated functions for blink
URL: https://github.com/apache/flink/pull/8868
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297465694
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
+   EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+   /**
+    * Tests that 2 concurrent region failovers can lead to a proper vertex state.
+    * <pre>
+    *     (v11) -+-> (v21)
+    *            x
+    *     (v12) -+-> (v22)
+    *
+    *            ^
+    *            |
+    *         (blocking)
+    * </pre>
+    */
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final JobID jid = new JobID();
+   final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, DEFAULT_PARALLELISM);
+   final TestRestartStrategy restartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   final ExecutionGraph eg = createExecutionGraph(
+   jid,
+   TestAdaptedRestartPipelinedRegionStrategyNG::new,
+   restartStrategy,
+   slotProvider);
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new 

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297465424
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   public final int DEFAULT_PARALLELISM = 2;
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new TestingComponentMainThreadExecutor.Resource();
+
+   private final TestingComponentMainThreadExecutor testMainThreadUtil =
 
 Review comment:
   Ok.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297465153
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/runtime/RegionFailoverITCase.java
 ##
 @@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT case for testing region failover strategy.
+ */
+public class RegionFailoverITCase extends TestLogger {
 
 Review comment:
   I take it as an E2E test that ensures the job can succeed even when task 
failures happen while using the region failover strategy.
   IT cases do blackbox verification, so they do not verify internal 
state (like which tasks are re-scheduled).
   It just gives us confidence that the Flink runtime works well in this 
scenario.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-25 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297463953
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
 ##
 @@ -63,7 +63,7 @@
return new RestartAllStrategy.Factory();
 
case PIPELINED_REGION_RESTART_STRATEGY_NAME:
 
 Review comment:
   Ok. Let's make the adapted region failover strategy the default in another PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on issue #8878: [FLINK-12971][table-planner-blink] Remove the constraint that lookup join needs a primary key or index key

2019-06-25 Thread GitBox
godfreyhe commented on issue #8878: [FLINK-12971][table-planner-blink] Remove 
the constraint that lookup join needs a primary key or index key
URL: https://github.com/apache/flink/pull/8878#issuecomment-505693275
 
 
   Thanks for this refactoring @wuchong, LGTM now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

2019-06-25 Thread GitBox
zhijiangW commented on issue #8646: [FLINK-12735][network] Make shuffle 
environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#issuecomment-505692836
 
 
   Thanks for the further reviews @azagrebin!
   I have rebased the code and addressed the new comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-06-25 Thread Yingjie Cao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16872870#comment-16872870
 ] 

Yingjie Cao commented on FLINK-12070:
-

[~StephanEwen] Though theoretically two read buffers are enough, there could 
still be problems. Your concern is reasonable; indeed, the old 
"SpillableSubpartition" incurs the same problem but shows a different behavior, 
namely deadlock (I fixed that deadlock when I tested the old implementation): 
because the buffer request of the old "SpillableSubpartition" is blocking, the 
Netty thread blocks instead of throwing an exception when there are not enough 
read buffers.

The reader can be scheduled not only when a buffer of the same reader is sent 
out, but also when the downstream adds credit or when buffers of other readers 
multiplexed onto the same channel are sent out. So it is unsafe to assume that 
whenever the reader is scheduled there must be a buffer available for reading 
ahead.
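
To illustrate the behavioral difference between the two request styles, a toy 
sketch only (not the actual Flink buffer pool):
{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

class ToyBufferPool {
    private final Queue<byte[]> buffers = new ArrayDeque<>();

    ToyBufferPool(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            buffers.add(new byte[bufferSize]);
        }
    }

    // Old blocking style: the calling (e.g. Netty) thread waits for a recycle.
    // Deadlock-prone if recycling depends on this very thread making progress.
    synchronized byte[] requestBlocking() throws InterruptedException {
        while (buffers.isEmpty()) {
            wait();
        }
        return buffers.poll();
    }

    // Non-blocking style: a shortage surfaces as an exception instead.
    synchronized byte[] requestOrThrow() {
        byte[] buf = buffers.poll();
        if (buf == null) {
            throw new IllegalStateException("no read buffer available");
        }
        return buf;
    }

    synchronized void recycle(byte[] buf) {
        buffers.add(buf);
        notifyAll();
    }
}
{code}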

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: image-2019-04-18-17-38-24-949.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In order to avoid writing produced results multiple times for multiple 
> consumers and in order to speed up batch recoveries, we should make the 
> blocking result partitions to be consumable multiple times. At the moment a 
> blocking result partition will be released once the consumers has processed 
> all data. Instead the result partition should be released once the next 
> blocking result has been produced and all consumers of a blocking result 
> partition have terminated. Moreover, blocking results should not hold on slot 
> resources like network buffers or memory as it is currently the case with 
> {{SpillableSubpartitions}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] godfreyhe commented on a change in pull request #8878: [FLINK-12971][table-planner-blink] Remove the constraint that lookup join needs a primary key or index key

2019-06-25 Thread GitBox
godfreyhe commented on a change in pull request #8878: 
[FLINK-12971][table-planner-blink] Remove the constraint that lookup join needs 
a primary key or index key
URL: https://github.com/apache/flink/pull/8878#discussion_r297461744
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
 ##
 @@ -85,19 +84,14 @@ abstract class CommonLookupJoin(
   with FlinkRelNode {
 
   val joinKeyPairs: Array[IntPair] = getTemporalTableJoinKeyPairs(joinInfo, 
calcOnTemporalTable)
-  val indexKeys: Array[TableIndex] = getTableIndexes(tableSource)
   // all potential index keys, mapping from field index in table source to 
LookupKey
   val allLookupKeys: Map[Int, LookupKey] = analyzeLookupKeys(
 cluster.getRexBuilder,
 joinKeyPairs,
-indexKeys,
 tableSource.getTableSchema,
 calcOnTemporalTable)
   // the matched best lookup fields which is in defined order, maybe empty
-  val matchedLookupFields: Option[Array[Int]] = findMatchedIndex(
-indexKeys,
-tableSource.getTableSchema,
-allLookupKeys)
+  val matchedLookupFields: Option[Array[Int]] = findMatchedIndex(allLookupKeys)
 
 Review comment:
   matchedLookupFields could be removed




[GitHub] [flink] becketqin commented on issue #8535: [FLINK-11693] Add KafkaSerializationSchema that uses ProducerRecord

2019-06-25 Thread GitBox
becketqin commented on issue #8535: [FLINK-11693] Add KafkaSerializationSchema 
that uses ProducerRecord
URL: https://github.com/apache/flink/pull/8535#issuecomment-505687127
 
 
   @aljoscha Yes, you are right. We are not able to change the partition once 
the `ProducerRecord` is there. We would have to create another `ProducerRecord` 
in order to do that, but that would result in some overhead. Introducing a 
context sounds reasonable to me. It has the extra benefit of allowing 
context-aware serialization. There are two flavors of doing this, sketched 
below.
   
   1. Pass the context as a method argument.
   2. A `ContextAware` interface like `RichFunction`, with `setContext()` and 
`getContext()` methods.
   
   Personally I slightly prefer flavor 2, because it avoids keeping the Context 
object outside of the class that actually uses it. But I don't have a strong 
opinion on this.
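   
   To make the two flavors concrete, a rough Java sketch for comparison (all 
names here are hypothetical illustrations, not the API that was eventually 
merged):
   
   ```java
   import java.io.Serializable;
   
   // Illustrative context; the available methods are assumptions.
   interface SerializationContext {
       int numPartitions(String topic);
   }
   
   // Flavor 1: the context is passed as a method argument on every call.
   interface KafkaSerializationSchemaV1<T> extends Serializable {
       byte[] serialize(T element, SerializationContext context);
   }
   
   // Flavor 2: a ContextAware-style interface, similar to RichFunction,
   // where the context is injected once and kept by the implementation.
   interface KafkaSerializationSchemaV2<T> extends Serializable {
       void setContext(SerializationContext context);
       SerializationContext getContext();
       byte[] serialize(T element);
   }
   ```
   
   Flavor 2 keeps the `serialize` signature minimal, at the cost of the 
implementation holding mutable state.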




[GitHub] [flink] flinkbot commented on issue #8890: Xuefu1

2019-06-25 Thread GitBox
flinkbot commented on issue #8890: Xuefu1
URL: https://github.com/apache/flink/pull/8890#issuecomment-505680303
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] xuefuz commented on issue #8890: Xuefu1

2019-06-25 Thread GitBox
xuefuz commented on issue #8890: Xuefu1
URL: https://github.com/apache/flink/pull/8890#issuecomment-505680111
 
 
   Please note that this PR includes the changes from PR #8883. cc: @bowenli86 




[GitHub] [flink] xuefuz opened a new pull request #8890: Xuefu1

2019-06-25 Thread GitBox
xuefuz opened a new pull request #8890: Xuefu1
URL: https://github.com/apache/flink/pull/8890
 
 
   
   
   ## What is the purpose of the change
   
   Add the generation of HiveTableSink from a Hive table
   
   ## Brief change log
   
 - Added the logic of generating a HiveTableSink from a Hive table
 - Refactored and simplified HiveTableSink related code
 - Added test for the logic
   
   ## Verifying this change
   
   This change is verified with existing and new test cases.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (yes)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   




[GitHub] [flink] godfreyhe commented on a change in pull request #8868: [FLINK-12972][table-planner-blink] Ensure calling open/close in join condition of generated functions for blink

2019-06-25 Thread GitBox
godfreyhe commented on a change in pull request #8868: 
[FLINK-12972][table-planner-blink] Ensure calling open/close in join condition 
of generated functions for blink
URL: https://github.com/apache/flink/pull/8868#discussion_r297452220
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 ##
 @@ -1106,4 +1108,23 @@ class JoinITCase(state: StateBackendMode) extends 
StreamingWithStateTestBase(sta
 val expected = List("500")
 assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
+
+  @Test
+  def testJoinWithUDFFilter(): Unit = {
+val ds1 = failingDataSource(TestData.smallTupleData3).toTable(tEnv, 'a, 
'b, 'c)
+val ds2 = failingDataSource(TestData.tupleData5).toTable(tEnv, 'd, 'e, 'f, 
'g, 'h)
+
+tEnv.registerTable("T3", ds1)
+tEnv.registerTable("T5", ds2)
+tEnv.registerFunction("funcWithOpen", new FuncWithOpen)
+
+val sql = "SELECT c, g FROM T3 join T5 on funcWithOpen(a + d) where b = e"
 
 Review comment:
   @KurtYoung , `a` is from join left side (`T3`), while `d` is from join right 
side (`T5`).  `funcWithOpen(a + d)` could not be pushed into left side or right 
side as a `Project`.
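   
   For reference, a minimal sketch of what a UDF with `open()` might look like 
here (the body is illustrative and assumed, not the actual `FuncWithOpen` used 
in the test):
   
   ```java
   import org.apache.flink.table.functions.FunctionContext;
   import org.apache.flink.table.functions.ScalarFunction;
   
   // Sketch of a scalar UDF whose open()/close() must be invoked by the
   // generated join-condition code before/after eval() is called.
   public class FuncWithOpen extends ScalarFunction {
   
       private transient boolean opened;
   
       @Override
       public void open(FunctionContext context) throws Exception {
           // e.g. acquire a resource; the generated code must call this first
           opened = true;
       }
   
       public boolean eval(int a) {
           if (!opened) {
               throw new IllegalStateException("open() was not called");
           }
           return a > 0;
       }
   
       @Override
       public void close() throws Exception {
           opened = false;
       }
   }
   ```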




[jira] [Updated] (FLINK-12989) Generate HiveTableSink from a Hive table

2019-06-25 Thread Xuefu Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuefu Zhang updated FLINK-12989:
--------------------------------
Summary: Generate HiveTableSink from a Hive table  (was: Add 
implementation of converting Hive catalog table to Hive table sink)

> Generate HiveTableSink from a Hive table
> -----------------------------------------
>
>                 Key: FLINK-12989
>                 URL: https://issues.apache.org/jira/browse/FLINK-12989
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: Xuefu Zhang
>            Assignee: Xuefu Zhang
>            Priority: Major
>
> As a follow-up for FLINK-11480, this adds the conversion from a Hive table to 
> a table sink that is used on the writing side of the data connector.





[jira] [Created] (FLINK-12989) Add implementation of converting Hive catalog table to Hive table sink

2019-06-25 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created FLINK-12989:
-------------------------------

             Summary: Add implementation of converting Hive catalog table to Hive table sink
                 Key: FLINK-12989
                 URL: https://issues.apache.org/jira/browse/FLINK-12989
             Project: Flink
          Issue Type: Sub-task
          Components: Table SQL / API
            Reporter: Xuefu Zhang
            Assignee: Xuefu Zhang


As a follow-up for FLINK-11480, this adds the conversion from a Hive table to a 
table sink that is used on the writing side of the data connector.





[GitHub] [flink] flinkbot commented on issue #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service

2019-06-25 Thread GitBox
flinkbot commented on issue #8889: [FLINK-12934][hive] add additional 
dependencies for flink-connector-hive to connect to remote hive metastore 
service
URL: https://github.com/apache/flink/pull/8889#issuecomment-505662827
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-12934) add additional dependencies for flink-connector-hive to connect to remote hive metastore service

2019-06-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12934:
-----------------------------------
Labels: pull-request-available  (was: )

> add additional dependencies for flink-connector-hive to connect to remote 
> hive metastore service
> --------------------------------------------------------------------------
>
>                 Key: FLINK-12934
>                 URL: https://issues.apache.org/jira/browse/FLINK-12934
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Hive
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>
> we've been testing flink-connector-hive to connect to built-in hive 
> metastore. Connecting to standalone hive metastore requires additional 
> dependencies:
> {code:java}
> commons-configuration-1.6.jar
> commons-logging-1.2.jar
> servlet-api-2.5.jar
> hadoop-auth-2.7.2.jar
> {code}
> All dependencies needed to make flink-connector-hive able to connect to hive 
> metastore for HiveCatalog include:
> {code:java}
> commons-configuration-1.6.jar hadoop-common-2.7.2.jar
> commons-logging-1.2.jar   
> hadoop-mapreduce-client-core-2.7.2.jar
> flink-connector-hive_2.11-1.9-SNAPSHOT.jarhive-exec-2.3.4.jar
> flink-dist_2.11-1.9-SNAPSHOT.jar  hive-metastore-2.3.4.jar
> flink-hadoop-compatibility_2.11-1.9-SNAPSHOT.jar  log4j-1.2.17.jar
> flink-sql-client_2.11-1.9-SNAPSHOT.jar
> servlet-api-2.5.jar
> flink-table_2.11-1.9-SNAPSHOT.jar slf4j-log4j12-1.7.15.jar
> hadoop-auth-2.7.2.jar
> {code}





[GitHub] [flink] bowenli86 opened a new pull request #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service

2019-06-25 Thread GitBox
bowenli86 opened a new pull request #8889: [FLINK-12934][hive] add additional 
dependencies for flink-connector-hive to connect to remote hive metastore 
service
URL: https://github.com/apache/flink/pull/8889
 
 
   ## What is the purpose of the change
   
   This PR adds a few more dependencies for HiveCatalog in flink-connector-hive 
to connect to a remote hive metastore service.
   
   We've been testing flink-connector-hive to connect to built-in hive 
metastore. Connecting to remote hive metastore requires additional dependencies:
   
   - commons-configuration-1.6.jar
   - commons-logging-1.2.jar
   - servlet-api-2.5.jar
   - hadoop-auth-2.7.2.jar
   
   ## Brief change log
   
   - adds a few more dependencies for HiveCatalog in flink-connector-hive to 
connect to a remote hive metastore service
   
   ## Verifying this change
   
   We need an end-to-end test to verify such work.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[jira] [Updated] (FLINK-12934) add additional dependencies for flink-connector-hive to connect to remote hive metastore service

2019-06-25 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12934:
-----------------------------
Summary: add additional dependencies for flink-connector-hive to connect to 
remote hive metastore service  (was: add additional dependencies for 
flink-connector-hive to connect to standalone hive metastore)

> add additional dependencies for flink-connector-hive to connect to remote 
> hive metastore service
> --------------------------------------------------------------------------
>
>                 Key: FLINK-12934
>                 URL: https://issues.apache.org/jira/browse/FLINK-12934
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Hive
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: 1.9.0
>
>
> we've been testing flink-connector-hive to connect to built-in hive 
> metastore. Connecting to standalone hive metastore requires additional 
> dependencies:
> {code:java}
> commons-configuration-1.6.jar
> commons-logging-1.2.jar
> servlet-api-2.5.jar
> hadoop-auth-2.7.2.jar
> {code}
> All dependencies needed to make flink-connector-hive able to connect to hive 
> metastore for HiveCatalog include:
> {code:java}
> commons-configuration-1.6.jar hadoop-common-2.7.2.jar
> commons-logging-1.2.jar   
> hadoop-mapreduce-client-core-2.7.2.jar
> flink-connector-hive_2.11-1.9-SNAPSHOT.jarhive-exec-2.3.4.jar
> flink-dist_2.11-1.9-SNAPSHOT.jar  hive-metastore-2.3.4.jar
> flink-hadoop-compatibility_2.11-1.9-SNAPSHOT.jar  log4j-1.2.17.jar
> flink-sql-client_2.11-1.9-SNAPSHOT.jar
> servlet-api-2.5.jar
> flink-table_2.11-1.9-SNAPSHOT.jar slf4j-log4j12-1.7.15.jar
> hadoop-auth-2.7.2.jar
> {code}





  1   2   3   4   5   6   >