[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] 
Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284687035
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
 ##
 @@ -18,131 +18,112 @@
 
 package org.apache.flink.runtime.scheduler.adapter;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionEdge;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.executiongraph.TestRestartStrategy;
-import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.xml.ws.Provider;
+
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
-import java.util.Random;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.api.common.InputDependencyConstraint.ALL;
-import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
 import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
-import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**
  * Unit tests for {@link DefaultExecutionVertex}.
  */
-public class DefaultExecutionVertexTest {
-
-   private final SimpleAckingTaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
-
-   private final TestRestartStrategy triggeredRestartStrategy = 
TestRestartStrategy.manuallyTriggered();
+public class DefaultExecutionVertexTest extends TestLogger {
 
-   private final int parallelism = 3;
+   private final ExecutionStateProviderTest stateProvider = new 
ExecutionStateProviderTest();
 
	private List<SchedulingExecutionVertex> schedulingExecutionVertices;
 
-   private List<ExecutionVertex> executionVertices;
+   private IntermediateResultPartitionID intermediateResultPartitionId;
 
@Before
public void setUp() throws Exception {
-   JobVertex[] jobVertices = new JobVertex[2];
-   jobVertices[0] = createNoOpVertex(parallelism);
-   jobVertices[1] = createNoOpVertex(parallelism);
-   jobVertices[1].connectNewDataSetAsInput(jobVertices[0], 
ALL_TO_ALL, BLOCKING);
-   jobVertices[0].setInputDependencyConstraint(ALL);
-   jobVertices[1].setInputDependencyConstraint(ANY);
-   ExecutionGraph executionGraph = createSimpleTestGraph(
-   new JobID(),
-   taskManagerGateway,
-   triggeredRestartStrategy,
-   jobVertices);
-   ExecutionGraphToSchedulingTopologyAdapter adapter = new 
ExecutionGraphToSchedulingTopologyAdapter(executionGraph);
-
-   schedulingExecutionVertices = new ArrayList<>();
-   adapter.getVertices().forEach(vertex -> 
schedulingExecutionVertices.add(vertex));
-   executionVertices = new ArrayList<>();
-   executionGraph.getAllExecutionVertices().forEach(vertex -> 
executionVertices.add(vertex));
-   }
 
-   @Test
-   public void testGetId() {
-   for (int idx = 0; idx < schedulingExecutionVertices.size(); 
idx++){
-   
assertEquals(schedulingExecutionVertices.get(idx).getId().getJobVertexId(),
-   executionVertices.get(idx).getJobvertexId());
-   
assertEquals(schedulingExecutionVertices.get(idx).getId().getSubtaskIndex(),
-   
executionVertices.get(idx).getParallelSubtaskIndex());
-   }
+   

[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] 
Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284689769
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
 ##

[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] 
Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284697765
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
 ##
 @@ -62,121 +49,73 @@
 /**
  * Unit tests for {@link DefaultResultPartition}.
  */
-public class DefaultResultPartitionTest {
+public class DefaultResultPartitionTest extends TestLogger {
 
-   private final SimpleAckingTaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
+   private final DefaultExecutionVertexTest.ExecutionStateProviderTest 
stateProvider = new DefaultExecutionVertexTest.ExecutionStateProviderTest();
 
-   private final TestRestartStrategy triggeredRestartStrategy = 
TestRestartStrategy.manuallyTriggered();
+   private List schedulingExecutionVertices;
 
-   private ExecutionGraph executionGraph;
-
-   private ExecutionGraphToSchedulingTopologyAdapter adapter;
-
-   private List<IntermediateResultPartition> intermediateResultPartitions;
-
-   private List<SchedulingResultPartition> schedulingResultPartitions;
+   private DefaultResultPartition resultPartition;
 
@Before
-   public void setUp() throws Exception {
-   final int parallelism = 3;
-   JobVertex[] jobVertices = new JobVertex[2];
-   jobVertices[0] = createNoOpVertex(parallelism);
-   jobVertices[1] = createNoOpVertex(parallelism);
-   jobVertices[1].connectNewDataSetAsInput(jobVertices[0], 
ALL_TO_ALL, BLOCKING);
-   jobVertices[0].setInputDependencyConstraint(ALL);
-   jobVertices[1].setInputDependencyConstraint(ANY);
-   executionGraph = createSimpleTestGraph(
-   new JobID(),
-   taskManagerGateway,
-   triggeredRestartStrategy,
-   jobVertices);
-   adapter = new 
ExecutionGraphToSchedulingTopologyAdapter(executionGraph);
-
-   intermediateResultPartitions = new ArrayList<>();
-   schedulingResultPartitions = new ArrayList<>();
-
-   for (ExecutionVertex vertex : 
executionGraph.getAllExecutionVertices()) {
-   for (Map.Entry<IntermediateResultPartitionID, IntermediateResultPartition> entry
-   : vertex.getProducedPartitions().entrySet()) {
-   intermediateResultPartitions.add(entry.getValue());
-   schedulingResultPartitions.add(adapter.getResultPartition(entry.getKey())
-   .orElseThrow(() -> new IllegalArgumentException("can not find partition " + entry.getKey())));
-   }
-   }
-   assertEquals(parallelism, intermediateResultPartitions.size());
-   }
-
-   @Test
-   public void testBasicInfo() {
-   for (int idx = 0; idx < intermediateResultPartitions.size(); 
idx++) {
-   final IntermediateResultPartition partition = 
intermediateResultPartitions.get(idx);
-   final SchedulingResultPartition 
schedulingResultPartition = schedulingResultPartitions.get(idx);
-   assertEquals(partition.getPartitionId(), 
schedulingResultPartition.getId());
-   assertEquals(partition.getIntermediateResult().getId(), 
schedulingResultPartition.getResultId());
-   assertEquals(partition.getResultType(), 
schedulingResultPartition.getPartitionType());
-   }
+   public void setUp() {
+   schedulingExecutionVertices = new ArrayList<>(2);
+   resultPartition = new DefaultResultPartition(
+   new IntermediateResultPartitionID(),
+   new IntermediateDataSetID(),
+   BLOCKING);
+
+   DefaultExecutionVertex vertex1 = new DefaultExecutionVertex(
+   new ExecutionVertexID(new JobVertexID(), 0),
+   Collections.singletonList(resultPartition),
+   ALL,
+   stateProvider);
+   resultPartition.setProducer(vertex1);
+   DefaultExecutionVertex vertex2 = new DefaultExecutionVertex(
+   new ExecutionVertexID(new JobVertexID(), 0),
+   java.util.Collections.emptyList(),
+   ALL,
+   stateProvider);
+   resultPartition.addConsumer(vertex2);
+   schedulingExecutionVertices.add(vertex1);
+   schedulingExecutionVertices.add(vertex2);
}
 
@Test
public void testGetConsumers() {
-   for (int idx = 0; idx < intermediateResultPartitions.size(); 
idx++) {
-   Collection schedulingConsumers = 
schedulingResultPartitions.get(idx).getConsumers()
-   

[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] 
Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284695496
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
 ##

[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] 
Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284689364
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
 ##

[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] 
Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284684706
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
 ##
 @@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adapter;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+
+import javax.xml.ws.Provider;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default implementation of {@link SchedulingExecutionVertex}.
+ */
+public class DefaultExecutionVertex implements SchedulingExecutionVertex {
+
+   private final ExecutionVertexID executionVertexId;
+
+   private final List<SchedulingResultPartition> consumedPartitions;
+
+   private final List<SchedulingResultPartition> producedPartitions;
+
+   private final InputDependencyConstraint inputDependencyConstraint;
+
+   private final Provider<ExecutionState> stateProvider;
 
 Review comment:
   ah, I meant `Supplier`, my bad
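
   A minimal sketch of what the suggested change could look like, assuming the `javax.xml.ws.Provider<ExecutionState>` field from the quoted diff is swapped for a `java.util.function.Supplier<ExecutionState>`; the class and method names below are illustrative only, not the PR's actual code.

```java
import java.util.function.Supplier;

import org.apache.flink.runtime.execution.ExecutionState;

// Illustrative sketch: hold a Supplier instead of a Provider and query it
// lazily whenever the current execution state is needed.
class ExecutionStateSupplierSketch {

	private final Supplier<ExecutionState> stateSupplier;

	ExecutionStateSupplierSketch(Supplier<ExecutionState> stateSupplier) {
		this.stateSupplier = stateSupplier;
	}

	ExecutionState getState() {
		// Supplier.get() takes no argument, unlike Provider.invoke(request).
		return stateSupplier.get();
	}
}
```

   A test could then pass a simple lambda such as `() -> ExecutionState.RUNNING` as the supplier.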




[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] 
Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284689201
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
 ##

[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] 
Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284697592
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
 ##
 @@ -62,121 +49,73 @@
 /**
  * Unit tests for {@link DefaultResultPartition}.
  */
+   public void setUp() {
+   schedulingExecutionVertices = new ArrayList<>(2);
 
 Review comment:
   we never iterate over the array, so we could just have two fields (producer and consumer), which would be a bit easier to read.
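
   A rough sketch of that two-field variant, with the constructor arguments copied from the quoted diff; the field names `producer` and `consumer` are illustrative, not the test's actual code.

```java
// Illustrative sketch: two dedicated fields instead of a list of vertices.
private DefaultExecutionVertex producer;
private DefaultExecutionVertex consumer;
private DefaultResultPartition resultPartition;

@Before
public void setUp() {
	resultPartition = new DefaultResultPartition(
		new IntermediateResultPartitionID(),
		new IntermediateDataSetID(),
		BLOCKING);

	// Mirrors the quoted setUp: this vertex is registered as the partition's producer.
	producer = new DefaultExecutionVertex(
		new ExecutionVertexID(new JobVertexID(), 0),
		Collections.singletonList(resultPartition),
		ALL,
		stateProvider);
	resultPartition.setProducer(producer);

	// This vertex is registered as a consumer of the partition.
	consumer = new DefaultExecutionVertex(
		new ExecutionVertexID(new JobVertexID(), 0),
		Collections.emptyList(),
		ALL,
		stateProvider);
	resultPartition.addConsumer(consumer);
}
```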




[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] 
Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284695717
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
 ##

[jira] [Assigned] (FLINK-11560) Translate "Flink Applications" page into Chinese

2019-05-16 Thread Zhou Yumin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhou Yumin reassigned FLINK-11560:
--

Assignee: Zhou Yumin

> Translate "Flink Applications" page into Chinese
> 
>
> Key: FLINK-11560
> URL: https://issues.apache.org/jira/browse/FLINK-11560
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Project Website
>Reporter: Jark Wu
>Assignee: Zhou Yumin
>Priority: Major
>
> Translate "Flink Applications" page into Chinese.
> The markdown file is located in: flink-web/flink-applications.zh.md
> The url link is: https://flink.apache.org/zh/flink-applications.html
> Please adjust the links in the page to Chinese pages when translating. 





[GitHub] [flink] zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable

2019-05-16 Thread GitBox
zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] 
Backtrack failover regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284602469
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java
 ##
 @@ -30,37 +38,245 @@
 public class RestartPipelinedRegionStrategyTest extends TestLogger {
 
 Review comment:
   I'll add tests for them.




[GitHub] [flink] zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable

2019-05-16 Thread GitBox
zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] 
Backtrack failover regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284684306
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 ##
 @@ -165,6 +181,33 @@ private void buildOneRegionForAllVertices() {
for (FailoverVertex vertex : topology.getFailoverVertices()) {
vertexToRegionMap.put(vertex.getExecutionVertexID(), 
region);
}
+
+   buildRegionInputsAndOutputs();
+   }
+
+   private void buildRegionInputsAndOutputs() {
+   for (FailoverRegion region : regions.keySet()) {
+   IdentityHashMap consumers = new 
IdentityHashMap<>();
+   Set inputs = new 
HashSet<>();
+   Set consumerVertices = new 
HashSet<>();
+   Set regionVertices = 
region.getAllExecutionVertices();
+   regionVertices.forEach(v -> {
 
 Review comment:
   I tried the merged logic and found that region building becomes even slower. I think this is because the merged logic saves some iteration overhead but adds more work, including building relations that are later discarded when regions are merged.




[GitHub] [flink] dianfu commented on issue #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
dianfu commented on issue #8401: [FLINK-12407][python] Add all table operators 
align Java Table API.
URL: https://github.com/apache/flink/pull/8401#issuecomment-493052179
 
 
   @WeiZhong94 Thanks a lot for the updated. LGTM. +1 to merge.




[GitHub] [flink] zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable

2019-05-16 Thread GitBox
zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] 
Backtrack failover regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284599683
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 ##
 @@ -57,7 +66,9 @@
 */
public RestartPipelinedRegionStrategy(FailoverTopology topology) {
this.topology = checkNotNull(topology);
-   this.regions = new HashMap<>();
+   this.regions = new IdentityHashMap<>();
+   this.vertexToRegionMap = new HashMap<>();
+   this.resultPartitionAvailabilityChecker = new 
RegionFailoverResultPartitionAvailabilityChecker();
 
 Review comment:
   Good idea.




[GitHub] [flink] dianfu edited a comment on issue #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
dianfu edited a comment on issue #8401: [FLINK-12407][python] Add all table 
operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#issuecomment-493052179
 
 
   @WeiZhong94 Thanks a lot for the update. LGTM. +1 to merge.




[jira] [Updated] (FLINK-12512) TableSourceTest#testNestedProject test failed

2019-05-16 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-12512:
-
Priority: Critical  (was: Major)

> TableSourceTest#testNestedProject test failed
> -
>
> Key: FLINK-12512
> URL: https://issues.apache.org/jira/browse/FLINK-12512
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: vinoyang
>Priority: Critical
>
>  
> {code:java}
> 20:41:59.128 [ERROR] 
> testNestedProject(org.apache.flink.table.api.stream.table.TableSourceTest)  
> Time elapsed: 0.047 s  <<< FAILURE!
> org.junit.ComparisonFailure: 
> null expected:<...deepNested.nested2.f[lag AS nestedFlag, 
> deepNested.nested2.num AS nestedNum])
> StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], 
> source=[TestSource(read nested fields: id.*, deepNested.nested2.num, 
> deepNested.nested2.flag], deepNested.nested1...> but 
> was:<...deepNested.nested2.f[1 AS nestedFlag, deepNested.nested2.f0 AS 
> nestedNum])
> StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], 
> source=[TestSource(read nested fields: id.*, deepNested.nested2.f1, 
> deepNested.nested2.f0], deepNested.nested1...>
>   at 
> org.apache.flink.table.api.stream.table.TableSourceTest.testNestedProject(TableSourceTest.scala:375)
> {code}
> log details : [https://api.travis-ci.org/v3/job/532319575/log.txt]
>  





[jira] [Updated] (FLINK-12512) TableSourceTest#testNestedProject test failed

2019-05-16 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-12512:
-
Issue Type: Bug  (was: Improvement)

> TableSourceTest#testNestedProject test failed
> -
>
> Key: FLINK-12512
> URL: https://issues.apache.org/jira/browse/FLINK-12512
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: vinoyang
>Priority: Blocker
>
>  
> {code:java}
> 20:41:59.128 [ERROR] 
> testNestedProject(org.apache.flink.table.api.stream.table.TableSourceTest)  
> Time elapsed: 0.047 s  <<< FAILURE!
> org.junit.ComparisonFailure: 
> null expected:<...deepNested.nested2.f[lag AS nestedFlag, 
> deepNested.nested2.num AS nestedNum])
> StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], 
> source=[TestSource(read nested fields: id.*, deepNested.nested2.num, 
> deepNested.nested2.flag], deepNested.nested1...> but 
> was:<...deepNested.nested2.f[1 AS nestedFlag, deepNested.nested2.f0 AS 
> nestedNum])
> StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], 
> source=[TestSource(read nested fields: id.*, deepNested.nested2.f1, 
> deepNested.nested2.f0], deepNested.nested1...>
>   at 
> org.apache.flink.table.api.stream.table.TableSourceTest.testNestedProject(TableSourceTest.scala:375)
> {code}
> log details : [https://api.travis-ci.org/v3/job/532319575/log.txt]
>  





[jira] [Updated] (FLINK-12512) TableSourceTest#testNestedProject test failed

2019-05-16 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-12512:
-
Priority: Blocker  (was: Critical)

> TableSourceTest#testNestedProject test failed
> -
>
> Key: FLINK-12512
> URL: https://issues.apache.org/jira/browse/FLINK-12512
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: vinoyang
>Priority: Blocker
>
>  
> {code:java}
> 20:41:59.128 [ERROR] 
> testNestedProject(org.apache.flink.table.api.stream.table.TableSourceTest)  
> Time elapsed: 0.047 s  <<< FAILURE!
> org.junit.ComparisonFailure: 
> null expected:<...deepNested.nested2.f[lag AS nestedFlag, 
> deepNested.nested2.num AS nestedNum])
> StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], 
> source=[TestSource(read nested fields: id.*, deepNested.nested2.num, 
> deepNested.nested2.flag], deepNested.nested1...> but 
> was:<...deepNested.nested2.f[1 AS nestedFlag, deepNested.nested2.f0 AS 
> nestedNum])
> StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], 
> source=[TestSource(read nested fields: id.*, deepNested.nested2.f1, 
> deepNested.nested2.f0], deepNested.nested1...>
>   at 
> org.apache.flink.table.api.stream.table.TableSourceTest.testNestedProject(TableSourceTest.scala:375)
> {code}
> log details : [https://api.travis-ci.org/v3/job/532319575/log.txt]
>  





[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284682569
 
 

 ##
 File path: flink-python/pyflink/table/table_environment.py
 ##
 @@ -153,9 +240,51 @@ def __init__(self, j_tenv):
 self._j_tenv = j_tenv
 super(StreamTableEnvironment, self).__init__(j_tenv)
 
+def get_config(self):
+"""
+Returns the table config to define the runtime behavior of the Table 
API.
+
+:return: Current :class:`TableConfig`.
+"""
+table_config = TableConfig()
+table_config._j_table_config = self._j_tenv.getConfig()
+table_config.is_stream = True
 
 Review comment:
   I think we'd better keep those set methods but make them private, because without them we would need to put the setters' logic in TableConfig.Builder. I think it makes more sense to keep this logic in TableConfig.




[GitHub] [flink] flinkbot edited a comment on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API

2019-05-16 Thread GitBox
flinkbot edited a comment on issue #8465: [FLINK-12534][travis][python] Reduce 
the test cost for Python API
URL: https://github.com/apache/flink/pull/8465#issuecomment-493037224
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❗ 3. Needs [attention] from.
   - Needs attention by @zentol [PMC]
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] sunjincheng121 commented on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API

2019-05-16 Thread GitBox
sunjincheng121 commented on issue #8465: [FLINK-12534][travis][python] Reduce 
the test cost for Python API
URL: https://github.com/apache/flink/pull/8465#issuecomment-493047188
 
 
   @flinkbot attention @zentol 




[GitHub] [flink] zzchun edited a comment on issue #8372: [FLINK-12354] [table] Add REVERSE function for table/sql API

2019-05-16 Thread GitBox
zzchun edited a comment on issue #8372: [FLINK-12354] [table] Add REVERSE 
function for table/sql API
URL: https://github.com/apache/flink/pull/8372#issuecomment-492262212
 
 
   cc @xccui @twalthr @fhueske, could you help me review this MR? Thank you.




[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284678324
 
 

 ##
 File path: flink-python/pyflink/table/table_config.py
 ##
 @@ -38,24 +44,76 @@ def is_stream(self, is_stream):
 
 @property
 def parallelism(self):
+"""
+The parallelism for all operations.
+"""
 return self._parallelism
 
 @parallelism.setter
 def parallelism(self, parallelism):
 self._parallelism = parallelism
 
+@property
+def timezone(self):
+"""
+The timezone_id for a timezone, either an abbreviation such as "PST", 
a full name such as
+"America/Los_Angeles", or a custom timezone_id such as "GMT-8:00".
+"""
+return self._j_table_config.getTimeZone().getID()
+
+@timezone.setter
+def timezone(self, timezone_id):
+if timezone_id is not None and isinstance(timezone_id, str):
 
 Review comment:
   Done.




[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284678283
 
 

 ##
 File path: flink-python/pyflink/table/table_config.py
 ##
 @@ -38,24 +44,76 @@ def is_stream(self, is_stream):
 
 @property
 def parallelism(self):
+"""
+The parallelism for all operations.
+"""
 return self._parallelism
 
 @parallelism.setter
 def parallelism(self, parallelism):
 self._parallelism = parallelism
 
+@property
+def timezone(self):
+"""
+The timezone_id for a timezone, either an abbreviation such as "PST", 
a full name such as
 
 Review comment:
   Done.




[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284678079
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -96,16 +105,413 @@ def where(self, predicate):
 """
 Filters out elements that don't pass the filter predicate. Similar to 
a SQL WHERE
 clause.
+
 Example:
 ::
-
 >>> tab.where("name = 'Fred'")
 
 :param predicate: Predicate expression string.
 :return: Result table.
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns only distinct (different) values.
+
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(right._j_table, 
join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(right._j_table, 
join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+   

[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284678208
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -96,16 +105,413 @@ def where(self, predicate):
 """
 Filters out elements that don't pass the filter predicate. Similar to 
a SQL WHERE
 clause.
+
 Example:
 ::
-
 >>> tab.where("name = 'Fred'")
 
 :param predicate: Predicate expression string.
 :return: Result table.
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns only distinct (different) values.
+
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(right._j_table, 
join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(right._j_table, 
join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+   

[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284678253
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -96,16 +105,413 @@ def where(self, predicate):
 """
 Filters out elements that don't pass the filter predicate. Similar to 
a SQL WHERE
 clause.
+
 Example:
 ::
-
 >>> tab.where("name = 'Fred'")
 
 :param predicate: Predicate expression string.
 :return: Result table.
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns only distinct (different) values.
+
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(right._j_table, 
join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(right._j_table, 
join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+   

[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] 
Add all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284678155
 
 

 ##
 File path: flink-python/pyflink/table/query_config.py
 ##
 @@ -0,0 +1,112 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABCMeta
+from datetime import timedelta
+from py4j.compat import long
+
+from pyflink.java_gateway import get_gateway
+
+
+class QueryConfig(object):
+"""
+The :class:`QueryConfig` holds parameters to configure the behavior of 
queries.
+"""
+
+__metaclass__ = ABCMeta
+
+def __init__(self, j_query_config):
+self._j_query_config = j_query_config
+
+
+class StreamQueryConfig(QueryConfig):
+"""
+The :class:`StreamQueryConfig` holds parameters to configure the behavior 
of streaming queries.
+"""
+
+def __init__(self, j_stream_query_config=None):
+self._jvm = get_gateway().jvm
+if j_stream_query_config is not None:
+self._j_stream_query_config = j_stream_query_config
+else:
+self._j_stream_query_config = self._jvm.StreamQueryConfig()
+super(StreamQueryConfig, self).__init__(self._j_stream_query_config)
+
+def with_idle_state_retention_time(self, min_time, max_time):
+"""
+Specifies a minimum and a maximum time interval for how long idle 
state, i.e., state which
+was not updated, will be retained.
+
+State will never be cleared until it was idle for less than the 
minimum time and will never
+be kept if it was idle for more than the maximum time.
+
+When new data arrives for previously cleaned-up state, the new data 
will be handled as if it
+was the first data. This can result in previous results being 
overwritten.
+
+Set to ``datetime.timedelta()``(zero) to never clean-up the state.
+
+.. note::
+Cleaning up state requires additional bookkeeping which becomes 
less expensive for
+larger differences of minTime and maxTime. The difference between 
minTime and maxTime
+must be at least ``datetime.timedelta(minutes=5)``(5 minutes).
+
+:param min_time: The minimum time interval for which idle state is 
retained. Set to
+ ``datetime.timedelta()``(zero) to never clean-up the 
state.
+:param max_time: The maximum time interval for which idle state is 
retained. Must be at
+ least 5 minutes greater than minTime. Set to
+ ``datetime.timedelta()``(zero) to never clean-up the 
state.
+:return: :class:`StreamQueryConfig`
+"""
+#  type: (timedelta, timedelta) -> StreamQueryConfig
+j_time_class = self._jvm.org.apache.flink.api.common.time.Time
+j_min_time = 
j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000)))
+j_max_time = 
j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000)))
+self._j_stream_query_config = \
+self._j_stream_query_config.withIdleStateRetentionTime(j_min_time, 
j_max_time)
+return self
+
+def get_min_idle_state_retention_time(self):
+"""
+State might be cleared and removed if it was not updated for the 
defined period of time.
+
+:return: The minimum time until state which was not updated will be 
retained.
+"""
+#  type: () -> int
+return self._j_stream_query_config.getMinIdleStateRetentionTime()
+
+def get_max_idle_state_retention_time(self):
+"""
+State will be cleared and removed if it was not updated for the 
defined period of time.
+
+:return: The maximum time until state which was not updated will be 
retained.
+"""
+#  type: () -> int
+return self._j_stream_query_config.getMaxIdleStateRetentionTime()
+
+
+class BatchQueryConfig(QueryConfig):
+"""
+The :class:`BatchQueryConfig` holds parameters to configure the 

[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r284670894
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
 ##
 @@ -124,4 +171,25 @@ public static InputChannelMetrics 
newUnregisteredInputChannelMetrics() {
 
/** This class is not meant to be instantiated. */
private InputChannelTestUtils() {}
+
+   /** Test stub for {@link MemorySegmentProvider}. */
+   public static class StubMemorySegmentProvider implements 
MemorySegmentProvider {
+   private static final MemorySegmentProvider INSTANCE = new 
StubMemorySegmentProvider();
+
+   public static MemorySegmentProvider getInstance() {
+   return INSTANCE;
+   }
+
+   private StubMemorySegmentProvider() {
+   }
+
+   @Override
+   public Collection requestMemorySegments() {
+   return null;
 
 Review comment:
   seems safer to return an empty collection here?
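   A minimal, self-contained sketch of the idea behind this suggestion; the class and types below are stand-ins for illustration only, not the Flink interfaces. Returning `Collections.emptyList()` instead of `null` lets callers iterate the result without a null check.

```java
import java.util.Collection;
import java.util.Collections;

// Stand-in provider used only to illustrate the "empty collection instead of null" suggestion.
class EmptySegmentProvider {
    Collection<byte[]> requestMemorySegments() {
        // Collections.emptyList() is immutable and allocation-free, so callers can
        // iterate the result directly without guarding against null.
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        for (byte[] segment : new EmptySegmentProvider().requestMemorySegments()) {
            System.out.println(segment.length); // loop body never executes for the empty result
        }
        System.out.println("no NullPointerException");
    }
}
```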




[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r284655543
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
 ##
 @@ -88,7 +90,9 @@ public void testConsumptionWithRemoteChannels() throws 
Exception {
final ConnectionManager connManager = 
createDummyConnectionManager();
final Source[] sources = new Source[numberOfChannels];
 
-   final SingleInputGate gate = 
createSingleInputGate(numberOfChannels);
 
 Review comment:
   the majority of these changes don't really seem necessary; we could just 
modify `createSingleInputGate` to use the builder and keep it as a short-hand. 
(btw., it is a bit odd that you aren't touching `InputChannelTestUtils` in this 
commit)
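   A self-contained sketch of that suggestion, using made-up `Gate`/`GateBuilder`/`createGate` names rather than the actual Flink classes: the existing short-hand stays as the entry point most tests call, but internally it delegates to the builder.

```java
// Illustration only: Gate, GateBuilder and createGate are hypothetical stand-ins
// for SingleInputGate, SingleInputGateBuilder and createSingleInputGate.
class Gate {
    final int numberOfChannels;
    Gate(int numberOfChannels) { this.numberOfChannels = numberOfChannels; }
}

class GateBuilder {
    private int numberOfChannels = 1;

    GateBuilder setNumberOfChannels(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
        return this;
    }

    Gate build() {
        return new Gate(numberOfChannels);
    }
}

class GateTestUtils {
    // the short-hand that tests keep calling; only its implementation changes
    static Gate createGate(int numberOfChannels) {
        return new GateBuilder().setNumberOfChannels(numberOfChannels).build();
    }

    public static void main(String[] args) {
        System.out.println(createGate(4).numberOfChannels); // prints 4
    }
}
```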




[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r284672498
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 ##
 @@ -114,11 +114,12 @@ public void testConcurrentConsumeMultiplePartitions() 
throws Exception {
.setNumTargetKeyGroups(parallelism)
.setResultPartitionManager(partitionManager)
.setSendScheduleOrUpdateConsumersMessage(true)
+   .setBufferPoolFactory(p ->
+   
networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize))
.build();
 
// Create a buffer pool for this partition
 
 Review comment:
   comment seems a bit out-dated, I'd probably just remove it




[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r284667951
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
 ##
 @@ -106,15 +106,15 @@ public void testDoublePartitionRequest() throws 
Exception {
final PartitionRequestClient client = new 
PartitionRequestClient(
channel, handler, mock(ConnectionID.class), 
mock(PartitionRequestClientFactory.class));
 
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(10, 32);
+   final int numExclusiveBuffers = 2;
 
 Review comment:
   variable name no longer makes sense; currently it implies that in the entire 
pool there are exactly 2 exclusive buffers. Also applies to other instances.




[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r284656335
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+
+/**
+ * Utility class to encapsulate the logic of building a {@link 
SingleInputGate} instance.
+ */
+public class SingleInputGateBuilder {
+
+   private JobID jobId = new JobID();
+
+   private IntermediateDataSetID intermediateDataSetID = new 
IntermediateDataSetID();
 
 Review comment:
   a bunch of these can be final
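   A small self-contained illustration of the point, with made-up names: builder fields that have no setter are never reassigned and can be declared final, while fields with setters stay mutable.

```java
// Illustration only; ExampleBuilder is not a Flink class.
class ExampleBuilder {
    private final String jobName = "test-job"; // no setter -> never reassigned -> can be final
    private int parallelism = 1;               // has a setter -> must stay non-final

    ExampleBuilder setParallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    String describe() {
        return jobName + " with parallelism " + parallelism;
    }

    public static void main(String[] args) {
        System.out.println(new ExampleBuilder().setParallelism(4).describe());
    }
}
```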




[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r284660153
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -123,13 +124,17 @@ public static NetworkEnvironment create(
new NetworkBufferPool(config.numNetworkBuffers(), 
config.networkBufferSize());
registerNetworkMetrics(metricGroup, networkBufferPool);
 
+   ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
+   ResultPartitionFactory resultPartitionFactory =
+   new ResultPartitionFactory(resultPartitionManager, 
checkNotNull(ioManager));
 
 Review comment:
   let's move preconditions to the front of the method to catch things early 
and make these contracts easier to discover.
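   A self-contained sketch of the fail-fast shape being asked for, with placeholder types standing in for the real factory: all precondition checks sit at the top of the method, before any construction work starts.

```java
import java.util.Objects;

// Illustration only; create() stands in for a factory like NetworkEnvironment.create.
class EnvironmentFactory {
    static String create(String config, String ioManager) {
        // preconditions first: a null argument is reported before anything else happens,
        // and the method's contract is visible at a glance
        Objects.requireNonNull(config, "config must not be null");
        Objects.requireNonNull(ioManager, "ioManager must not be null");

        // ... construction logic only runs with validated inputs
        return config + " / " + ioManager;
    }

    public static void main(String[] args) {
        System.out.println(create("network-config", "io-manager"));
    }
}
```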




[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r284667365
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ##
 @@ -236,35 +236,32 @@ public void 
testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
}
 
/**
-* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* Tests {@link NetworkBufferPool} constructor with the invalid 
argument to
 * cause exception.
 */
-   @Test
-   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
-   final int numBuffers = 10;
-
-   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128);
-
+   @Test(expected = IllegalArgumentException.class)
+   public void testRequestMemorySegmentsWithInvalidArgument() {
+   NetworkBufferPool globalPool = null;
try {
// the number of requested buffers should be larger 
than zero
-   globalPool.requestMemorySegments(0);
+   globalPool = new NetworkBufferPool(10, 128, 0);
 
 Review comment:
   Would move this out of the `try` block. If this fails the finally block 
isn't doing anything anyway.
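   A self-contained JUnit 4 sketch of the suggested shape, with a stand-in `ExamplePool` instead of `NetworkBufferPool`: the constructor call that is expected to throw happens before the `try`, so the `finally` block only cleans up on the unexpected success path.

```java
import org.junit.Test;

import static org.junit.Assert.fail;

public class InvalidArgumentTestShape {

    // stand-in for the pool under test; rejects a non-positive buffer count
    static class ExamplePool {
        ExamplePool(int numRequiredBuffers) {
            if (numRequiredBuffers <= 0) {
                throw new IllegalArgumentException("number of required buffers must be > 0");
            }
        }

        void destroy() {
            // release resources of a successfully created pool
        }
    }

    @Test(expected = IllegalArgumentException.class)
    public void testInvalidArgument() {
        // expected to throw; nothing has been created yet, so there is nothing to clean up
        ExamplePool pool = new ExamplePool(0);
        try {
            fail("Should have thrown an IllegalArgumentException");
        } finally {
            pool.destroy();
        }
    }
}
```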




[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r284672050
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -154,13 +161,16 @@
 * The pool is registered with the partition *after* it as been 
constructed in order to conform
 * to the life-cycle of task registrations in the {@link TaskExecutor}.
 */
-   public void registerBufferPool(BufferPool bufferPool) {
-   checkArgument(bufferPool.getNumberOfRequiredMemorySegments() >= 
getNumberOfSubpartitions(),
-   "Bug in result partition setup logic: Buffer 
pool has not enough guaranteed buffers for this result partition.");
-
+   @Override
+   public void setup() throws IOException {
checkState(this.bufferPool == null, "Bug in result partition 
setup logic: Already registered buffer pool.");
 
+   BufferPool bufferPool = bufferPoolFactory.apply(this);
+   checkArgument(bufferPool.getNumberOfRequiredMemorySegments() >= 
getNumberOfSubpartitions(),
+   "Bug in result partition setup logic: Buffer pool has 
not enough guaranteed buffers for this result partition.");
+
this.bufferPool = checkNotNull(bufferPool);
 
 Review comment:
   the null check is ineffective; `bufferPool` has already been dereferenced by
the `checkArgument` call above.
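   A self-contained illustration of why the ordering matters, independent of the Flink types: a null check placed after the first dereference can never fire, so it has to guard the value before it is used.

```java
import java.util.Objects;

// Illustration only; the String values stand in for the buffer pool produced by the factory.
class NullCheckOrder {
    static int ineffective(String value) {
        int length = value.length();            // a null would already fail here with an NPE
        Objects.requireNonNull(value, "value"); // too late to add any information
        return length;
    }

    static int effective(String value) {
        Objects.requireNonNull(value, "value"); // guard before the first use
        return value.length();
    }

    public static void main(String[] args) {
        System.out.println(effective("buffer-pool"));
    }
}
```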




[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r284668096
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
 ##
 @@ -106,15 +106,15 @@ public void testDoublePartitionRequest() throws 
Exception {
final PartitionRequestClient client = new 
PartitionRequestClient(
channel, handler, mock(ConnectionID.class), 
mock(PartitionRequestClientFactory.class));
 
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(10, 32);
+   final int numExclusiveBuffers = 2;
 
 Review comment:
   maybe you could just in-line this thing like in most other tests




[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r284658421
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
 ##
 @@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Builder for various {@link InputChannel} types.
+ */
+public class InputChannelBuilder {
+   static final ConnectionID STUB_CONNECTION_ID =
+   new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+
+   private int channelIndex = 0;
+   private ResultPartitionID partitionId = new ResultPartitionID();
+   private ConnectionID connectionID = STUB_CONNECTION_ID;
+   private ResultPartitionManager partitionManager = new 
ResultPartitionManager();
+   private TaskEventPublisher taskEventPublisher = new 
TaskEventDispatcher();
+   private ConnectionManager connectionManager = new 
LocalConnectionManager();
+   private int initialBackoff = 0;
+   private int maxBackoff = 0;
+   private InputChannelMetrics metrics = 
InputChannelTestUtils.newUnregisteredInputChannelMetrics();
+
+   public static InputChannelBuilder newBuilder() {
+   return new InputChannelBuilder();
+   }
+
+   public InputChannelBuilder setChannelIndex(int channelIndex) {
+   this.channelIndex = channelIndex;
+   return this;
+   }
+
+   public InputChannelBuilder setPartitionId(ResultPartitionID 
partitionId) {
+   this.partitionId = partitionId;
+   return this;
+   }
+
+   public InputChannelBuilder setPartitionManager(ResultPartitionManager 
partitionManager) {
+   this.partitionManager = partitionManager;
+   return this;
+   }
+
+   InputChannelBuilder setTaskEventPublisher(TaskEventPublisher 
taskEventPublisher) {
+   this.taskEventPublisher = taskEventPublisher;
+   return this;
+   }
+
+   public InputChannelBuilder setConnectionManager(ConnectionManager 
connectionManager) {
+   this.connectionManager = connectionManager;
+   return this;
+   }
+
+   public InputChannelBuilder setInitialBackoff(int initialBackoff) {
+   this.initialBackoff = initialBackoff;
+   return this;
+   }
+
+   public InputChannelBuilder setMaxBackoff(int maxBackoff) {
+   this.maxBackoff = maxBackoff;
+   return this;
+   }
+
+   public InputChannelBuilder setMetrics(InputChannelMetrics metrics) {
+   this.metrics = metrics;
+   return this;
+   }
+
+   InputChannelBuilder setupFromNetworkEnvironment(NetworkEnvironment 
network) {
+   this.partitionManager = network.getResultPartitionManager();
+   this.connectionManager = network.getConnectionManager();
+   this.initialBackoff = 
network.getConfiguration().partitionRequestInitialBackoff();
+   this.maxBackoff = 
network.getConfiguration().partitionRequestMaxBackoff();
+   return this;
+   }
+
+   UnknownInputChannel buildUnknown(SingleInputGate inputGate) {
+   UnknownInputChannel channel = new UnknownInputChannel(
+

[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-16 Thread GitBox
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r284659459
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -92,31 +92,44 @@
 
private boolean isShutdown;
 
-   public NetworkEnvironment(
-   NetworkEnvironmentConfiguration config,
-   TaskEventPublisher taskEventPublisher,
-   MetricGroup metricGroup,
-   IOManager ioManager) {
-   this.config = checkNotNull(config);
-
-   this.networkBufferPool = new 
NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
-
-   NettyConfig nettyConfig = config.nettyConfig();
-   if (nettyConfig != null) {
-   this.connectionManager = new 
NettyConnectionManager(nettyConfig, config.isCreditBased());
-   } else {
-   this.connectionManager = new LocalConnectionManager();
-   }
+   private NetworkEnvironment(
+   NetworkEnvironmentConfiguration config,
 
 Review comment:
   please stick to the existing indentation




[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks

2019-05-16 Thread GitBox
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] 
Support (legacy) SourceFunction as special case in the mailbox model for stream 
tasks
URL: https://github.com/apache/flink/pull/8442#discussion_r284674239
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
 ##
 @@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A mailbox is basically a blocking queue for inter-thread message exchange 
in form of {@link Runnable} objects between
+ * multiple producer threads and a single consumer.
+ */
+public interface Mailbox extends MailboxReceiver, MailboxSender {
+
+   /**
+* The effect of this is that all pending letters are dropped and the 
given priorityAction
+* is enqueued to the head of the mailbox.
+*
+* @param priorityAction action to enqueue atomically after the mailbox 
was cleared.
+*/
+   void clearAndPut(@Nonnull Runnable priorityAction);
 
 Review comment:
   There is an additional benefit from adding this, which is that the compiler 
introduces an assertion for `!= null` if you run the code with `-ea`.
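   For context, a minimal self-contained sketch of the mailbox idea described in the Javadoc above (a blocking queue of `Runnable` letters with multiple producers and one consumer), not the Flink implementation. The plain `assert` statements used here are only evaluated when the JVM runs with `-ea`; whether an annotation such as `@Nonnull` additionally generates such a check depends on the tooling in use.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Illustration only; SimpleMailbox is not the Flink Mailbox implementation.
class SimpleMailbox {
    private final LinkedBlockingDeque<Runnable> letters = new LinkedBlockingDeque<>();

    void putMail(Runnable letter) {
        assert letter != null : "letter must not be null"; // checked only with -ea
        synchronized (letters) {
            letters.addLast(letter);
        }
    }

    Runnable takeMail() throws InterruptedException {
        return letters.takeFirst();
    }

    void clearAndPut(Runnable priorityAction) {
        assert priorityAction != null : "priorityAction must not be null"; // checked only with -ea
        synchronized (letters) {
            // drop all pending letters, then enqueue the priority action at the head;
            // the shared monitor keeps this compound step atomic with respect to putMail
            letters.clear();
            letters.addFirst(priorityAction);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleMailbox mailbox = new SimpleMailbox();
        mailbox.putMail(() -> System.out.println("regular letter"));
        mailbox.clearAndPut(() -> System.out.println("priority letter"));
        mailbox.takeMail().run(); // prints "priority letter"
    }
}
```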




[jira] [Commented] (FLINK-12303) Scala 2.12 lambdas does not work in event classes inside streams.

2019-05-16 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841258#comment-16841258
 ] 

Aljoscha Krettek commented on FLINK-12303:
--

Hi [~snilard], yes you're right, this wasn't the real issue. I'm sorry about 
that.

The real problem is that Scala 2.12 changes how Scala lambdas are implemented
(they now use the same underlying mechanism as Java lambdas, which is available
from Java 8 onwards). Kryo, which is used as a serializer in your test cases,
used to be able to serialize Scala lambdas, but it can no longer do so for
Scala 2.12 lambdas.

You can see the exception if you enable logging, for example by adding these
dependencies to your {{build.sbt}}:
{code}
  "org.slf4j" % "slf4j-log4j12" % "1.7.7" % "runtime",
  "log4j" % "log4j" % "1.2.17" % "runtime"
{code}

and by putting a {{log4j.properties}} in {{src/resources}}:
{code}
log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
{code}

This is the exception you will then find in the logs:
{code}
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
lambda (demo.LambdaCaseClass)
lambda (demo.TsEventClass)
at 
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at 
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:224)
at 
org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
at 
org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:33)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPunctuatedWatermark(AbstractFetcher.java:459)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:404)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
at 
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
... 26 more


{code}

> Scala 2.12 lambdas does not work in event classes inside streams.
> -
>
> Key: FLINK-12303
> URL: https://issues.apache.org/jira/browse/FLINK-12303
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, API / Scala
>Affects 

[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284671140
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.api.ExternalCatalogAlreadyExistException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.CatalogTableOperation;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager that encapsulates all available catalogs. It also 
implements the logic of
+ * table path resolution. Supports both new API ({@link Catalog} as well as 
{@link ExternalCatalog}).
+ */
+@Internal
+public class CatalogManager {
+   private static final Logger LOG = 
LoggerFactory.getLogger(CatalogManager.class);
+
+   // A map between names and catalogs.
+   private Map catalogs;
+
+   // TO BE REMOVED along with ExternalCatalog API
+   private Map  externalCatalogs;
+
+   // The name of the default catalog and schema
+   private String currentCatalogName;
+
+   private String currentDatabaseName;
+
+   public CatalogManager(String defaultCatalogName, Catalog 
defaultCatalog) {
+   checkArgument(
+   !StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
+   "Default catalog name cannot be null or empty");
+   checkNotNull(defaultCatalog, "Default catalog cannot be null");
+   catalogs = new LinkedHashMap<>();
+   externalCatalogs = new LinkedHashMap<>();
+   catalogs.put(defaultCatalogName, defaultCatalog);
+   this.currentCatalogName = defaultCatalogName;
+   this.currentDatabaseName = defaultCatalog.getDefaultDatabase();
+   }
+
+   /**
+* Registers a catalog under the given name. The catalog name must be 
unique across both
+* {@link Catalog}s and {@link ExternalCatalog}s.
+*
+* @param catalogName name under which to register the given catalog
+* @param catalog catalog to register
+* @throws CatalogException if the registration of the catalog under 
the given name failed
+*/
+   public void registerCatalog(String catalogName, Catalog catalog) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"Catalog name cannot be null or empty.");
+   checkNotNull(catalog, "Catalog cannot be null");
+
+   if (catalogs.containsKey(catalogName) || 
externalCatalogs.containsKey(catalogName)) {
+   throw new CatalogException(format("Catalog %s already 
exists.", catalogName));
+   }
+
+   catalogs.put(catalogName, catalog);
+   catalog.open();
+   }
+
+   /**
+* Gets a catalog by name.
+*
+* @param catalogName name of the catalog to retrieve
+* @return the requested catalog or empty if it does not exist
+* @see CatalogManager#getExternalCatalog(String)
+*/
+   public Optional getCatalog(String catalogName) {
+   return Optional.ofNullable(catalogs.get(catalogName));
+   }
+
+   /**
+* Registers an external catalog under the given name. The catalog name 
must be unique across both
+* 

[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks

2019-05-16 Thread GitBox
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] 
Support (legacy) SourceFunction as special case in the mailbox model for stream 
tasks
URL: https://github.com/apache/flink/pull/8442#discussion_r284670626
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ##
 @@ -66,21 +66,20 @@ public StreamIterationHead(Environment env) {
// 

 
@Override
-   protected boolean performDefaultAction() throws Exception {
+   protected void performDefaultAction(ActionContext context) throws 
Exception {
StreamRecord nextRecord = shouldWait ?
dataChannel.poll(iterationWaitTime, 
TimeUnit.MILLISECONDS) :
dataChannel.take();
 
-   if (nextRecord == null) {
-   return false;
-   }
-
-   synchronized (getCheckpointLock()) {
-   for (RecordWriterOutput output : streamOutputs) {
-   output.collect(nextRecord);
+   if (nextRecord != null) {
 
 Review comment:
   I don't think there is a general rule like this, not even in the WIP style
guide. It is essentially the same as asking to put the "likely" case first,
which this does.




[GitHub] [flink] zentol commented on issue #8414: [FLINK-12477][Build] Enforce maven version 3.1.1

2019-05-16 Thread GitBox
zentol commented on issue #8414: [FLINK-12477][Build] Enforce maven version 
3.1.1
URL: https://github.com/apache/flink/pull/8414#issuecomment-493037451
 
 
   Finally, you also have to update the `cron-master-maven_compat` branch so
that it no longer checks for Maven 3.0.X compatibility.




[jira] [Updated] (FLINK-12534) Reduce the test cost for Python API

2019-05-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12534:
---
Labels: pull-request-available  (was: )

> Reduce the test cost for Python API
> ---
>
> Key: FLINK-12534
> URL: https://issues.apache.org/jira/browse/FLINK-12534
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Travis
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we run the Python API Travis tests for Scala 2.12 / Java 9 / Hadoop
> 2.4.1. Since the Python API uses Py4j to communicate with the JVM, the test for
> Java 9 is enough, and we can remove the tests for Scala 2.12 and Hadoop 2.4.1.
>  





[GitHub] [flink] flinkbot commented on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API

2019-05-16 Thread GitBox
flinkbot commented on issue #8465: [FLINK-12534][travis][python] Reduce the 
test cost for Python API
URL: https://github.com/apache/flink/pull/8465#issuecomment-493037224
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied
according to the order of the review items. For consensus, approval by a Flink
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] WeiZhong94 opened a new pull request #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API

2019-05-16 Thread GitBox
WeiZhong94 opened a new pull request #8465: [FLINK-12534][travis][python] 
Reduce the test cost for Python API
URL: https://github.com/apache/flink/pull/8465
 
 
   ## What is the purpose of the change
   
   *This pull request removes all jars except flink-dist.jar, flink-table.jar
and flink-table-planner-test.jar from the Travis compile result, and removes the
Python tests from the Scala 2.12 and Hadoop 2.4.1 Travis cron test jobs. That
means only these 3 jars will be cached in Travis, which shrinks the cache file
by about 350MB per profile.*
   
   
   ## Brief change log
   
  - *Delete more jars after compiling; only flink-dist.jar, flink-table.jar and
flink-table-planner-test.jar will be cached now.*
  - *Remove the Python tests from the Scala 2.12 and Hadoop 2.4.1 cron test jobs.*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   




[jira] [Commented] (FLINK-12384) Rolling the etcd servers causes "Connected to an old server; r-o mode will be unavailable"

2019-05-16 Thread Gary Yao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841245#comment-16841245
 ] 

Gary Yao commented on FLINK-12384:
--

If you are running ZK behind an LB, there are additional considerations to 
make: https://wiki.apache.org/hadoop/ZooKeeper/FAQ#A8
I am mentioning this here because it is not clear what _"analytics-zetcd:2181"_ 
refers to.

> Rolling the etcd servers causes "Connected to an old server; r-o mode will be 
> unavailable"
> --
>
> Key: FLINK-12384
> URL: https://issues.apache.org/jira/browse/FLINK-12384
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Henrik
>Priority: Major
>
> {code:java}
> [tm] 2019-05-01 13:30:53,316 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
> Initiating client connection, connectString=analytics-zetcd:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5c8eee0f
> [tm] 2019-05-01 13:30:53,384 WARN  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
> configuration failed: javax.security.auth.login.LoginException: No JAAS 
> configuration section named 'Client' was found in specified JAAS 
> configuration file: '/tmp/jaas-3674237213070587877.conf'. Will continue 
> connection to Zookeeper server without SASL authentication, if Zookeeper 
> server allows it.
> [tm] 2019-05-01 13:30:53,395 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening 
> socket connection to server 
> analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181
> [tm] 2019-05-01 13:30:53,395 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Using 
> configured hostname/address for TaskManager: 10.1.2.173.
> [tm] 2019-05-01 13:30:53,401 ERROR 
> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
> Authentication failed
> [tm] 2019-05-01 13:30:53,418 INFO  
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to 
> start actor system at 10.1.2.173:0
> [tm] 2019-05-01 13:30:53,420 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket 
> connection established to 
> analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, initiating 
> session
> [tm] 2019-05-01 13:30:53,500 WARN  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocket  - 
> Connected to an old server; r-o mode will be unavailable
> [tm] 2019-05-01 13:30:53,500 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 
> establishment complete on server 
> analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, sessionid = 
> 0xbf06a739001d446, negotiated timeout = 6
> [tm] 2019-05-01 13:30:53,525 INFO  
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>   - State change: CONNECTED{code}
> Repro:
> Start an etcd-cluster, with e.g. etcd-operator, with three members. Start 
> zetcd in front. Configure the session cluster to go against zetcd.
> Ensure the job can start successfully.
> Now, kill the etcd pods one by one, letting the quorum re-establish in 
> between, so that the cluster is still OK.
> Now restart the job/tm pods. You'll end up in this no-mans-land.
>  
> —
> Workaround: clean out the etcd cluster and remove all its data, however, this 
> resets all time windows and state, despite having that saved in GCS, so it's 
> a crappy workaround.
>  
> –
>  
> flink-conf.yaml
> {code:java}
> parallelism.default: 1
> rest.address: analytics-job
> jobmanager.rpc.address: analytics-job # = resource manager's address too
> jobmanager.heap.size: 1024m
> jobmanager.rpc.port: 6123
> jobmanager.slot.request.timeout: 3
> resourcemanager.rpc.port: 6123
> high-availability.jobmanager.port: 6123
> blob.server.port: 6124
> queryable-state.server.ports: 6125
> taskmanager.heap.size: 1024m
> taskmanager.numberOfTaskSlots: 1
> web.log.path: /var/lib/log/flink/jobmanager.log
> rest.port: 8081
> rest.bind-address: 0.0.0.0
> web.submit.enable: false
> high-availability: zookeeper
> high-availability.storageDir: gs://example_analytics/flink/zetcd/
> high-availability.zookeeper.quorum: analytics-zetcd:2181
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.client.acl: open
> state.backend: rocksdb
> state.checkpoints.num-retained: 3
> state.checkpoints.dir: gs://example_analytics/flink/checkpoints
> state.savepoints.dir: gs://example_analytics/flink/savepoints
> state.backend.incremental: true
> state.backend.async: true
> fs.hdfs.hadoopconf: /opt/flink/hadoop
> log.file: /var/lib/log/flink/jobmanager.log{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks

2019-05-16 Thread GitBox
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] 
Support (legacy) SourceFunction as special case in the mailbox model for stream 
tasks
URL: https://github.com/apache/flink/pull/8442#discussion_r284669437
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Consumer-facing side of the {@link Mailbox} interface. This is used to 
dequeue letters. The mailbox returns letters
+ * in the order by which they were enqueued. A mailbox should only be consumed 
by one thread at a time.
+ */
+public interface MailboxReceiver {
+
+   /**
+* Returns true if the mailbox contains mail.
+*/
+   boolean hasMail();
+
+   /**
+* Returns the oldest letter from the mailbox (head of queue) if the 
mailbox is not empty or
+* null otherwise.
+*
+* @return the oldest letter from the mailbox (head of queue) if the 
mailbox is not empty or
+* null otherwise.
+*/
+   @Nullable
+   Runnable tryTakeMail();
 
 Review comment:
   I would prefer to keep this as returning `null` because it is a data 
structure and therefore in line with what JDK queues etc. do.
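
   For context, a minimal consumer-side sketch of how the `null`-returning contract 
reads in practice, assuming only the `MailboxReceiver` shape shown in this diff (the 
helper and its name are illustrative, not part of the PR):

   ```java
// Illustrative helper, not PR code: drain and run all currently enqueued letters.
static void drainMailbox(MailboxReceiver mailbox) {
    Runnable letter;
    // tryTakeMail() returns null when the mailbox is empty, mirroring JDK queue semantics.
    while ((letter = mailbox.tryTakeMail()) != null) {
        letter.run();
    }
}
   ```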


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks

2019-05-16 Thread GitBox
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] 
Support (legacy) SourceFunction as special case in the mailbox model for stream 
tasks
URL: https://github.com/apache/flink/pull/8442#discussion_r284668961
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
 ##
 @@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Implementation of {@link Mailbox} inspired by {@link 
java.util.concurrent.ArrayBlockingQueue} and tailored towards
+ * our use case with multiple writers, single reader and volatile reads 
instead of lock & read on {@link #count}.
+ */
+public class MailboxImpl implements Mailbox {
+
+   /**
+* The enqueued letters.
+*/
+   @GuardedBy("lock")
+   private final Runnable[] ringBuffer;
+
+   /**
+* Lock for all concurrent ops.
+*/
+   private final ReentrantLock lock;
+
+   /**
+* Condition that is triggered when the buffer is no longer empty.
+*/
+   @GuardedBy("lock")
+   private final Condition notEmpty;
+
+   /**
+* Condition that is triggered when the buffer is no longer full.
+*/
+   @GuardedBy("lock")
+   private final Condition notFull;
+
+   /**
+* Index of the ring buffer head.
+*/
+   @GuardedBy("lock")
+   private int headIndex;
+
+   /**
+* Index of the ring buffer tail.
+*/
+   @GuardedBy("lock")
+   private int tailIndex;
+
+   /**
+* Number of letters in the mailbox.
+*/
+   @GuardedBy("lock")
+   private volatile int count;
+
+   /**
+* A mask to wrap around the indexes of the ring buffer. We use this to 
avoid ifs or modulo ops.
+*/
+   private final int moduloMask;
+
+   public MailboxImpl() {
+   this(6); // 2^6 = 64
+   }
+
+   public MailboxImpl(int capacityPow2) {
+   final int capacity = 1 << capacityPow2;
+   Preconditions.checkState(capacity > 0);
+   this.moduloMask = capacity - 1;
+   this.ringBuffer = new Runnable[capacity];
+   this.lock = new ReentrantLock();
+   this.notEmpty = lock.newCondition();
+   this.notFull = lock.newCondition();
+   }
+
+   @Override
+   public boolean hasMail() {
 
 Review comment:
   `isEmpty` would always be used inverted, so I think the positive phrasing is ok.
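
   As a side note on the buffer sizing above: with a power-of-two capacity, 
`moduloMask = capacity - 1` lets the head/tail indexes wrap with a bitwise AND 
instead of a modulo. A small standalone sketch of that trick (names are 
illustrative, not the PR's fields):

   ```java
// Standalone demo of power-of-two index wrapping, as used conceptually by MailboxImpl.
public final class RingIndexDemo {
    public static void main(String[] args) {
        final int capacity = 1 << 6;          // 64, the default capacity in this PR
        final int moduloMask = capacity - 1;  // 0b111111
        for (int i = 0; i < 3 * capacity; i++) {
            // For non-negative i, (i & moduloMask) == i % capacity.
            if ((i & moduloMask) != i % capacity) {
                throw new AssertionError("mask and modulo disagree at " + i);
            }
        }
        System.out.println("index 130 wraps to " + (130 & moduloMask)); // prints 2
    }
}
   ```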


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks

2019-05-16 Thread GitBox
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] 
Support (legacy) SourceFunction as special case in the mailbox model for stream 
tasks
URL: https://github.com/apache/flink/pull/8442#discussion_r284668779
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Producer-facing side of the {@link Mailbox} interface. This is used to 
enqueue letters. Multiple producers threads
+ * can put to the same mailbox.
+ */
+public interface MailboxSender {
+
+   /**
+* Enqueues the given letter to the mailbox, if capacity is available. 
On success, this returns true
+* and false if the mailbox was already full.
+*
+* @param letter the letter to enqueue.
+* @return true iff successful.
+*/
+   boolean tryPutMail(@Nonnull Runnable letter);
+
+   /**
+* Enqueues the given letter to the mailbox and blocks until there is 
capacity for a successful put.
+*
+* @param letter the letter to enqueue.
+* @throws InterruptedException on interruption.
+*/
+   void putMail(@Nonnull Runnable letter) throws InterruptedException;
+
+   /**
+* This method blocks until the mailbox has again capacity to enqueue 
new letters.
+*
+* @throws InterruptedException on interruption.
+*/
+   void waitUntilHasCapacity() throws InterruptedException;
 
 Review comment:
   Same: For clients that want to be non-blocking w.r.t. the queue.
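
   To illustrate that split, a minimal producer-side sketch against the 
`MailboxSender` interface shown above (the helper itself is illustrative, not PR code):

   ```java
// Illustrative helper, not PR code: try the non-blocking path first, block only if full.
static void sendLetter(MailboxSender mailbox, Runnable letter) throws InterruptedException {
    if (!mailbox.tryPutMail(letter)) {
        // The mailbox was full; a caller could also do other work here instead of blocking.
        mailbox.putMail(letter);
    }
}
   ```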


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks

2019-05-16 Thread GitBox
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] 
Support (legacy) SourceFunction as special case in the mailbox model for stream 
tasks
URL: https://github.com/apache/flink/pull/8442#discussion_r284668589
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Producer-facing side of the {@link Mailbox} interface. This is used to 
enqueue letters. Multiple producers threads
+ * can put to the same mailbox.
+ */
+public interface MailboxSender {
+
+   /**
+* Enqueues the given letter to the mailbox, if capacity is available. 
On success, this returns true
+* and false if the mailbox was already full.
+*
+* @param letter the letter to enqueue.
+* @return true iff successful.
+*/
+   boolean tryPutMail(@Nonnull Runnable letter);
 
 Review comment:
   I think it can be helpful if client code that wants to enqueue work can stay 
non-blocking when required.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks

2019-05-16 Thread GitBox
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] 
Support (legacy) SourceFunction as special case in the mailbox model for stream 
tasks
URL: https://github.com/apache/flink/pull/8442#discussion_r284668074
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -230,14 +242,29 @@ protected StreamTask(
 * This method implements the default action of the task (e.g. 
processing one event from the input). Implementations
 * should (in general) be non-blocking.
 *
-* @return true if there is more work to perform as 
default action for this task and false
-* if the task is ready to finish.
+* @param context context object for collaborative interaction between 
the action and the stream task.
 * @throws Exception on any problems in the action.
 */
-   protected abstract boolean performDefaultAction() throws Exception;
+   protected abstract void performDefaultAction(ActionContext context) 
throws Exception;
 
-   protected void run() throws Exception {
-   while (isRunning && performDefaultAction()) {}
+   /**
+* Runs the stream-tasks main processing loop.
+*/
+   private void run() throws Exception {
 
 Review comment:
   Please see my comment here:
   https://github.com/apache/flink/pull/8409#discussion_r284663084


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12039) Remove ASSIGNED_SLOT_UPDATER in Execution.tryAssignResource

2019-05-16 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang updated FLINK-12039:
-
Priority: Minor  (was: Major)

> Remove ASSIGNED_SLOT_UPDATER in Execution.tryAssignResource
> ---
>
> Key: FLINK-12039
> URL: https://issues.apache.org/jira/browse/FLINK-12039
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Andrey Zagrebin
>Assignee: vinoyang
>Priority: Minor
>
> After making access to ExecutionGraph single-threaded in FLINK-11417, we can 
> simplify execution slot assignment in Execution.tryAssignResource and get rid 
> of ASSIGNED_SLOT_UPDATER as it happens now only in one JM main thread. 
> Though, it seems that we have to keep `assignedResource` as volatile at the 
> moment which could be further investigated.
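
For illustration only (this is not Flink code), the general pattern behind such a 
change: once a field is mutated from a single thread, a CAS through an 
AtomicReferenceFieldUpdater can be replaced by a plain check-and-write, while 
keeping the field volatile so reader threads still see the update.

{code:java}
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

class SlotHolder {
    // Before: assignment may race, so it is guarded by a CAS via a field updater.
    private static final AtomicReferenceFieldUpdater<SlotHolder, String> UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(SlotHolder.class, String.class, "slot");
    private volatile String slot;

    boolean tryAssignWithCas(String newSlot) {
        return UPDATER.compareAndSet(this, null, newSlot);
    }

    // After: only one thread ever assigns, so a plain check-and-write suffices;
    // 'slot' stays volatile so other threads still read the latest value.
    boolean tryAssignSingleThreaded(String newSlot) {
        if (slot != null) {
            return false;
        }
        slot = newSlot;
        return true;
    }
}
{code}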



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12384) Rolling the etcd servers causes "Connected to an old server; r-o mode will be unavailable"

2019-05-16 Thread Gary Yao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841209#comment-16841209
 ] 

Gary Yao edited comment on FLINK-12384 at 5/16/19 11:49 AM:


[~haf] I checked the ZK client code and the warning is not something to be 
concerned about. The client is talking to a ZK server that does not support the 
r-o mode.

Also see:
https://github.com/apache/zookeeper/blob/e45551fc7c691332ace7bff81926855e42ac2239/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java#L147-L149
http://zookeeper-user.578899.n2.nabble.com/Connected-to-an-old-server-r-o-mode-will-be-unavailable-td7578775.html

Is your cluster not starting up correctly or not recovering the jobs? If that 
is the case, I would like to see the complete jobmanager logs if possible.


was (Author: gjy):
[~haf] I checked the ZK client code and the warning is not something to be 
concerned about. The client is talking to a ZK server that does not support the 
r-o mode.

Also see:
https://github.com/apache/zookeeper/blob/e45551fc7c691332ace7bff81926855e42ac2239/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java#L147-L149
http://zookeeper-user.578899.n2.nabble.com/Connected-to-an-old-server-r-o-mode-will-be-unavailable-td7578775.html

Is your cluster not starting up correctly or recovering the jobs? If that is 
the case, I would like to see the complete jobmanager logs if possible.

> Rolling the etcd servers causes "Connected to an old server; r-o mode will be 
> unavailable"
> --
>
> Key: FLINK-12384
> URL: https://issues.apache.org/jira/browse/FLINK-12384
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Henrik
>Priority: Major
>
> {code:java}
> [tm] 2019-05-01 13:30:53,316 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
> Initiating client connection, connectString=analytics-zetcd:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5c8eee0f
> [tm] 2019-05-01 13:30:53,384 WARN  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
> configuration failed: javax.security.auth.login.LoginException: No JAAS 
> configuration section named 'Client' was found in specified JAAS 
> configuration file: '/tmp/jaas-3674237213070587877.conf'. Will continue 
> connection to Zookeeper server without SASL authentication, if Zookeeper 
> server allows it.
> [tm] 2019-05-01 13:30:53,395 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening 
> socket connection to server 
> analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181
> [tm] 2019-05-01 13:30:53,395 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Using 
> configured hostname/address for TaskManager: 10.1.2.173.
> [tm] 2019-05-01 13:30:53,401 ERROR 
> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
> Authentication failed
> [tm] 2019-05-01 13:30:53,418 INFO  
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to 
> start actor system at 10.1.2.173:0
> [tm] 2019-05-01 13:30:53,420 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket 
> connection established to 
> analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, initiating 
> session
> [tm] 2019-05-01 13:30:53,500 WARN  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocket  - 
> Connected to an old server; r-o mode will be unavailable
> [tm] 2019-05-01 13:30:53,500 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 
> establishment complete on server 
> analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, sessionid = 
> 0xbf06a739001d446, negotiated timeout = 6
> [tm] 2019-05-01 13:30:53,525 INFO  
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>   - State change: CONNECTED{code}
> Repro:
> Start an etcd-cluster, with e.g. etcd-operator, with three members. Start 
> zetcd in front. Configure the session cluster to go against zetcd.
> Ensure the job can start successfully.
> Now, kill the etcd pods one by one, letting the quorum re-establish in 
> between, so that the cluster is still OK.
> Now restart the job/tm pods. You'll end up in this no-mans-land.
>  
> —
> Workaround: clean out the etcd cluster and remove all its data, however, this 
> resets all time windows and state, despite having that saved in GCS, so it's 
> a crappy workaround.
>  
> –
>  
> flink-conf.yaml
> {code:java}
> parallelism.default: 1
> rest.address: analytics-job
> jobmanager.rpc.address: analytics-job # = resource manager's address too
> 

[jira] [Updated] (FLINK-12528) Remove progressLock in ExecutionGraph

2019-05-16 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang updated FLINK-12528:
-
Priority: Minor  (was: Major)

> Remove progressLock in ExecutionGraph
> -
>
> Key: FLINK-12528
> URL: https://issues.apache.org/jira/browse/FLINK-12528
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>
> Since {{ExecutionGraph}} can only be accessed from a single 
> thread (FLINK-11417), we can remove the progressLock from {{ExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks

2019-05-16 Thread GitBox
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] 
Support (legacy) SourceFunction as special case in the mailbox model for stream 
tasks
URL: https://github.com/apache/flink/pull/8442#discussion_r284667461
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 ##
 @@ -98,12 +100,44 @@ protected void cleanup() {
}
 
@Override
-   protected void run() throws Exception {
-   headOperator.run(getCheckpointLock(), 
getStreamStatusMaintainer());
+   protected void performDefaultAction(ActionContext context) throws 
Exception {
+   // Against the usual contract of this method, this 
implementation is not step-wise but blocking instead for
 
 Review comment:
   That suggestion would need an `if` in the default action that triggers on 
the first invocation. If we started in another method, the question is which one, 
and we would then probably also need to join back somewhere else.
   
   What I mean is, the alternative loop does two things differently:
   - no invocation of the default action.
   - running all letters inside a `synchronized` block.
   
   So the question that remains is how you plan to add the synchronized block 
to the existing mailbox loop. Personally I feel like this approach might be the 
cleanest and most performant, but I would be open to suggestions for a complete 
alternative if you have something in mind.
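
   For readers following the thread, a rough sketch of the two loop flavours being 
compared here; `isRunning`, `performDefaultAction`, the checkpoint lock and a 
blocking `takeMail()` are assumed names based on the diff and this discussion, not 
the PR's final code:

   ```java
// Sketch only, not the PR's code.
// Regular tasks: drain pending letters, then run one step of the default action.
while (isRunning) {
    Runnable letter;
    while ((letter = mailbox.tryTakeMail()) != null) {
        letter.run();
    }
    performDefaultAction(actionContext);
}

// Legacy-source variant: no default action; every letter runs under the checkpoint
// lock so it cannot interleave with the blocking source thread.
while (isRunning) {
    Runnable letter = mailbox.takeMail(); // assumed blocking take
    synchronized (getCheckpointLock()) {
        letter.run();
    }
}
   ```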


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12527) Remove GLOBAL_VERSION_UPDATER in ExecutionGraph

2019-05-16 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang updated FLINK-12527:
-
Priority: Minor  (was: Major)

> Remove GLOBAL_VERSION_UPDATER in ExecutionGraph
> ---
>
> Key: FLINK-12527
> URL: https://issues.apache.org/jira/browse/FLINK-12527
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>
> Since {{ExecutionGraph}} can only be accessed from a single thread, we can 
> remove {{AtomicLongFieldUpdater GLOBAL_VERSION_UPDATER}} from 
> {{ExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10855) CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints

2019-05-16 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841244#comment-16841244
 ] 

vinoyang commented on FLINK-10855:
--

OK [~till.rohrmann], thanks. [~richtesn] would you still be willing to mentor me 
in discussing the design of the cleanup task?

> CheckpointCoordinator does not delete checkpoint directory of late/failed 
> checkpoints
> -
>
> Key: FLINK-10855
> URL: https://issues.apache.org/jira/browse/FLINK-10855
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>
> In case that an acknowledge checkpoint message is late or a checkpoint cannot 
> be acknowledged, we discard the subtask state in the 
> {{CheckpointCoordinator}}. What's not happening in this case is that we 
> delete the parent directory of the checkpoint. This only happens when we 
> dispose a {{PendingCheckpoint#dispose}}. 
> Due to this behaviour it can happen that a checkpoint fails (e.g. a task not 
> being ready) and we delete the checkpoint directory. Next another task writes 
> its checkpoint data to the checkpoint directory (thereby creating it again) 
> and sending an acknowledge message back to the {{CheckpointCoordinator}}. The 
> {{CheckpointCoordinator}} will realize that there is no longer a 
> {{PendingCheckpoint}} and will discard the sub task state. This will remove 
> the state files from the checkpoint directory but will leave the checkpoint 
> directory untouched.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12526) Remove STATE_UPDATER in ExecutionGraph

2019-05-16 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841243#comment-16841243
 ] 

vinoyang commented on FLINK-12526:
--

[~till.rohrmann] OK, thanks for the notice, I have not started it yet. Will wait 
a bit until [~gjy] finishes his work.

> Remove STATE_UPDATER in ExecutionGraph
> --
>
> Key: FLINK-12526
> URL: https://issues.apache.org/jira/browse/FLINK-12526
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>
> Since {{ExecutionGraph}} can only be accessed from a single 
> thread (FLINK-11417), we can remove the 
> {{AtomicReferenceFieldUpdater STATE_UPDATER}} from 
> {{ExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] StefanRRichter commented on a change in pull request #8409: [FLINK-12478] Decompose monolithic run-loops in StreamTask implementa…

2019-05-16 Thread GitBox
StefanRRichter commented on a change in pull request #8409: [FLINK-12478] 
Decompose monolithic run-loops in StreamTask implementa…
URL: https://github.com/apache/flink/pull/8409#discussion_r284665683
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -222,12 +222,24 @@ protected StreamTask(
 
protected abstract void init() throws Exception;
 
-   protected abstract void run() throws Exception;
-
protected abstract void cleanup() throws Exception;
 
protected abstract void cancelTask() throws Exception;
 
+   /**
+* This method implements the default action of the task (e.g. 
processing one event from the input). Implementations
+* should (in general) be non-blocking.
+*
+* @return true if there is more work to perform as 
default action for this task and false
+* if the task is ready to finish.
+* @throws Exception on any problems in the action.
+*/
+   protected abstract boolean performDefaultAction() throws Exception;
 
 Review comment:
   It changes the line twice because it is changed incrementally. I see no 
problem with that; it is work for different PRs with different purposes. If we 
really decided to merge them, then we should also merge the JIRA tasks, 
but I think they are valuable as separate steps.
   
   Even if I had a dummy context, the boolean would still be required, and the 
suggestion would not really help to solve the problem.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #8409: [FLINK-12478] Decompose monolithic run-loops in StreamTask implementa…

2019-05-16 Thread GitBox
StefanRRichter commented on a change in pull request #8409: [FLINK-12478] 
Decompose monolithic run-loops in StreamTask implementa…
URL: https://github.com/apache/flink/pull/8409#discussion_r284664043
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ##
 @@ -41,88 +42,73 @@
 
private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationHead.class);
 
-   private volatile boolean running = true;
+   private RecordWriterOutput[] streamOutputs;
+
+   private final BlockingQueue> dataChannel;
+   private final String brokerID;
+   private final long iterationWaitTime;
+   private final boolean shouldWait;
 
public StreamIterationHead(Environment env) {
super(env);
+   final String iterationId = getConfiguration().getIterationId();
+   if (iterationId == null || iterationId.length() == 0) {
+   throw new FlinkRuntimeException("Missing iteration ID 
in the task configuration");
+   }
+
+   this.dataChannel = new ArrayBlockingQueue<>(1);
+   this.brokerID = 
createBrokerIdString(getEnvironment().getJobID(), iterationId,
+   getEnvironment().getTaskInfo().getIndexOfThisSubtask());
+   this.iterationWaitTime = 
getConfiguration().getIterationWaitTime();
+   this.shouldWait = iterationWaitTime > 0;
 
 Review comment:
   In general not a bad idea, but how can I assign to `final` fields from an 
init method?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-5621) Flink should provide a mechanism to prevent scheduling tasks on TaskManagers with operational issues

2019-05-16 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841238#comment-16841238
 ] 

Till Rohrmann commented on FLINK-5621:
--

I think this issue is tightly related to a general blacklisting mechanism of 
TMs in a Flink cluster. There was a [design 
discussion|http://osdir.com/apache-flink-development/msg09858.html] on the dev 
ML and also a design document. Since this is a bigger feature we should first 
reach consensus how to do it.

> Flink should provide a mechanism to prevent scheduling tasks on TaskManagers 
> with operational issues
> 
>
> Key: FLINK-5621
> URL: https://issues.apache.org/jira/browse/FLINK-5621
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Affects Versions: 1.1.4
>Reporter: Jamie Grier
>Assignee: vinoyang
>Priority: Major
>
> There are cases where jobs can get into a state where no progress can be made 
> if there is something pathologically wrong with one of the TaskManager nodes 
> in the cluster.
> An example of this would be a TaskManager on a machine that runs out of disk 
> space.  Flink never considers the TM to be "bad" and will keep using it to 
> attempt to run tasks -- which will continue to fail.
> A suggestion for overcoming this would be to allow an option where a TM will 
> commit suicide if that TM was the source of an exception that caused a job to 
> fail/restart.
> I'm sure there are plenty of other approaches to solving this..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] StefanRRichter commented on a change in pull request #8409: [FLINK-12478] Decompose monolithic run-loops in StreamTask implementa…

2019-05-16 Thread GitBox
StefanRRichter commented on a change in pull request #8409: [FLINK-12478] 
Decompose monolithic run-loops in StreamTask implementa…
URL: https://github.com/apache/flink/pull/8409#discussion_r284663084
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 ##
 @@ -98,13 +96,8 @@ public void init() throws Exception {
}
 
@Override
-   protected void run() throws Exception {
-   // cache processor reference on the stack, to make the code 
more JIT friendly
-   final StreamInputProcessor inputProcessor = 
this.inputProcessor;
-
-   while (running && inputProcessor.processInput()) {
-   // all the work happens in the "processInput" method
-   }
+   protected boolean performDefaultAction() throws Exception {
 
 Review comment:
   I ran the following:
   ```
env.setParallelism(1);
source
.map(new MultiplyByTwo())
.addSink(new DiscardingSink<>());
   
   master:
   Benchmark  Mode  Cnt  Score Error   Units
   SumLongsBenchmark.benchmarkCount  thrpt   30  16240.866 ± 311.845  ops/ms
   
   master + #8442:
   Benchmark  Mode  Cnt  Score Error   Units
   SumLongsBenchmark.benchmarkCount  thrpt   30  16540.537 ± 295.100  ops/ms
   ```
   
   ```
env.setParallelism(1);
source
.map(new MultiplyByTwo())
.rebalance()
.map((x) -> x)
.addSink(new DiscardingSink<>());
   
   master:
   Benchmark  Mode  Cnt Score Error   Units
   SumLongsBenchmark.benchmarkCount  thrpt   30  9715.334 ± 170.632  ops/ms
   
   master + #8442:
   Benchmark  Mode  Cnt Score Error   Units
   SumLongsBenchmark.benchmarkCount  thrpt   30  9932.752 ± 124.494  ops/ms
   ```
   
   I would conclude the changes run as fast or a notch faster than master on my 
local machine.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10855) CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints

2019-05-16 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841233#comment-16841233
 ] 

Till Rohrmann commented on FLINK-10855:
---

I guess that a regular clean up task could do the trick [~yanghua].
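
Purely as an illustration of what such a regular cleanup task could look like 
(class name, directory layout, and the orphan check are assumptions, not existing 
Flink APIs):

{code:java}
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class OrphanedCheckpointCleaner {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    void start(Path checkpointBaseDir, long intervalMinutes) {
        scheduler.scheduleAtFixedRate(() -> {
            try (DirectoryStream<Path> dirs =
                     Files.newDirectoryStream(checkpointBaseDir, "chk-*")) {
                for (Path dir : dirs) {
                    if (isOrphaned(dir)) {   // assumed check against retained checkpoints
                        deleteRecursively(dir);
                    }
                }
            } catch (IOException e) {
                // best effort: try again on the next run
            }
        }, intervalMinutes, intervalMinutes, TimeUnit.MINUTES);
    }

    private boolean isOrphaned(Path dir) { return false; }      // placeholder decision logic
    private void deleteRecursively(Path dir) { /* omitted */ }  // placeholder deletion
}
{code}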

> CheckpointCoordinator does not delete checkpoint directory of late/failed 
> checkpoints
> -
>
> Key: FLINK-10855
> URL: https://issues.apache.org/jira/browse/FLINK-10855
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>
> In case that an acknowledge checkpoint message is late or a checkpoint cannot 
> be acknowledged, we discard the subtask state in the 
> {{CheckpointCoordinator}}. What's not happening in this case is that we 
> delete the parent directory of the checkpoint. This only happens when we 
> dispose a {{PendingCheckpoint#dispose}}. 
> Due to this behaviour it can happen that a checkpoint fails (e.g. a task not 
> being ready) and we delete the checkpoint directory. Next another task writes 
> its checkpoint data to the checkpoint directory (thereby creating it again) 
> and sending an acknowledge message back to the {{CheckpointCoordinator}}. The 
> {{CheckpointCoordinator}} will realize that there is no longer a 
> {{PendingCheckpoint}} and will discard the sub task state. This will remove 
> the state files from the checkpoint directory but will leave the checkpoint 
> directory untouched.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12526) Remove STATE_UPDATER in ExecutionGraph

2019-05-16 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841232#comment-16841232
 ] 

Till Rohrmann commented on FLINK-12526:
---

Let's do it step by step and only change the {{STATE_UPDATER}}. I would suggest 
waiting a bit with that because Gary is currently implementing the new 
{{Scheduler}}, which also needs changes to the {{ExecutionGraph}}. There might 
be merge conflicts due to this work.

> Remove STATE_UPDATER in ExecutionGraph
> --
>
> Key: FLINK-12526
> URL: https://issues.apache.org/jira/browse/FLINK-12526
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>
> Since {{ExecutionGraph}} can only be accessed from a single 
> thread (FLINK-11417), we can remove the 
> {{AtomicReferenceFieldUpdater STATE_UPDATER}} from 
> {{ExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12526) Remove STATE_UPDATER in ExecutionGraph

2019-05-16 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-12526:
--
Priority: Minor  (was: Major)

> Remove STATE_UPDATER in ExecutionGraph
> --
>
> Key: FLINK-12526
> URL: https://issues.apache.org/jira/browse/FLINK-12526
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>
> Since {{ExecutionGraph}} can only be accessed from a single 
> thread (FLINK-11417), we can remove the 
> {{AtomicReferenceFieldUpdater STATE_UPDATER}} from 
> {{ExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12476) [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig

2019-05-16 Thread Andrey Zagrebin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey Zagrebin updated FLINK-12476:

Description: 
At the moment we have two efficient background cleanup strategies: incremental 
for heap and compaction filter for RocksDB. *StateTtlConfig* has 2 methods to 
activate them: *cleanupIncrementally* and *cleanupInRocksdbCompactFilter*. Each 
is activated only for a certain backend type and is inactive for the other. They 
have different tuning parameters.

The idea is to add a method *cleanupInBackground* which would activate the default 
background cleanup. The user then does not need to think about the details or the 
backend in use. Depending on the actually used backend, the corresponding 
cleanup will kick in. If the original strategy is not set with 
*cleanupIncrementally* or *cleanupInRocksdbCompactFilter*, then backends should 
check whether the default background cleanup is activated and, if so, use it with 
default parameters.

We can also deprecate the parameterless *cleanupInRocksdbCompactFilter()* in 
favour of this new method.

  was:
At the moment we have two efficient background cleanup strategies: incremental 
for heap and compaction filter for RocksDB. *StateTtlConfig* has 2 methods to 
activate them: *cleanupIncrementally* and *cleanupInRocksdbCompactFilter*. Each 
is activated only for certain backend type and inactive for other. They have 
different tuning parameters.

The idea is to add method *cleanupInBackground* which would activate default 
background cleanup. User does not need to think then about details or used 
backend if not needed. Depending on actually used backend, the corresponding 
cleanup will kick in. Backends should check whether default background cleanup 
is activated

We can also deprecated the parameterless *cleanupInRocksdbCompactFilter()* in 
favour of this new method.


> [State TTL] Consider setting a default background cleanup strategy in 
> StateTtlConfig
> 
>
> Key: FLINK-12476
> URL: https://issues.apache.org/jira/browse/FLINK-12476
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0
>Reporter: Andrey Zagrebin
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0, 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> At the moment we have two efficient background cleanup strategies: 
> incremental for heap and compaction filter for RocksDB. *StateTtlConfig* has 
> 2 methods to activate them: *cleanupIncrementally* and 
> *cleanupInRocksdbCompactFilter*. Each is activated only for certain backend 
> type and inactive for other. They have different tuning parameters.
> The idea is to add method *cleanupInBackground* which would activate default 
> background cleanup. User does not need to think then about details or used 
> backend if not needed. Depending on actually used backend, the corresponding 
> cleanup will kick in. If original strategy is not set with 
> *cleanupIncrementally* and *cleanupInRocksdbCompactFilter* then backends 
> should check whether default background cleanup is activated and if so, use 
> it with default parameters.
> We can also deprecated the parameterless *cleanupInRocksdbCompactFilter()* in 
> favour of this new method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12476) [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig

2019-05-16 Thread Andrey Zagrebin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey Zagrebin updated FLINK-12476:

Description: 
At the moment we have two efficient background cleanup strategies: incremental 
for heap and compaction filter for RocksDB. *StateTtlConfig* has 2 methods to 
activate them: *cleanupIncrementally* and *cleanupInRocksdbCompactFilter*. Each 
is activated only for certain backend type and inactive for other. They have 
different tuning parameters.

The idea is to add method *cleanupInBackground* which would activate default 
background cleanup. User does not need to think then about details or used 
backend if not needed. Depending on actually used backend, the corresponding 
cleanup will kick in. Backends should check whether default background cleanup 
is activated

We can also deprecated the parameterless *cleanupInRocksdbCompactFilter()* in 
favour of this new method.

  was:
At the moment we have two efficient background cleanup strategies: incremental 
for heap and compaction filter for RocksDB. *StateTtlConfig* has 2 methods to 
activate them: *cleanupIncrementally* and *cleanupInRocksdbCompactFilter*. Each 
is activated only for certain backend type and inactive for other. They have 
different tuning parameters.

The idea is to add method *cleanupInBackground* which would activate both of 
them with default parameters. User does not need to think then about details or 
used backend if not needed. Depending on actually used backend, the 
corresponding cleanup will kick in. Sample implementation:

 
{code:java}
public Builder cleanupInBackground() {
    return cleanupIncrementally(10, false).cleanupInRocksdbCompactFilter(1000L);
}
{code}
We can also deprecated the parameterless *cleanupInRocksdbCompactFilter()* in 
favour of this new method.


> [State TTL] Consider setting a default background cleanup strategy in 
> StateTtlConfig
> 
>
> Key: FLINK-12476
> URL: https://issues.apache.org/jira/browse/FLINK-12476
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0
>Reporter: Andrey Zagrebin
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0, 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> At the moment we have two efficient background cleanup strategies: 
> incremental for heap and compaction filter for RocksDB. *StateTtlConfig* has 
> 2 methods to activate them: *cleanupIncrementally* and 
> *cleanupInRocksdbCompactFilter*. Each is activated only for certain backend 
> type and inactive for other. They have different tuning parameters.
> The idea is to add method *cleanupInBackground* which would activate default 
> background cleanup. User does not need to think then about details or used 
> backend if not needed. Depending on actually used backend, the corresponding 
> cleanup will kick in. Backends should check whether default background 
> cleanup is activated
> We can also deprecated the parameterless *cleanupInRocksdbCompactFilter()* in 
> favour of this new method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] aljoscha commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig

2019-05-16 Thread GitBox
aljoscha commented on a change in pull request #8459: [FLINK-12476] [State TTL] 
Consider setting a default background cleanup strategy in StateTtlConfig
URL: https://github.com/apache/flink/pull/8459#discussion_r284659324
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 ##
 @@ -327,6 +329,14 @@ public Builder cleanupInRocksdbCompactFilter(long 
queryTimeAfterNumEntries) {
return this;
}
 
+   /**
+* Cleanup expired state by activating both strategies. 
Depending on actually used backend, the corresponding cleanup will kick in.
+*/
+   @Nonnull
+   public Builder cleanupInBackground() {
+   return cleanupIncrementally(10, 
true).cleanupInRocksdbCompactFilter(1000L);
 
 Review comment:
   exactly!  


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig

2019-05-16 Thread GitBox
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State 
TTL] Consider setting a default background cleanup strategy in StateTtlConfig
URL: https://github.com/apache/flink/pull/8459#discussion_r284658756
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 ##
 @@ -327,6 +329,14 @@ public Builder cleanupInRocksdbCompactFilter(long 
queryTimeAfterNumEntries) {
return this;
}
 
+   /**
+* Cleanup expired state by activating both strategies. 
Depending on actually used backend, the corresponding cleanup will kick in.
+*/
+   @Nonnull
+   public Builder cleanupInBackground() {
+   return cleanupIncrementally(10, 
true).cleanupInRocksdbCompactFilter(1000L);
 
 Review comment:
   @aljoscha true, we can move the concrete decision about the default 
configuration to the backends.
   
   @yanghua the method `cleanupInBackground` should still be added, but rather 
as a boolean flag in StateTtlConfig. Backends should check this flag where they 
check the configuration of their corresponding cleanup strategies and use the 
default ones.
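
   A small sketch of the flag-based variant described here, with purely 
illustrative names (not the final StateTtlConfig API):

   ```java
// Illustrative only: record the intent in the config, let each backend pick its defaults.
public final class TtlCleanupConfig {
    private boolean cleanupInBackground;
    private Integer incrementalCleanupSize; // null = not explicitly configured

    public TtlCleanupConfig cleanupInBackground() {
        this.cleanupInBackground = true; // only a flag, no concrete strategy chosen here
        return this;
    }

    // A heap backend would check the flag where it reads its strategy settings
    // and fall back to its own default (e.g. 10 entries per incremental step).
    int effectiveIncrementalCleanupSize() {
        if (incrementalCleanupSize != null) {
            return incrementalCleanupSize;
        }
        return cleanupInBackground ? 10 : 0; // 0 = incremental cleanup disabled
    }
}
   ```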


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on issue #8428: [FLINK-12497][network] Refactor the start method of ConnectionManager

2019-05-16 Thread GitBox
zhijiangW commented on issue #8428: [FLINK-12497][network] Refactor the start 
method of ConnectionManager
URL: https://github.com/apache/flink/pull/8428#issuecomment-493024930
 
 
   Thanks for the review @azagrebin !


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12384) Rolling the etcd servers causes "Connected to an old server; r-o mode will be unavailable"

2019-05-16 Thread Gary Yao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841209#comment-16841209
 ] 

Gary Yao commented on FLINK-12384:
--

[~haf] I checked the ZK client code and the warning is not something to be 
concerned about. The client is talking to a ZK server that does not support the 
r-o mode.

Also see:
https://github.com/apache/zookeeper/blob/e45551fc7c691332ace7bff81926855e42ac2239/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java#L147-L149
http://zookeeper-user.578899.n2.nabble.com/Connected-to-an-old-server-r-o-mode-will-be-unavailable-td7578775.html

Is your cluster not starting up correctly or recovering the jobs? If that is 
the case, I would like to see the complete jobmanager logs if possible.

> Rolling the etcd servers causes "Connected to an old server; r-o mode will be 
> unavailable"
> --
>
> Key: FLINK-12384
> URL: https://issues.apache.org/jira/browse/FLINK-12384
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Henrik
>Priority: Major
>
> {code:java}
> [tm] 2019-05-01 13:30:53,316 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
> Initiating client connection, connectString=analytics-zetcd:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5c8eee0f
> [tm] 2019-05-01 13:30:53,384 WARN  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
> configuration failed: javax.security.auth.login.LoginException: No JAAS 
> configuration section named 'Client' was found in specified JAAS 
> configuration file: '/tmp/jaas-3674237213070587877.conf'. Will continue 
> connection to Zookeeper server without SASL authentication, if Zookeeper 
> server allows it.
> [tm] 2019-05-01 13:30:53,395 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening 
> socket connection to server 
> analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181
> [tm] 2019-05-01 13:30:53,395 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Using 
> configured hostname/address for TaskManager: 10.1.2.173.
> [tm] 2019-05-01 13:30:53,401 ERROR 
> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
> Authentication failed
> [tm] 2019-05-01 13:30:53,418 INFO  
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to 
> start actor system at 10.1.2.173:0
> [tm] 2019-05-01 13:30:53,420 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket 
> connection established to 
> analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, initiating 
> session
> [tm] 2019-05-01 13:30:53,500 WARN  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocket  - 
> Connected to an old server; r-o mode will be unavailable
> [tm] 2019-05-01 13:30:53,500 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 
> establishment complete on server 
> analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, sessionid = 
> 0xbf06a739001d446, negotiated timeout = 6
> [tm] 2019-05-01 13:30:53,525 INFO  
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>   - State change: CONNECTED{code}
> Repro:
> Start an etcd-cluster, with e.g. etcd-operator, with three members. Start 
> zetcd in front. Configure the session cluster to go against zetcd.
> Ensure the job can start successfully.
> Now, kill the etcd pods one by one, letting the quorum re-establish in 
> between, so that the cluster is still OK.
> Now restart the job/tm pods. You'll end up in this no-mans-land.
>  
> —
> Workaround: clean out the etcd cluster and remove all its data, however, this 
> resets all time windows and state, despite having that saved in GCS, so it's 
> a crappy workaround.
>  
> –
>  
> flink-conf.yaml
> {code:java}
> parallelism.default: 1
> rest.address: analytics-job
> jobmanager.rpc.address: analytics-job # = resource manager's address too
> jobmanager.heap.size: 1024m
> jobmanager.rpc.port: 6123
> jobmanager.slot.request.timeout: 3
> resourcemanager.rpc.port: 6123
> high-availability.jobmanager.port: 6123
> blob.server.port: 6124
> queryable-state.server.ports: 6125
> taskmanager.heap.size: 1024m
> taskmanager.numberOfTaskSlots: 1
> web.log.path: /var/lib/log/flink/jobmanager.log
> rest.port: 8081
> rest.bind-address: 0.0.0.0
> web.submit.enable: false
> high-availability: zookeeper
> high-availability.storageDir: gs://example_analytics/flink/zetcd/
> high-availability.zookeeper.quorum: analytics-zetcd:2181
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.client.acl: open
> state.backend: 

[jira] [Commented] (FLINK-12533) Introduce TABLE_AGGREGATE_FUNCTION FunctionDefinition.Type

2019-05-16 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841207#comment-16841207
 ] 

sunjincheng commented on FLINK-12533:
-

+1 for this change! [~hequn8128]

> Introduce TABLE_AGGREGATE_FUNCTION FunctionDefinition.Type
> --
>
> Key: FLINK-12533
> URL: https://issues.apache.org/jira/browse/FLINK-12533
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Currently, there are four kinds of {{FunctionDefinition.Type}},
> {code:java}
>   public enum Type {
>   AGGREGATE_FUNCTION,
>   SCALAR_FUNCTION,
>   TABLE_FUNCTION,
>   OTHER_FUNCTION
>   }
> {code}
> The Type AGGREGATE_FUNCTION is used to express both AggregateFunction and 
> TableAggregateFunction. However, these two kinds of functions have different 
> semantics. It would be nice to separate them more clearly by introducing 
> another type, TABLE_AGGREGATE_FUNCTION.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12534) Reduce the test cost for Python API

2019-05-16 Thread Wei Zhong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Zhong reassigned FLINK-12534:
-

Assignee: Wei Zhong

> Reduce the test cost for Python API
> ---
>
> Key: FLINK-12534
> URL: https://issues.apache.org/jira/browse/FLINK-12534
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Travis
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Wei Zhong
>Priority: Major
>
> Currently, we run the Python API Travis tests for Scala 2.12 / Java 9 / Hadoop 
> 2.4.1. Since the Python API uses Py4J to communicate with the JVM, the test for 
> Java 9 is enough, and we can remove the tests for Scala 2.12 and Hadoop 2.4.1.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12534) Reduce the test cost for Python API

2019-05-16 Thread sunjincheng (JIRA)
sunjincheng created FLINK-12534:
---

 Summary: Reduce the test cost for Python API
 Key: FLINK-12534
 URL: https://issues.apache.org/jira/browse/FLINK-12534
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Travis
Affects Versions: 1.9.0
Reporter: sunjincheng


Currently, we run the Python API Travis tests for Scala 2.12 / Java 9 / Hadoop 
2.4.1. Since the Python API uses Py4J to communicate with the JVM, the test for 
Java 9 is enough, and we can remove the tests for Scala 2.12 and Hadoop 2.4.1.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12533) Introduce TABLE_AGGREGATE_FUNCTION FunctionDefinition.Type

2019-05-16 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-12533:
---

 Summary: Introduce TABLE_AGGREGATE_FUNCTION FunctionDefinition.Type
 Key: FLINK-12533
 URL: https://issues.apache.org/jira/browse/FLINK-12533
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Hequn Cheng
Assignee: Hequn Cheng


Currently, there are four kinds of {{FunctionDefinition.Type}},
{code:java}
public enum Type {
AGGREGATE_FUNCTION,
SCALAR_FUNCTION,
TABLE_FUNCTION,
OTHER_FUNCTION
}
{code}
The Type AGGREGATE_FUNCTION is used to express both AggregateFunction and 
TableAggregateFunction. However, these two kinds of functions have different 
semantics. It would be nice to separate them more clearly by introducing another 
type, TABLE_AGGREGATE_FUNCTION.
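For illustration, a minimal sketch of what the extended enum could look like (where exactly the new constant is placed is an assumption):
{code:java}
public enum Type {
	AGGREGATE_FUNCTION,

	// Proposed: a dedicated type for TableAggregateFunction, so it is no longer
	// lumped together with AggregateFunction under AGGREGATE_FUNCTION.
	TABLE_AGGREGATE_FUNCTION,

	SCALAR_FUNCTION,

	TABLE_FUNCTION,

	OTHER_FUNCTION
}
{code}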



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12532) Upgrade Avro to version 1.9.0

2019-05-16 Thread JIRA
Ismaël Mejía created FLINK-12532:


 Summary: Upgrade Avro to version 1.9.0
 Key: FLINK-12532
 URL: https://issues.apache.org/jira/browse/FLINK-12532
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Ismaël Mejía


Avro 1.9.0 was released with many nice features, including reduced size (1 MB 
less), removed dependencies (no paranamer, no shaded Guava), and security 
updates, so it is probably a worthwhile upgrade.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot edited a comment on issue #8428: [FLINK-12497][network] Refactor the start method of ConnectionManager

2019-05-16 Thread GitBox
flinkbot edited a comment on issue #8428: [FLINK-12497][network] Refactor the 
start method of ConnectionManager
URL: https://github.com/apache/flink/pull/8428#issuecomment-491750033
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @azagrebin
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @azagrebin
   * ❗ 3. Needs [attention] from.
   - Needs attention by @azagrebin
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @azagrebin
   * ✅ 5. Overall code [quality] is good.
   - Approved by @azagrebin
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on issue #8428: [FLINK-12497][network] Refactor the start method of ConnectionManager

2019-05-16 Thread GitBox
azagrebin commented on issue #8428: [FLINK-12497][network] Refactor the start 
method of ConnectionManager
URL: https://github.com/apache/flink/pull/8428#issuecomment-493004991
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8428: [FLINK-12497][network] Refactor the start method of ConnectionManager

2019-05-16 Thread GitBox
azagrebin commented on a change in pull request #8428: [FLINK-12497][network] 
Refactor the start method of ConnectionManager
URL: https://github.com/apache/flink/pull/8428#discussion_r284632408
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
 ##
 @@ -35,24 +37,27 @@
 
private final PartitionRequestClientFactory 
partitionRequestClientFactory;
 
-   private final boolean isCreditBased;
+   private final NettyProtocol nettyProtocol;
+
+   public NettyConnectionManager(
+   ResultPartitionProvider partitionProvider,
+   TaskEventPublisher taskEventPublisher,
+   NettyConfig nettyConfig,
+   boolean isCreditBased) {
 
-   public NettyConnectionManager(NettyConfig nettyConfig, boolean 
isCreditBased) {
this.server = new NettyServer(nettyConfig);
this.client = new NettyClient(nettyConfig);
this.bufferPool = new 
NettyBufferPool(nettyConfig.getNumberOfArenas());
 
this.partitionRequestClientFactory = new 
PartitionRequestClientFactory(client);
 
-   this.isCreditBased = isCreditBased;
+   this.nettyProtocol = new 
NettyProtocol(checkNotNull(partitionProvider), 
checkNotNull(taskEventPublisher), isCreditBased);
 
 Review comment:
   could be new line after `new NettyProtocol(` to avoid long lines
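   For illustration, the wrapping suggested here could look like the following (a formatting sketch only, using the arguments shown in the diff above):

```java
this.nettyProtocol = new NettyProtocol(
	checkNotNull(partitionProvider),
	checkNotNull(taskEventPublisher),
	isCreditBased);
```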


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8464: [hotfix] fix typos.

2019-05-16 Thread GitBox
flinkbot commented on issue #8464: [hotfix] fix typos.
URL: https://github.com/apache/flink/pull/8464#issuecomment-493004237
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] mr-cloud opened a new pull request #8464: [hotfix] fix typos.

2019-05-16 Thread GitBox
mr-cloud opened a new pull request #8464: [hotfix] fix typos.
URL: https://github.com/apache/flink/pull/8464
 
 
   1. [flink-core] below -> above in TypeInformation methods
   2. [flink-streaming-java] CLusterUtil -> ClusterUtil in 
LocalStreamEnvironment methods
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10466) flink-yarn-tests should depend flink-dist

2019-05-16 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10466.

   Resolution: Fixed
Fix Version/s: 1.9.0

master: 29fdef8d237350df1cf915a62390f49e408ba960

> flink-yarn-tests should depend flink-dist
> -
>
> Key: FLINK-10466
> URL: https://issues.apache.org/jira/browse/FLINK-10466
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> may be adding
> {code:java}
> <dependency>
>  <groupId>org.apache.flink</groupId>
>  <artifactId>flink-dist_${scala.binary.version}</artifactId>
>  <version>${project.version}</version>
>  <scope>test</scope>
>  <type>pom</type>
> </dependency>
> {code}
> Not really sure, but it causes a failure in my automated testing process, and 
> adding this dependency makes the error disappear. I even wonder how it works 
> currently on Travis.
> flink-yarn-tests obviously depends on flink-dist since some tests try to find 
> the flink uber jar.
> Please take a look at this. cc [~Zentol]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology

2019-05-16 Thread GitBox
eaglewatcherwb commented on a change in pull request #8446: [FLINK-12414] 
[runtime] Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284630476
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
 ##
 @@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adapter;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.TestRestartStrategy;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.api.common.InputDependencyConstraint.ALL;
+import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
+import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.EMPTY;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.RELEASED;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link DefaultResultPartition}.
+ */
+public class DefaultResultPartitionTest {
+
+   private final SimpleAckingTaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
+
+   private final TestRestartStrategy triggeredRestartStrategy = 
TestRestartStrategy.manuallyTriggered();
+
+   private ExecutionGraph executionGraph;
+
+   private ExecutionGraphToSchedulingTopologyAdapter adapter;
+
+   private List<IntermediateResultPartition> intermediateResultPartitions;
+
+   private List<SchedulingResultPartition> schedulingResultPartitions;
+
+   @Before
+   public void setUp() throws Exception {
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol merged pull request #8398: [FLINK-10466][build] Add flink-dist dependency to yarn-test

2019-05-16 Thread GitBox
zentol merged pull request #8398: [FLINK-10466][build] Add flink-dist 
dependency to yarn-test
URL: https://github.com/apache/flink/pull/8398
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add 
all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284618858
 
 

 ##
 File path: flink-python/pyflink/table/table_config.py
 ##
 @@ -38,24 +44,76 @@ def is_stream(self, is_stream):
 
 @property
 def parallelism(self):
+"""
+The parallelism for all operations.
+"""
 return self._parallelism
 
 @parallelism.setter
 def parallelism(self, parallelism):
 self._parallelism = parallelism
 
+@property
+def timezone(self):
+"""
+The timezone_id for a timezone, either an abbreviation such as "PST", 
a full name such as
+"America/Los_Angeles", or a custom timezone_id such as "GMT-8:00".
+"""
+return self._j_table_config.getTimeZone().getID()
+
+@timezone.setter
+def timezone(self, timezone_id):
+if timezone_id is not None and isinstance(timezone_id, str):
 
 Review comment:
   unicode should also be considered (in Python 2, a string may be `unicode` rather than `str`)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add 
all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284615878
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -96,16 +105,413 @@ def where(self, predicate):
 """
 Filters out elements that don't pass the filter predicate. Similar to 
a SQL WHERE
 clause.
+
 Example:
 ::
-
 >>> tab.where("name = 'Fred'")
 
 :param predicate: Predicate expression string.
 :return: Result table.
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns only distinct (different) values.
+
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(right._j_table, 
join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(right._j_table, 
join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+

[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add 
all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284617719
 
 

 ##
 File path: flink-python/pyflink/table/table_config.py
 ##
 @@ -38,24 +44,76 @@ def is_stream(self, is_stream):
 
 @property
 def parallelism(self):
+"""
+The parallelism for all operations.
+"""
 return self._parallelism
 
 @parallelism.setter
 def parallelism(self, parallelism):
 self._parallelism = parallelism
 
+@property
+def timezone(self):
+"""
+The timezone_id for a timezone, either an abbreviation such as "PST", 
a full name such as
 
 Review comment:
   `The timezone_id for a timezone` ->  `Returns the timezone id`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add 
all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284612580
 
 

 ##
 File path: flink-python/pyflink/table/query_config.py
 ##
 @@ -0,0 +1,112 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABCMeta
+from datetime import timedelta
+from py4j.compat import long
+
+from pyflink.java_gateway import get_gateway
+
+
+class QueryConfig(object):
+"""
+The :class:`QueryConfig` holds parameters to configure the behavior of 
queries.
+"""
+
+__metaclass__ = ABCMeta
+
+def __init__(self, j_query_config):
+self._j_query_config = j_query_config
+
+
+class StreamQueryConfig(QueryConfig):
+"""
+The :class:`StreamQueryConfig` holds parameters to configure the behavior 
of streaming queries.
+"""
+
+def __init__(self, j_stream_query_config=None):
+self._jvm = get_gateway().jvm
+if j_stream_query_config is not None:
+self._j_stream_query_config = j_stream_query_config
+else:
+self._j_stream_query_config = self._jvm.StreamQueryConfig()
+super(StreamQueryConfig, self).__init__(self._j_stream_query_config)
+
+def with_idle_state_retention_time(self, min_time, max_time):
+"""
+Specifies a minimum and a maximum time interval for how long idle 
state, i.e., state which
+was not updated, will be retained.
+
+State will never be cleared until it was idle for less than the 
minimum time and will never
+be kept if it was idle for more than the maximum time.
+
+When new data arrives for previously cleaned-up state, the new data 
will be handled as if it
+was the first data. This can result in previous results being 
overwritten.
+
+Set to ``datetime.timedelta()``(zero) to never clean-up the state.
+
+.. note::
+Cleaning up state requires additional bookkeeping which becomes 
less expensive for
+larger differences of minTime and maxTime. The difference between 
minTime and maxTime
+must be at least ``datetime.timedelta(minutes=5)``(5 minutes).
+
+:param min_time: The minimum time interval for which idle state is 
retained. Set to
+ ``datetime.timedelta()``(zero) to never clean-up the 
state.
+:param max_time: The maximum time interval for which idle state is 
retained. Must be at
+ least 5 minutes greater than minTime. Set to
+ ``datetime.timedelta()``(zero) to never clean-up the 
state.
+:return: :class:`StreamQueryConfig`
+"""
+#  type: (timedelta, timedelta) -> StreamQueryConfig
+j_time_class = self._jvm.org.apache.flink.api.common.time.Time
+j_min_time = 
j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000)))
+j_max_time = 
j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000)))
+self._j_stream_query_config = \
+self._j_stream_query_config.withIdleStateRetentionTime(j_min_time, 
j_max_time)
+return self
+
+def get_min_idle_state_retention_time(self):
+"""
+State might be cleared and removed if it was not updated for the 
defined period of time.
+
+:return: The minimum time until state which was not updated will be 
retained.
+"""
+#  type: () -> int
+return self._j_stream_query_config.getMinIdleStateRetentionTime()
+
+def get_max_idle_state_retention_time(self):
+"""
+State will be cleared and removed if it was not updated for the 
defined period of time.
+
+:return: The maximum time until state which was not updated will be 
retained.
+"""
+#  type: () -> int
+return self._j_stream_query_config.getMaxIdleStateRetentionTime()
+
+
+class BatchQueryConfig(QueryConfig):
+"""
+The :class:`BatchQueryConfig` holds parameters to configure the behavior 

[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add 
all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284615491
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -96,16 +105,413 @@ def where(self, predicate):
 """
 Filters out elements that don't pass the filter predicate. Similar to 
a SQL WHERE
 clause.
+
 Example:
 ::
-
 >>> tab.where("name = 'Fred'")
 
 :param predicate: Predicate expression string.
 :return: Result table.
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns only distinct (different) values.
+
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(right._j_table, 
join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(right._j_table, 
join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+

[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add 
all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284622626
 
 

 ##
 File path: flink-python/pyflink/table/table_environment.py
 ##
 @@ -153,9 +240,51 @@ def __init__(self, j_tenv):
 self._j_tenv = j_tenv
 super(StreamTableEnvironment, self).__init__(j_tenv)
 
+def get_config(self):
+"""
+Returns the table config to define the runtime behavior of the Table 
API.
+
+:return: Current :class:`TableConfig`.
+"""
+table_config = TableConfig()
+table_config._j_table_config = self._j_tenv.getConfig()
+table_config.is_stream = True
 
 Review comment:
   Currently there are two ways to set the table config properties: in 
TableConfig and TableConfig.Builder. What about removing all the set methods in 
TableConfig?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-16 Thread GitBox
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add 
all table operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#discussion_r284614157
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -96,16 +105,413 @@ def where(self, predicate):
 """
 Filters out elements that don't pass the filter predicate. Similar to 
a SQL WHERE
 clause.
+
 Example:
 ::
-
 >>> tab.where("name = 'Fred'")
 
 :param predicate: Predicate expression string.
 :return: Result table.
 """
 return Table(self._j_table.where(predicate))
 
+def group_by(self, fields):
+"""
+Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
+
+Example:
+::
+>>> tab.group_by("key").select("key, value.avg")
+
+:param fields: Group keys.
+:return: The grouped table.
+"""
+return GroupedTable(self._j_table.groupBy(fields))
+
+def distinct(self):
+"""
+Removes duplicate values and returns only distinct (different) values.
+
+Example:
+::
+>>> tab.select("key, value").distinct()
+
+:return: Result table.
+"""
+return Table(self._j_table.distinct())
+
+def join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL join. The fields of the 
two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary. You can use where and select clauses after a join to 
further specify the
+behaviour of the join.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` .
+
+Example:
+::
+>>> left.join(right).where("a = b && c > 3").select("a, b, d")
+>>> left.join(right, "a = b")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is not None:
+return Table(self._j_table.join(right._j_table, join_predicate))
+else:
+return Table(self._j_table.join(right._j_table))
+
+def left_outer_join(self, right, join_predicate=None):
+"""
+Joins two :class:`Table`s. Similar to a SQL left outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.left_outer_join(right).select("a, b, d")
+>>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: Optional, the join predicate expression string.
+:return: Result table.
+"""
+if join_predicate is None:
+return Table(self._j_table.leftOuterJoin(right._j_table))
+else:
+return Table(self._j_table.leftOuterJoin(right._j_table, 
join_predicate))
+
+def right_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL right outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+"""
+return Table(self._j_table.rightOuterJoin(right._j_table, 
join_predicate))
+
+def full_outer_join(self, right, join_predicate):
+"""
+Joins two :class:`Table`s. Similar to a SQL full outer join. The 
fields of the two joined
+operations must not overlap, use :func:`~pyflink.table.Table.alias` to 
rename fields if
+necessary.
+
+.. note::
+Both tables must be bound to the same :class:`TableEnvironment` 
and its
+:class:`TableConfig` must have null check enabled (default).
+
+Example:
+::
+>>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+:param right: Right table.
+:param join_predicate: The join predicate expression string.
+:return: Result table.
+

[GitHub] [flink] flinkbot edited a comment on issue #8398: [FLINK-10466][build] Add flink-dist dependency to yarn-test

2019-05-16 Thread GitBox
flinkbot edited a comment on issue #8398: [FLINK-10466][build] Add flink-dist 
dependency to yarn-test
URL: https://github.com/apache/flink/pull/8398#issuecomment-491180514
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @azagrebin
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @azagrebin
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @azagrebin
   * ✅ 5. Overall code [quality] is good.
   - Approved by @azagrebin
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on issue #8398: [FLINK-10466][build] Add flink-dist dependency to yarn-test

2019-05-16 Thread GitBox
azagrebin commented on issue #8398: [FLINK-10466][build] Add flink-dist 
dependency to yarn-test
URL: https://github.com/apache/flink/pull/8398#issuecomment-492999241
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12530) Move Task.inputGatesById to NetworkEnvironment

2019-05-16 Thread Andrey Zagrebin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey Zagrebin updated FLINK-12530:

Summary: Move Task.inputGatesById to NetworkEnvironment  (was: Move 
Task#inputGatesById to NetworkEnvironment)

> Move Task.inputGatesById to NetworkEnvironment
> --
>
> Key: FLINK-12530
> URL: https://issues.apache.org/jira/browse/FLINK-12530
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Task.inputGatesById indexes SingleInputGates by id. The end user of this 
> indexing is NetworkEnvironment, for two cases:
> - SingleInputGate triggers the producer partition readiness check, and the 
> successful result of that check is then dispatched back to this SingleInputGate 
> by id. We can just add an additional argument to 
> TaskActions.triggerPartitionProducerStateCheck: an immediate callback to that 
> SingleInputGate. Then inputGatesById is not needed for dispatching.
> - TaskExecutor.updatePartitions uses inputGatesById to dispatch PartitionInfo 
> updates to the right SingleInputGate. If inputGatesById is moved to 
> NetworkEnvironment, which should be a better place for gate management, and 
> NetworkEnvironment.updatePartitionInfo is added, then TaskExecutor.updatePartitions 
> could directly call NetworkEnvironment.updatePartitionInfo.
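For illustration, a minimal sketch of the id-based dispatching described above, with the gate map owned by the network layer instead of the Task (all class, method, and parameter names here are stand-ins, not the actual Flink code):
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for SingleInputGate; illustrative only.
interface InputGateSketch {
	void update(Object partitionInfo);
}

// Hypothetical stand-in for NetworkEnvironment owning the gate index.
class NetworkEnvironmentSketch {

	private final Map<String, InputGateSketch> inputGatesById = new ConcurrentHashMap<>();

	void registerGate(String gateId, InputGateSketch gate) {
		inputGatesById.put(gateId, gate);
	}

	// Dispatches a partition info update to the gate registered under gateId,
	// which is what a NetworkEnvironment-side updatePartitionInfo could do.
	boolean updatePartitionInfo(String gateId, Object partitionInfo) {
		InputGateSketch gate = inputGatesById.get(gateId);
		if (gate == null) {
			return false; // gate already released; the caller can ignore the update
		}
		gate.update(partitionInfo);
		return true;
	}
}
{code}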



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] yanghua commented on issue #8461: [FLINK-12422] Remove IN_TESTS for make test code and production code consistent

2019-05-16 Thread GitBox
yanghua commented on issue #8461: [FLINK-12422] Remove IN_TESTS for make test 
code and production code consistent
URL: https://github.com/apache/flink/pull/8461#issuecomment-492994671
 
 
   cc @tillrohrmann 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-16 Thread GitBox
flinkbot edited a comment on issue #8463: [FLINK-12530][network] Move 
Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#issuecomment-492991609
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❗ 3. Needs [attention] from.
   - Needs attention by @zhijiangw
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on issue #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-16 Thread GitBox
azagrebin commented on issue #8463: [FLINK-12530][network] Move 
Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#issuecomment-492992119
 
 
   @flinkbot attention @zhijiangW 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment

2019-05-16 Thread GitBox
flinkbot commented on issue #8463: [FLINK-12530][network] Move 
Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#issuecomment-492991609
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

