[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology URL: https://github.com/apache/flink/pull/8446#discussion_r284687035 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java ## @@ -18,131 +18,112 @@ package org.apache.flink.runtime.scheduler.adapter; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.ExecutionEdge; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; -import org.apache.flink.runtime.executiongraph.TestRestartStrategy; -import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; +import org.apache.flink.util.TestLogger; import org.junit.Before; import org.junit.Test; +import javax.xml.ws.Provider; + import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; -import java.util.Random; -import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.api.common.InputDependencyConstraint.ALL; -import static org.apache.flink.api.common.InputDependencyConstraint.ANY; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; -import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState; import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; -import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** * Unit tests for {@link DefaultExecutionVertex}. */ -public class DefaultExecutionVertexTest { - - private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); - - private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered(); +public class DefaultExecutionVertexTest extends TestLogger { - private final int parallelism = 3; + private final ExecutionStateProviderTest stateProvider = new ExecutionStateProviderTest(); private List schedulingExecutionVertices; - private List executionVertices; + private IntermediateResultPartitionID intermediateResultPartitionId; @Before public void setUp() throws Exception { - JobVertex[] jobVertices = new JobVertex[2]; - jobVertices[0] = createNoOpVertex(parallelism); - jobVertices[1] = createNoOpVertex(parallelism); - jobVertices[1].connectNewDataSetAsInput(jobVertices[0], ALL_TO_ALL, BLOCKING); - jobVertices[0].setInputDependencyConstraint(ALL); - jobVertices[1].setInputDependencyConstraint(ANY); - ExecutionGraph executionGraph = createSimpleTestGraph( - new JobID(), - taskManagerGateway, - triggeredRestartStrategy, - jobVertices); - ExecutionGraphToSchedulingTopologyAdapter adapter = new ExecutionGraphToSchedulingTopologyAdapter(executionGraph); - - schedulingExecutionVertices = new ArrayList<>(); - adapter.getVertices().forEach(vertex -> schedulingExecutionVertices.add(vertex)); - executionVertices = new ArrayList<>(); - 
executionGraph.getAllExecutionVertices().forEach(vertex -> executionVertices.add(vertex)); - } - @Test - public void testGetId() { - for (int idx = 0; idx < schedulingExecutionVertices.size(); idx++){ - assertEquals(schedulingExecutionVertices.get(idx).getId().getJobVertexId(), - executionVertices.get(idx).getJobvertexId()); - assertEquals(schedulingExecutionVertices.get(idx).getId().getSubtaskIndex(), - executionVertices.get(idx).getParallelSubtaskIndex()); - } +
[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology URL: https://github.com/apache/flink/pull/8446#discussion_r284689769 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java ## @@ -18,131 +18,112 @@ package org.apache.flink.runtime.scheduler.adapter; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.ExecutionEdge; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; -import org.apache.flink.runtime.executiongraph.TestRestartStrategy; -import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; +import org.apache.flink.util.TestLogger; import org.junit.Before; import org.junit.Test; +import javax.xml.ws.Provider; + import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; -import java.util.Random; -import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.api.common.InputDependencyConstraint.ALL; -import static org.apache.flink.api.common.InputDependencyConstraint.ANY; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; -import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState; import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; -import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** * Unit tests for {@link DefaultExecutionVertex}. */ -public class DefaultExecutionVertexTest { - - private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); - - private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered(); +public class DefaultExecutionVertexTest extends TestLogger { - private final int parallelism = 3; + private final ExecutionStateProviderTest stateProvider = new ExecutionStateProviderTest(); private List schedulingExecutionVertices; - private List executionVertices; + private IntermediateResultPartitionID intermediateResultPartitionId; @Before public void setUp() throws Exception { - JobVertex[] jobVertices = new JobVertex[2]; - jobVertices[0] = createNoOpVertex(parallelism); - jobVertices[1] = createNoOpVertex(parallelism); - jobVertices[1].connectNewDataSetAsInput(jobVertices[0], ALL_TO_ALL, BLOCKING); - jobVertices[0].setInputDependencyConstraint(ALL); - jobVertices[1].setInputDependencyConstraint(ANY); - ExecutionGraph executionGraph = createSimpleTestGraph( - new JobID(), - taskManagerGateway, - triggeredRestartStrategy, - jobVertices); - ExecutionGraphToSchedulingTopologyAdapter adapter = new ExecutionGraphToSchedulingTopologyAdapter(executionGraph); - - schedulingExecutionVertices = new ArrayList<>(); - adapter.getVertices().forEach(vertex -> schedulingExecutionVertices.add(vertex)); - executionVertices = new ArrayList<>(); - 
executionGraph.getAllExecutionVertices().forEach(vertex -> executionVertices.add(vertex)); - } - @Test - public void testGetId() { - for (int idx = 0; idx < schedulingExecutionVertices.size(); idx++){ - assertEquals(schedulingExecutionVertices.get(idx).getId().getJobVertexId(), - executionVertices.get(idx).getJobvertexId()); - assertEquals(schedulingExecutionVertices.get(idx).getId().getSubtaskIndex(), - executionVertices.get(idx).getParallelSubtaskIndex()); - } +
[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology URL: https://github.com/apache/flink/pull/8446#discussion_r284697765 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java ## @@ -62,121 +49,73 @@ /** * Unit tests for {@link DefaultResultPartition}. */ -public class DefaultResultPartitionTest { +public class DefaultResultPartitionTest extends TestLogger { - private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + private final DefaultExecutionVertexTest.ExecutionStateProviderTest stateProvider = new DefaultExecutionVertexTest.ExecutionStateProviderTest(); - private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered(); + private List schedulingExecutionVertices; - private ExecutionGraph executionGraph; - - private ExecutionGraphToSchedulingTopologyAdapter adapter; - - private List intermediateResultPartitions; - - private List schedulingResultPartitions; + private DefaultResultPartition resultPartition; @Before - public void setUp() throws Exception { - final int parallelism = 3; - JobVertex[] jobVertices = new JobVertex[2]; - jobVertices[0] = createNoOpVertex(parallelism); - jobVertices[1] = createNoOpVertex(parallelism); - jobVertices[1].connectNewDataSetAsInput(jobVertices[0], ALL_TO_ALL, BLOCKING); - jobVertices[0].setInputDependencyConstraint(ALL); - jobVertices[1].setInputDependencyConstraint(ANY); - executionGraph = createSimpleTestGraph( - new JobID(), - taskManagerGateway, - triggeredRestartStrategy, - jobVertices); - adapter = new ExecutionGraphToSchedulingTopologyAdapter(executionGraph); - - intermediateResultPartitions = new ArrayList<>(); - schedulingResultPartitions = new ArrayList<>(); - - for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) { - for (Map.Entry entry - : vertex.getProducedPartitions().entrySet()) 
{ - intermediateResultPartitions.add(entry.getValue()); - schedulingResultPartitions.add(adapter.getResultPartition(entry.getKey()) - .orElseThrow(() -> new IllegalArgumentException("can not find partition" + entry.getKey(; - } - } - assertEquals(parallelism, intermediateResultPartitions.size()); - } - - @Test - public void testBasicInfo() { - for (int idx = 0; idx < intermediateResultPartitions.size(); idx++) { - final IntermediateResultPartition partition = intermediateResultPartitions.get(idx); - final SchedulingResultPartition schedulingResultPartition = schedulingResultPartitions.get(idx); - assertEquals(partition.getPartitionId(), schedulingResultPartition.getId()); - assertEquals(partition.getIntermediateResult().getId(), schedulingResultPartition.getResultId()); - assertEquals(partition.getResultType(), schedulingResultPartition.getPartitionType()); - } + public void setUp() { + schedulingExecutionVertices = new ArrayList<>(2); + resultPartition = new DefaultResultPartition( + new IntermediateResultPartitionID(), + new IntermediateDataSetID(), + BLOCKING); + + DefaultExecutionVertex vertex1 = new DefaultExecutionVertex( + new ExecutionVertexID(new JobVertexID(), 0), + Collections.singletonList(resultPartition), + ALL, + stateProvider); + resultPartition.setProducer(vertex1); + DefaultExecutionVertex vertex2 = new DefaultExecutionVertex( + new ExecutionVertexID(new JobVertexID(), 0), + java.util.Collections.emptyList(), + ALL, + stateProvider); + resultPartition.addConsumer(vertex2); + schedulingExecutionVertices.add(vertex1); + schedulingExecutionVertices.add(vertex2); } @Test public void testGetConsumers() { - for (int idx = 0; idx < intermediateResultPartitions.size(); idx++) { - Collection schedulingConsumers = schedulingResultPartitions.get(idx).getConsumers() -
[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology URL: https://github.com/apache/flink/pull/8446#discussion_r284695496 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java ## @@ -18,131 +18,112 @@ package org.apache.flink.runtime.scheduler.adapter; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.ExecutionEdge; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; -import org.apache.flink.runtime.executiongraph.TestRestartStrategy; -import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; +import org.apache.flink.util.TestLogger; import org.junit.Before; import org.junit.Test; +import javax.xml.ws.Provider; + import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; -import java.util.Random; -import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.api.common.InputDependencyConstraint.ALL; -import static org.apache.flink.api.common.InputDependencyConstraint.ANY; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; -import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState; import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; -import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** * Unit tests for {@link DefaultExecutionVertex}. */ -public class DefaultExecutionVertexTest { - - private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); - - private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered(); +public class DefaultExecutionVertexTest extends TestLogger { - private final int parallelism = 3; + private final ExecutionStateProviderTest stateProvider = new ExecutionStateProviderTest(); private List schedulingExecutionVertices; - private List executionVertices; + private IntermediateResultPartitionID intermediateResultPartitionId; @Before public void setUp() throws Exception { - JobVertex[] jobVertices = new JobVertex[2]; - jobVertices[0] = createNoOpVertex(parallelism); - jobVertices[1] = createNoOpVertex(parallelism); - jobVertices[1].connectNewDataSetAsInput(jobVertices[0], ALL_TO_ALL, BLOCKING); - jobVertices[0].setInputDependencyConstraint(ALL); - jobVertices[1].setInputDependencyConstraint(ANY); - ExecutionGraph executionGraph = createSimpleTestGraph( - new JobID(), - taskManagerGateway, - triggeredRestartStrategy, - jobVertices); - ExecutionGraphToSchedulingTopologyAdapter adapter = new ExecutionGraphToSchedulingTopologyAdapter(executionGraph); - - schedulingExecutionVertices = new ArrayList<>(); - adapter.getVertices().forEach(vertex -> schedulingExecutionVertices.add(vertex)); - executionVertices = new ArrayList<>(); - 
executionGraph.getAllExecutionVertices().forEach(vertex -> executionVertices.add(vertex)); - } - @Test - public void testGetId() { - for (int idx = 0; idx < schedulingExecutionVertices.size(); idx++){ - assertEquals(schedulingExecutionVertices.get(idx).getId().getJobVertexId(), - executionVertices.get(idx).getJobvertexId()); - assertEquals(schedulingExecutionVertices.get(idx).getId().getSubtaskIndex(), - executionVertices.get(idx).getParallelSubtaskIndex()); - } +
[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology URL: https://github.com/apache/flink/pull/8446#discussion_r284689364 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java ## @@ -18,131 +18,112 @@ package org.apache.flink.runtime.scheduler.adapter; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.ExecutionEdge; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; -import org.apache.flink.runtime.executiongraph.TestRestartStrategy; -import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; +import org.apache.flink.util.TestLogger; import org.junit.Before; import org.junit.Test; +import javax.xml.ws.Provider; + import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; -import java.util.Random; -import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.api.common.InputDependencyConstraint.ALL; -import static org.apache.flink.api.common.InputDependencyConstraint.ANY; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; -import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState; import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; -import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** * Unit tests for {@link DefaultExecutionVertex}. */ -public class DefaultExecutionVertexTest { - - private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); - - private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered(); +public class DefaultExecutionVertexTest extends TestLogger { - private final int parallelism = 3; + private final ExecutionStateProviderTest stateProvider = new ExecutionStateProviderTest(); private List schedulingExecutionVertices; - private List executionVertices; + private IntermediateResultPartitionID intermediateResultPartitionId; @Before public void setUp() throws Exception { - JobVertex[] jobVertices = new JobVertex[2]; - jobVertices[0] = createNoOpVertex(parallelism); - jobVertices[1] = createNoOpVertex(parallelism); - jobVertices[1].connectNewDataSetAsInput(jobVertices[0], ALL_TO_ALL, BLOCKING); - jobVertices[0].setInputDependencyConstraint(ALL); - jobVertices[1].setInputDependencyConstraint(ANY); - ExecutionGraph executionGraph = createSimpleTestGraph( - new JobID(), - taskManagerGateway, - triggeredRestartStrategy, - jobVertices); - ExecutionGraphToSchedulingTopologyAdapter adapter = new ExecutionGraphToSchedulingTopologyAdapter(executionGraph); - - schedulingExecutionVertices = new ArrayList<>(); - adapter.getVertices().forEach(vertex -> schedulingExecutionVertices.add(vertex)); - executionVertices = new ArrayList<>(); - 
executionGraph.getAllExecutionVertices().forEach(vertex -> executionVertices.add(vertex)); - } - @Test - public void testGetId() { - for (int idx = 0; idx < schedulingExecutionVertices.size(); idx++){ - assertEquals(schedulingExecutionVertices.get(idx).getId().getJobVertexId(), - executionVertices.get(idx).getJobvertexId()); - assertEquals(schedulingExecutionVertices.get(idx).getId().getSubtaskIndex(), - executionVertices.get(idx).getParallelSubtaskIndex()); - } +
[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology URL: https://github.com/apache/flink/pull/8446#discussion_r284684706 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java ## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adapter; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; + +import javax.xml.ws.Provider; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Default implementation of {@link SchedulingExecutionVertex}. 
+ */ +public class DefaultExecutionVertex implements SchedulingExecutionVertex { + + private final ExecutionVertexID executionVertexId; + + private final List consumedPartitions; + + private final List producedPartitions; + + private final InputDependencyConstraint inputDependencyConstraint; + + private final Provider stateProvider; Review comment: Ah, I meant `Supplier` (i.e. `java.util.function.Supplier`), not `javax.xml.ws.Provider` — my bad. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology URL: https://github.com/apache/flink/pull/8446#discussion_r284689201 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java ## @@ -18,131 +18,112 @@ package org.apache.flink.runtime.scheduler.adapter; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.ExecutionEdge; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; -import org.apache.flink.runtime.executiongraph.TestRestartStrategy; -import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; +import org.apache.flink.util.TestLogger; import org.junit.Before; import org.junit.Test; +import javax.xml.ws.Provider; + import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; -import java.util.Random; -import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.api.common.InputDependencyConstraint.ALL; -import static org.apache.flink.api.common.InputDependencyConstraint.ANY; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; -import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState; import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; -import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** * Unit tests for {@link DefaultExecutionVertex}. */ -public class DefaultExecutionVertexTest { - - private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); - - private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered(); +public class DefaultExecutionVertexTest extends TestLogger { - private final int parallelism = 3; + private final ExecutionStateProviderTest stateProvider = new ExecutionStateProviderTest(); private List schedulingExecutionVertices; - private List executionVertices; + private IntermediateResultPartitionID intermediateResultPartitionId; @Before public void setUp() throws Exception { - JobVertex[] jobVertices = new JobVertex[2]; - jobVertices[0] = createNoOpVertex(parallelism); - jobVertices[1] = createNoOpVertex(parallelism); - jobVertices[1].connectNewDataSetAsInput(jobVertices[0], ALL_TO_ALL, BLOCKING); - jobVertices[0].setInputDependencyConstraint(ALL); - jobVertices[1].setInputDependencyConstraint(ANY); - ExecutionGraph executionGraph = createSimpleTestGraph( - new JobID(), - taskManagerGateway, - triggeredRestartStrategy, - jobVertices); - ExecutionGraphToSchedulingTopologyAdapter adapter = new ExecutionGraphToSchedulingTopologyAdapter(executionGraph); - - schedulingExecutionVertices = new ArrayList<>(); - adapter.getVertices().forEach(vertex -> schedulingExecutionVertices.add(vertex)); - executionVertices = new ArrayList<>(); - 
executionGraph.getAllExecutionVertices().forEach(vertex -> executionVertices.add(vertex)); - } - @Test - public void testGetId() { - for (int idx = 0; idx < schedulingExecutionVertices.size(); idx++){ - assertEquals(schedulingExecutionVertices.get(idx).getId().getJobVertexId(), - executionVertices.get(idx).getJobvertexId()); - assertEquals(schedulingExecutionVertices.get(idx).getId().getSubtaskIndex(), - executionVertices.get(idx).getParallelSubtaskIndex()); - } +
[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology URL: https://github.com/apache/flink/pull/8446#discussion_r284697592 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java ## @@ -62,121 +49,73 @@ /** * Unit tests for {@link DefaultResultPartition}. */ -public class DefaultResultPartitionTest { +public class DefaultResultPartitionTest extends TestLogger { - private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + private final DefaultExecutionVertexTest.ExecutionStateProviderTest stateProvider = new DefaultExecutionVertexTest.ExecutionStateProviderTest(); - private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered(); + private List schedulingExecutionVertices; - private ExecutionGraph executionGraph; - - private ExecutionGraphToSchedulingTopologyAdapter adapter; - - private List intermediateResultPartitions; - - private List schedulingResultPartitions; + private DefaultResultPartition resultPartition; @Before - public void setUp() throws Exception { - final int parallelism = 3; - JobVertex[] jobVertices = new JobVertex[2]; - jobVertices[0] = createNoOpVertex(parallelism); - jobVertices[1] = createNoOpVertex(parallelism); - jobVertices[1].connectNewDataSetAsInput(jobVertices[0], ALL_TO_ALL, BLOCKING); - jobVertices[0].setInputDependencyConstraint(ALL); - jobVertices[1].setInputDependencyConstraint(ANY); - executionGraph = createSimpleTestGraph( - new JobID(), - taskManagerGateway, - triggeredRestartStrategy, - jobVertices); - adapter = new ExecutionGraphToSchedulingTopologyAdapter(executionGraph); - - intermediateResultPartitions = new ArrayList<>(); - schedulingResultPartitions = new ArrayList<>(); - - for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) { - for (Map.Entry entry - : vertex.getProducedPartitions().entrySet()) 
{ - intermediateResultPartitions.add(entry.getValue()); - schedulingResultPartitions.add(adapter.getResultPartition(entry.getKey()) - .orElseThrow(() -> new IllegalArgumentException("can not find partition" + entry.getKey(; - } - } - assertEquals(parallelism, intermediateResultPartitions.size()); - } - - @Test - public void testBasicInfo() { - for (int idx = 0; idx < intermediateResultPartitions.size(); idx++) { - final IntermediateResultPartition partition = intermediateResultPartitions.get(idx); - final SchedulingResultPartition schedulingResultPartition = schedulingResultPartitions.get(idx); - assertEquals(partition.getPartitionId(), schedulingResultPartition.getId()); - assertEquals(partition.getIntermediateResult().getId(), schedulingResultPartition.getResultId()); - assertEquals(partition.getResultType(), schedulingResultPartition.getPartitionType()); - } + public void setUp() { + schedulingExecutionVertices = new ArrayList<>(2); Review comment: We never iterate over this list, so we could just have two fields (producer and consumer), which is a bit easier to read. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology URL: https://github.com/apache/flink/pull/8446#discussion_r284695717 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java ## @@ -18,131 +18,112 @@ package org.apache.flink.runtime.scheduler.adapter; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.ExecutionEdge; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; -import org.apache.flink.runtime.executiongraph.TestRestartStrategy; -import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; +import org.apache.flink.util.TestLogger; import org.junit.Before; import org.junit.Test; +import javax.xml.ws.Provider; + import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; -import java.util.Random; -import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.api.common.InputDependencyConstraint.ALL; -import static org.apache.flink.api.common.InputDependencyConstraint.ANY; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; -import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState; import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; -import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** * Unit tests for {@link DefaultExecutionVertex}. */ -public class DefaultExecutionVertexTest { - - private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); - - private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered(); +public class DefaultExecutionVertexTest extends TestLogger { - private final int parallelism = 3; + private final ExecutionStateProviderTest stateProvider = new ExecutionStateProviderTest(); private List schedulingExecutionVertices; - private List executionVertices; + private IntermediateResultPartitionID intermediateResultPartitionId; @Before public void setUp() throws Exception { - JobVertex[] jobVertices = new JobVertex[2]; - jobVertices[0] = createNoOpVertex(parallelism); - jobVertices[1] = createNoOpVertex(parallelism); - jobVertices[1].connectNewDataSetAsInput(jobVertices[0], ALL_TO_ALL, BLOCKING); - jobVertices[0].setInputDependencyConstraint(ALL); - jobVertices[1].setInputDependencyConstraint(ANY); - ExecutionGraph executionGraph = createSimpleTestGraph( - new JobID(), - taskManagerGateway, - triggeredRestartStrategy, - jobVertices); - ExecutionGraphToSchedulingTopologyAdapter adapter = new ExecutionGraphToSchedulingTopologyAdapter(executionGraph); - - schedulingExecutionVertices = new ArrayList<>(); - adapter.getVertices().forEach(vertex -> schedulingExecutionVertices.add(vertex)); - executionVertices = new ArrayList<>(); - 
executionGraph.getAllExecutionVertices().forEach(vertex -> executionVertices.add(vertex)); - } - @Test - public void testGetId() { - for (int idx = 0; idx < schedulingExecutionVertices.size(); idx++){ - assertEquals(schedulingExecutionVertices.get(idx).getId().getJobVertexId(), - executionVertices.get(idx).getJobvertexId()); - assertEquals(schedulingExecutionVertices.get(idx).getId().getSubtaskIndex(), - executionVertices.get(idx).getParallelSubtaskIndex()); - } +
[jira] [Assigned] (FLINK-11560) Translate "Flink Applications" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhou Yumin reassigned FLINK-11560: -- Assignee: Zhou Yumin > Translate "Flink Applications" page into Chinese > > > Key: FLINK-11560 > URL: https://issues.apache.org/jira/browse/FLINK-11560 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Project Website >Reporter: Jark Wu >Assignee: Zhou Yumin >Priority: Major > > Translate "Flink Applications" page into Chinese. > The markdown file is located in: flink-web/flink-applications.zh.md > The url link is: https://flink.apache.org/zh/flink-applications.html > Please adjust the links in the page to Chinese pages when translating. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable
zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable URL: https://github.com/apache/flink/pull/8430#discussion_r284602469 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java ## @@ -30,37 +38,245 @@ public class RestartPipelinedRegionStrategyTest extends TestLogger { Review comment: I'll add tests for them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable
zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable URL: https://github.com/apache/flink/pull/8430#discussion_r284684306 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java ## @@ -165,6 +181,33 @@ private void buildOneRegionForAllVertices() { for (FailoverVertex vertex : topology.getFailoverVertices()) { vertexToRegionMap.put(vertex.getExecutionVertexID(), region); } + + buildRegionInputsAndOutputs(); + } + + private void buildRegionInputsAndOutputs() { + for (FailoverRegion region : regions.keySet()) { + IdentityHashMap consumers = new IdentityHashMap<>(); + Set inputs = new HashSet<>(); + Set consumerVertices = new HashSet<>(); + Set regionVertices = region.getAllExecutionVertices(); + regionVertices.forEach(v -> { Review comment: I tried the merged logic and found that region building becomes even slower. I think this is because, although the merged logic saves some iteration overhead, it adds more work, including building some legacy relations and later discarding them during region merging. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on issue #8401: [FLINK-12407][python] Add all table operators align Java Table API.
dianfu commented on issue #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#issuecomment-493052179 @WeiZhong94 Thanks a lot for the update. LGTM. +1 to merge. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable
zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable URL: https://github.com/apache/flink/pull/8430#discussion_r284599683 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java ## @@ -57,7 +66,9 @@ */ public RestartPipelinedRegionStrategy(FailoverTopology topology) { this.topology = checkNotNull(topology); - this.regions = new HashMap<>(); + this.regions = new IdentityHashMap<>(); + this.vertexToRegionMap = new HashMap<>(); + this.resultPartitionAvailabilityChecker = new RegionFailoverResultPartitionAvailabilityChecker(); Review comment: Good idea. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu edited a comment on issue #8401: [FLINK-12407][python] Add all table operators align Java Table API.
dianfu edited a comment on issue #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#issuecomment-493052179 @WeiZhong94 Thanks a lot for the update. LGTM. +1 to merge. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12512) TableSourceTest#testNestedProject test failed
[ https://issues.apache.org/jira/browse/FLINK-12512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-12512: - Priority: Critical (was: Major) > TableSourceTest#testNestedProject test failed > - > > Key: FLINK-12512 > URL: https://issues.apache.org/jira/browse/FLINK-12512 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: vinoyang >Priority: Critical > > > {code:java} > 20:41:59.128 [ERROR] > testNestedProject(org.apache.flink.table.api.stream.table.TableSourceTest) > Time elapsed: 0.047 s <<< FAILURE! > org.junit.ComparisonFailure: > null expected:<...deepNested.nested2.f[lag AS nestedFlag, > deepNested.nested2.num AS nestedNum]) > StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], > source=[TestSource(read nested fields: id.*, deepNested.nested2.num, > deepNested.nested2.flag], deepNested.nested1...> but > was:<...deepNested.nested2.f[1 AS nestedFlag, deepNested.nested2.f0 AS > nestedNum]) > StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], > source=[TestSource(read nested fields: id.*, deepNested.nested2.f1, > deepNested.nested2.f0], deepNested.nested1...> > at > org.apache.flink.table.api.stream.table.TableSourceTest.testNestedProject(TableSourceTest.scala:375) > {code} > log details : [https://api.travis-ci.org/v3/job/532319575/log.txt] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12512) TableSourceTest#testNestedProject test failed
[ https://issues.apache.org/jira/browse/FLINK-12512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-12512: - Issue Type: Bug (was: Improvement) > TableSourceTest#testNestedProject test failed > - > > Key: FLINK-12512 > URL: https://issues.apache.org/jira/browse/FLINK-12512 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: vinoyang >Priority: Blocker > > > {code:java} > 20:41:59.128 [ERROR] > testNestedProject(org.apache.flink.table.api.stream.table.TableSourceTest) > Time elapsed: 0.047 s <<< FAILURE! > org.junit.ComparisonFailure: > null expected:<...deepNested.nested2.f[lag AS nestedFlag, > deepNested.nested2.num AS nestedNum]) > StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], > source=[TestSource(read nested fields: id.*, deepNested.nested2.num, > deepNested.nested2.flag], deepNested.nested1...> but > was:<...deepNested.nested2.f[1 AS nestedFlag, deepNested.nested2.f0 AS > nestedNum]) > StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], > source=[TestSource(read nested fields: id.*, deepNested.nested2.f1, > deepNested.nested2.f0], deepNested.nested1...> > at > org.apache.flink.table.api.stream.table.TableSourceTest.testNestedProject(TableSourceTest.scala:375) > {code} > log details : [https://api.travis-ci.org/v3/job/532319575/log.txt] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12512) TableSourceTest#testNestedProject test failed
[ https://issues.apache.org/jira/browse/FLINK-12512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-12512: - Priority: Blocker (was: Critical) > TableSourceTest#testNestedProject test failed > - > > Key: FLINK-12512 > URL: https://issues.apache.org/jira/browse/FLINK-12512 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: vinoyang >Priority: Blocker > > > {code:java} > 20:41:59.128 [ERROR] > testNestedProject(org.apache.flink.table.api.stream.table.TableSourceTest) > Time elapsed: 0.047 s <<< FAILURE! > org.junit.ComparisonFailure: > null expected:<...deepNested.nested2.f[lag AS nestedFlag, > deepNested.nested2.num AS nestedNum]) > StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], > source=[TestSource(read nested fields: id.*, deepNested.nested2.num, > deepNested.nested2.flag], deepNested.nested1...> but > was:<...deepNested.nested2.f[1 AS nestedFlag, deepNested.nested2.f0 AS > nestedNum]) > StreamTableSourceScan(table=[[T]], fields=[id, deepNested, nested], > source=[TestSource(read nested fields: id.*, deepNested.nested2.f1, > deepNested.nested2.f0], deepNested.nested1...> > at > org.apache.flink.table.api.stream.table.TableSourceTest.testNestedProject(TableSourceTest.scala:375) > {code} > log details : [https://api.travis-ci.org/v3/job/532319575/log.txt] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284682569 ## File path: flink-python/pyflink/table/table_environment.py ## @@ -153,9 +240,51 @@ def __init__(self, j_tenv): self._j_tenv = j_tenv super(StreamTableEnvironment, self).__init__(j_tenv) +def get_config(self): +""" +Returns the table config to define the runtime behavior of the Table API. + +:return: Current :class:`TableConfig`. +""" +table_config = TableConfig() +table_config._j_table_config = self._j_tenv.getConfig() +table_config.is_stream = True Review comment: I think we'd better keep those setter methods but make them private, because without them we would need to put the setters' logic in TableConfig.Builder. I think it makes more sense to keep this logic in TableConfig. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API
flinkbot edited a comment on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API URL: https://github.com/apache/flink/pull/8465#issuecomment-493037224 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❗ 3. Needs [attention] from. - Needs attention by @zentol [PMC] * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API
sunjincheng121 commented on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API URL: https://github.com/apache/flink/pull/8465#issuecomment-493047188 @flinkbot attention @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zzchun edited a comment on issue #8372: [FLINK-12354] [table] Add REVERSE function for table/sql API
zzchun edited a comment on issue #8372: [FLINK-12354] [table] Add REVERSE function for table/sql API URL: https://github.com/apache/flink/pull/8372#issuecomment-492262212 cc @xccui @twalthr @fhueske, could you help me review this PR? Thank you. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284678324 ## File path: flink-python/pyflink/table/table_config.py ## @@ -38,24 +44,76 @@ def is_stream(self, is_stream): @property def parallelism(self): +""" +The parallelism for all operations. +""" return self._parallelism @parallelism.setter def parallelism(self, parallelism): self._parallelism = parallelism +@property +def timezone(self): +""" +The timezone_id for a timezone, either an abbreviation such as "PST", a full name such as +"America/Los_Angeles", or a custom timezone_id such as "GMT-8:00". +""" +return self._j_table_config.getTimeZone().getID() + +@timezone.setter +def timezone(self, timezone_id): +if timezone_id is not None and isinstance(timezone_id, str): Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284678283 ## File path: flink-python/pyflink/table/table_config.py ## @@ -38,24 +44,76 @@ def is_stream(self, is_stream): @property def parallelism(self): +""" +The parallelism for all operations. +""" return self._parallelism @parallelism.setter def parallelism(self, parallelism): self._parallelism = parallelism +@property +def timezone(self): +""" +The timezone_id for a timezone, either an abbreviation such as "PST", a full name such as Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284678079 ## File path: flink-python/pyflink/table/table.py ## @@ -96,16 +105,413 @@ def where(self, predicate): """ Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE clause. + Example: :: - >>> tab.where("name = 'Fred'") :param predicate: Predicate expression string. :return: Result table. """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns only distinct (different) values. + +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. 
+""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin(right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin(right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. 
note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.full_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284678208 ## File path: flink-python/pyflink/table/table.py ## @@ -96,16 +105,413 @@ def where(self, predicate): """ Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE clause. + Example: :: - >>> tab.where("name = 'Fred'") :param predicate: Predicate expression string. :return: Result table. """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns only distinct (different) values. + +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. 
+""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin(right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin(right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. 
note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.full_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284678253 ## File path: flink-python/pyflink/table/table.py ## @@ -96,16 +105,413 @@ def where(self, predicate): """ Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE clause. + Example: :: - >>> tab.where("name = 'Fred'") :param predicate: Predicate expression string. :return: Result table. """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns only distinct (different) values. + +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. 
+""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin(right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin(right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. 
note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.full_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
WeiZhong94 commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284678155 ## File path: flink-python/pyflink/table/query_config.py ## @@ -0,0 +1,112 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABCMeta +from datetime import timedelta +from py4j.compat import long + +from pyflink.java_gateway import get_gateway + + +class QueryConfig(object): +""" +The :class:`QueryConfig` holds parameters to configure the behavior of queries. +""" + +__metaclass__ = ABCMeta + +def __init__(self, j_query_config): +self._j_query_config = j_query_config + + +class StreamQueryConfig(QueryConfig): +""" +The :class:`StreamQueryConfig` holds parameters to configure the behavior of streaming queries. 
+""" + +def __init__(self, j_stream_query_config=None): +self._jvm = get_gateway().jvm +if j_stream_query_config is not None: +self._j_stream_query_config = j_stream_query_config +else: +self._j_stream_query_config = self._jvm.StreamQueryConfig() +super(StreamQueryConfig, self).__init__(self._j_stream_query_config) + +def with_idle_state_retention_time(self, min_time, max_time): +""" +Specifies a minimum and a maximum time interval for how long idle state, i.e., state which +was not updated, will be retained. + +State will never be cleared until it was idle for less than the minimum time and will never +be kept if it was idle for more than the maximum time. + +When new data arrives for previously cleaned-up state, the new data will be handled as if it +was the first data. This can result in previous results being overwritten. + +Set to ``datetime.timedelta()``(zero) to never clean-up the state. + +.. note:: +Cleaning up state requires additional bookkeeping which becomes less expensive for +larger differences of minTime and maxTime. The difference between minTime and maxTime +must be at least ``datetime.timedelta(minutes=5)``(5 minutes). + +:param min_time: The minimum time interval for which idle state is retained. Set to + ``datetime.timedelta()``(zero) to never clean-up the state. +:param max_time: The maximum time interval for which idle state is retained. Must be at + least 5 minutes greater than minTime. Set to + ``datetime.timedelta()``(zero) to never clean-up the state. 
+:return: :class:`StreamQueryConfig` +""" +# type: (timedelta, timedelta) -> StreamQueryConfig +j_time_class = self._jvm.org.apache.flink.api.common.time.Time +j_min_time = j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000))) +j_max_time = j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000))) +self._j_stream_query_config = \ +self._j_stream_query_config.withIdleStateRetentionTime(j_min_time, j_max_time) +return self + +def get_min_idle_state_retention_time(self): +""" +State might be cleared and removed if it was not updated for the defined period of time. + +:return: The minimum time until state which was not updated will be retained. +""" +# type: () -> int +return self._j_stream_query_config.getMinIdleStateRetentionTime() + +def get_max_idle_state_retention_time(self): +""" +State will be cleared and removed if it was not updated for the defined period of time. + +:return: The maximum time until state which was not updated will be retained. +""" +# type: () -> int +return self._j_stream_query_config.getMaxIdleStateRetentionTime() + + +class BatchQueryConfig(QueryConfig): +""" +The :class:`BatchQueryConfig` holds parameters to configure the
[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r284670894 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java ## @@ -124,4 +171,25 @@ public static InputChannelMetrics newUnregisteredInputChannelMetrics() { /** This class is not meant to be instantiated. */ private InputChannelTestUtils() {} + + /** Test stub for {@link MemorySegmentProvider}. */ + public static class StubMemorySegmentProvider implements MemorySegmentProvider { + private static final MemorySegmentProvider INSTANCE = new StubMemorySegmentProvider(); + + public static MemorySegmentProvider getInstance() { + return INSTANCE; + } + + private StubMemorySegmentProvider() { + } + + @Override + public Collection requestMemorySegments() { + return null; Review comment: seems safer to return an empty collection here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r284655543 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java ## @@ -88,7 +90,9 @@ public void testConsumptionWithRemoteChannels() throws Exception { final ConnectionManager connManager = createDummyConnectionManager(); final Source[] sources = new Source[numberOfChannels]; - final SingleInputGate gate = createSingleInputGate(numberOfChannels); Review comment: the majority of these changes don't really seem necessary; we could just modify `createSingleInputGate` to use the builder and keep it as a short-hand. (btw., it is a bit odd that you aren't touching `InputChannelTestUtils` in this commit) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r284672498 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ## @@ -114,11 +114,12 @@ public void testConcurrentConsumeMultiplePartitions() throws Exception { .setNumTargetKeyGroups(parallelism) .setResultPartitionManager(partitionManager) .setSendScheduleOrUpdateConsumersMessage(true) + .setBufferPoolFactory(p -> + networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize)) .build(); // Create a buffer pool for this partition Review comment: this comment seems a bit outdated; I'd probably just remove it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r284667951 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java ## @@ -106,15 +106,15 @@ public void testDoublePartitionRequest() throws Exception { final PartitionRequestClient client = new PartitionRequestClient( channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class)); - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32); + final int numExclusiveBuffers = 2; Review comment: variable name no longer makes sense; currently it implies that in the entire pool there are exactly 2 exclusive buffers. Also applies to other instances. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r284656335 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.taskmanager.NoOpTaskActions; +import org.apache.flink.runtime.taskmanager.TaskActions; + +/** + * Utility class to encapsulate the logic of building a {@link SingleInputGate} instance. 
+ */ +public class SingleInputGateBuilder { + + private JobID jobId = new JobID(); + + private IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID(); Review comment: a bunch of these can be final This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r284660153 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ## @@ -123,13 +124,17 @@ public static NetworkEnvironment create( new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize()); registerNetworkMetrics(metricGroup, networkBufferPool); + ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); + ResultPartitionFactory resultPartitionFactory = + new ResultPartitionFactory(resultPartitionManager, checkNotNull(ioManager)); Review comment: let's move preconditions to the front of the method to catch things early and make these contracts easier to discover. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r284667365 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ## @@ -236,35 +236,32 @@ public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { } /** -* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to +* Tests {@link NetworkBufferPool} constructor with the invalid argument to * cause exception. */ - @Test - public void testRequestMemorySegmentsWithInvalidArgument() throws Exception { - final int numBuffers = 10; - - NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128); - + @Test(expected = IllegalArgumentException.class) + public void testRequestMemorySegmentsWithInvalidArgument() { + NetworkBufferPool globalPool = null; try { // the number of requested buffers should be larger than zero - globalPool.requestMemorySegments(0); + globalPool = new NetworkBufferPool(10, 128, 0); Review comment: Would move this out of the `try` block. If this fails the finally block isn't doing anything anyway. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r284672050 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ## @@ -154,13 +161,16 @@ * The pool is registered with the partition *after* it as been constructed in order to conform * to the life-cycle of task registrations in the {@link TaskExecutor}. */ - public void registerBufferPool(BufferPool bufferPool) { - checkArgument(bufferPool.getNumberOfRequiredMemorySegments() >= getNumberOfSubpartitions(), - "Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for this result partition."); - + @Override + public void setup() throws IOException { checkState(this.bufferPool == null, "Bug in result partition setup logic: Already registered buffer pool."); + BufferPool bufferPool = bufferPoolFactory.apply(this); + checkArgument(bufferPool.getNumberOfRequiredMemorySegments() >= getNumberOfSubpartitions(), + "Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for this result partition."); + this.bufferPool = checkNotNull(bufferPool); Review comment: the null check is ineffective: `bufferPool` has already been dereferenced by the `checkArgument` call above, so a null pool would fail there first. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r284668096 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java ## @@ -106,15 +106,15 @@ public void testDoublePartitionRequest() throws Exception { final PartitionRequestClient client = new PartitionRequestClient( channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class)); - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32); + final int numExclusiveBuffers = 2; Review comment: maybe you could just in-line this thing like in most other tests This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r284658421 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java ## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.LocalConnectionManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; +import org.apache.flink.runtime.io.network.TaskEventPublisher; +import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; +import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; + +import java.net.InetSocketAddress; + +/** + * Builder for various {@link InputChannel} types. + */ +public class InputChannelBuilder { + static final ConnectionID STUB_CONNECTION_ID = + new ConnectionID(new InetSocketAddress("localhost", 5000), 0); + + private int channelIndex = 0; + private ResultPartitionID partitionId = new ResultPartitionID(); + private ConnectionID connectionID = STUB_CONNECTION_ID; + private ResultPartitionManager partitionManager = new ResultPartitionManager(); + private TaskEventPublisher taskEventPublisher = new TaskEventDispatcher(); + private ConnectionManager connectionManager = new LocalConnectionManager(); + private int initialBackoff = 0; + private int maxBackoff = 0; + private InputChannelMetrics metrics = InputChannelTestUtils.newUnregisteredInputChannelMetrics(); + + public static InputChannelBuilder newBuilder() { + return new InputChannelBuilder(); + } + + public InputChannelBuilder setChannelIndex(int channelIndex) { + this.channelIndex = channelIndex; + return this; + } + + public InputChannelBuilder setPartitionId(ResultPartitionID partitionId) { + this.partitionId = partitionId; + return this; + } + + public InputChannelBuilder setPartitionManager(ResultPartitionManager 
partitionManager) { + this.partitionManager = partitionManager; + return this; + } + + InputChannelBuilder setTaskEventPublisher(TaskEventPublisher taskEventPublisher) { + this.taskEventPublisher = taskEventPublisher; + return this; + } + + public InputChannelBuilder setConnectionManager(ConnectionManager connectionManager) { + this.connectionManager = connectionManager; + return this; + } + + public InputChannelBuilder setInitialBackoff(int initialBackoff) { + this.initialBackoff = initialBackoff; + return this; + } + + public InputChannelBuilder setMaxBackoff(int maxBackoff) { + this.maxBackoff = maxBackoff; + return this; + } + + public InputChannelBuilder setMetrics(InputChannelMetrics metrics) { + this.metrics = metrics; + return this; + } + + InputChannelBuilder setupFromNetworkEnvironment(NetworkEnvironment network) { + this.partitionManager = network.getResultPartitionManager(); + this.connectionManager = network.getConnectionManager(); + this.initialBackoff = network.getConfiguration().partitionRequestInitialBackoff(); + this.maxBackoff = network.getConfiguration().partitionRequestMaxBackoff(); + return this; + } + + UnknownInputChannel buildUnknown(SingleInputGate inputGate) { + UnknownInputChannel channel = new UnknownInputChannel( +
[GitHub] [flink] zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zentol commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r284659459 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ## @@ -92,31 +92,44 @@ private boolean isShutdown; - public NetworkEnvironment( - NetworkEnvironmentConfiguration config, - TaskEventPublisher taskEventPublisher, - MetricGroup metricGroup, - IOManager ioManager) { - this.config = checkNotNull(config); - - this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize()); - - NettyConfig nettyConfig = config.nettyConfig(); - if (nettyConfig != null) { - this.connectionManager = new NettyConnectionManager(nettyConfig, config.isCreditBased()); - } else { - this.connectionManager = new LocalConnectionManager(); - } + private NetworkEnvironment( + NetworkEnvironmentConfiguration config, Review comment: please stick to the existing indentation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks URL: https://github.com/apache/flink/pull/8442#discussion_r284674239 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import javax.annotation.Nonnull; + +/** + * A mailbox is basically a blocking queue for inter-thread message exchange in form of {@link Runnable} objects between + * multiple producer threads and a single consumer. + */ +public interface Mailbox extends MailboxReceiver, MailboxSender { + + /** +* The effect of this is that all pending letters are dropped and the given priorityAction +* is enqueued to the head of the mailbox. +* +* @param priorityAction action to enqueue atomically after the mailbox was cleared. +*/ + void clearAndPut(@Nonnull Runnable priorityAction); Review comment: There is an additional benefit from adding this, which is that the compiler introduces an assertion for `!= null` if you run the code with `-ea`. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12303) Scala 2.12 lambdas does not work in event classes inside streams.
[ https://issues.apache.org/jira/browse/FLINK-12303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841258#comment-16841258 ] Aljoscha Krettek commented on FLINK-12303: -- Hi [~snilard], yes you're right, this wasn't the real issue. I'm sorry about that. The real problem is that Scala 2.12 changes how Scala Lambdas are implemented (they now use the same underlying mechanism as Java Lambdas, which is available from Java 8 onwards). It used to be that Kryo, which is used as the serializer in your test cases, could serialize Scala Lambdas. It can't do that anymore for Scala 2.12 lambdas. You can see the exception if you enable logging, for example by adding these dependencies to your {{build.sbt}}: {code} "org.slf4j" % "slf4j-log4j12" % "1.7.7" % "runtime", "log4j" % "log4j" % "1.2.17" % "runtime" {code} and by putting a {{log4j.properties}} in {{src/resources}}: {code} log4j.rootLogger=INFO, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n {code} This is the exception you will then find in the logs: {code} com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException Serialization trace: lambda (demo.LambdaCaseClass) lambda (demo.TsEventClass) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:224) at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51) at
org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:33) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPunctuatedWatermark(AbstractFetcher.java:459) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:404) at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:748) Caused 
by: java.lang.NullPointerException at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80) at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57) ... 26 more {code} > Scala 2.12 lambdas does not work in event classes inside streams. > - > > Key: FLINK-12303 > URL: https://issues.apache.org/jira/browse/FLINK-12303 > Project: Flink > Issue Type: Bug > Components: API / DataStream, API / Scala >Affects
[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r284671140 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java ## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.CatalogNotExistException; +import org.apache.flink.table.api.ExternalCatalogAlreadyExistException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.operations.CatalogTableOperation; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static java.lang.String.format; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A CatalogManager that encapsulates all available catalogs. It also implements the logic of + * table path resolution. Supports both new API ({@link Catalog} as well as {@link ExternalCatalog}). + */ +@Internal +public class CatalogManager { + private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class); + + // A map between names and catalogs. 
+ private Map catalogs; + + // TO BE REMOVED along with ExternalCatalog API + private Map externalCatalogs; + + // The name of the default catalog and schema + private String currentCatalogName; + + private String currentDatabaseName; + + public CatalogManager(String defaultCatalogName, Catalog defaultCatalog) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(defaultCatalogName), + "Default catalog name cannot be null or empty"); + checkNotNull(defaultCatalog, "Default catalog cannot be null"); + catalogs = new LinkedHashMap<>(); + externalCatalogs = new LinkedHashMap<>(); + catalogs.put(defaultCatalogName, defaultCatalog); + this.currentCatalogName = defaultCatalogName; + this.currentDatabaseName = defaultCatalog.getDefaultDatabase(); + } + + /** +* Registers a catalog under the given name. The catalog name must be unique across both +* {@link Catalog}s and {@link ExternalCatalog}s. +* +* @param catalogName name under which to register the given catalog +* @param catalog catalog to register +* @throws CatalogException if the registration of the catalog under the given name failed +*/ + public void registerCatalog(String catalogName, Catalog catalog) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty."); + checkNotNull(catalog, "Catalog cannot be null"); + + if (catalogs.containsKey(catalogName) || externalCatalogs.containsKey(catalogName)) { + throw new CatalogException(format("Catalog %s already exists.", catalogName)); + } + + catalogs.put(catalogName, catalog); + catalog.open(); + } + + /** +* Gets a catalog by name. +* +* @param catalogName name of the catalog to retrieve +* @return the requested catalog or empty if it does not exist +* @see CatalogManager#getExternalCatalog(String) +*/ + public Optional getCatalog(String catalogName) { + return Optional.ofNullable(catalogs.get(catalogName)); + } + + /** +* Registers an external catalog under the given name. 
The catalog name must be unique across both +*
[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks URL: https://github.com/apache/flink/pull/8442#discussion_r284670626 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ## @@ -66,21 +66,20 @@ public StreamIterationHead(Environment env) { // @Override - protected boolean performDefaultAction() throws Exception { + protected void performDefaultAction(ActionContext context) throws Exception { StreamRecord nextRecord = shouldWait ? dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) : dataChannel.take(); - if (nextRecord == null) { - return false; - } - - synchronized (getCheckpointLock()) { - for (RecordWriterOutput output : streamOutputs) { - output.collect(nextRecord); + if (nextRecord != null) { Review comment: I don't think there is a general rule like this, also not in the wip styleguide. I mean it is as good as asking to put the "likely" case first, which this does. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on issue #8414: [FLINK-12477][Build] Enforce maven version 3.1.1
zentol commented on issue #8414: [FLINK-12477][Build] Enforce maven version 3.1.1 URL: https://github.com/apache/flink/pull/8414#issuecomment-493037451 Finally, you also have to update the `cron-master-maven_compat` branch so that it no longer checks for Maven 3.0.X compatibility. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12534) Reduce the test cost for Python API
[ https://issues.apache.org/jira/browse/FLINK-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12534: --- Labels: pull-request-available (was: ) > Reduce the test cost for Python API > --- > > Key: FLINK-12534 > URL: https://issues.apache.org/jira/browse/FLINK-12534 > Project: Flink > Issue Type: Improvement > Components: API / Python, Travis >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: Wei Zhong >Priority: Major > Labels: pull-request-available > > Currently, we run the Python API Travis tests for Scala 2.12 / Java 9 / Hadoop > 2.4.1. Because the Python API uses Py4J to communicate with the JVM, the test for Java 9 > is enough, and we can remove the tests for Scala 2.12 and Hadoop 2.4.1. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API
flinkbot commented on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API URL: https://github.com/apache/flink/pull/8465#issuecomment-493037224 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 opened a new pull request #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API
WeiZhong94 opened a new pull request #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API URL: https://github.com/apache/flink/pull/8465 ## What is the purpose of the change *This pull request removes all jars except flink-dist.jar, flink-table.jar and flink-table-planner-test.jar from the Travis compile result, and removes the Python tests from the Scala 2.12 and Hadoop 2.4.1 Travis cron test jobs. This means only these 3 jars will be cached in Travis, which shrinks the cache file by 350MB per profile.* ## Brief change log - *Delete more jars after compiling; only flink-dist.jar, flink-table.jar and flink-table-planner-test.jar will be cached now.* - *Remove the Python tests from the Scala 2.12 and Hadoop 2.4.1 cron test jobs.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12384) Rolling the etcd servers causes "Connected to an old server; r-o mode will be unavailable"
[ https://issues.apache.org/jira/browse/FLINK-12384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841245#comment-16841245 ] Gary Yao commented on FLINK-12384: -- If you are running ZK behind an LB, there are additional considerations to make: https://wiki.apache.org/hadoop/ZooKeeper/FAQ#A8 I am mentioning this here because it is not clear what _"analytics-zetcd:2181"_ refers to. > Rolling the etcd servers causes "Connected to an old server; r-o mode will be > unavailable" > -- > > Key: FLINK-12384 > URL: https://issues.apache.org/jira/browse/FLINK-12384 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Henrik >Priority: Major > > {code:java} > [tm] 2019-05-01 13:30:53,316 INFO > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - > Initiating client connection, connectString=analytics-zetcd:2181 > sessionTimeout=6 > watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5c8eee0f > [tm] 2019-05-01 13:30:53,384 WARN > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL > configuration failed: javax.security.auth.login.LoginException: No JAAS > configuration section named 'Client' was found in specified JAAS > configuration file: '/tmp/jaas-3674237213070587877.conf'. Will continue > connection to Zookeeper server without SASL authentication, if Zookeeper > server allows it. > [tm] 2019-05-01 13:30:53,395 INFO > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening > socket connection to server > analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181 > [tm] 2019-05-01 13:30:53,395 INFO > org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Using > configured hostname/address for TaskManager: 10.1.2.173. 
> [tm] 2019-05-01 13:30:53,401 ERROR > org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - > Authentication failed > [tm] 2019-05-01 13:30:53,418 INFO > org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to > start actor system at 10.1.2.173:0 > [tm] 2019-05-01 13:30:53,420 INFO > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Socket > connection established to > analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, initiating > session > [tm] 2019-05-01 13:30:53,500 WARN > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocket - > Connected to an old server; r-o mode will be unavailable > [tm] 2019-05-01 13:30:53,500 INFO > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session > establishment complete on server > analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, sessionid = > 0xbf06a739001d446, negotiated timeout = 6 > [tm] 2019-05-01 13:30:53,525 INFO > org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager > - State change: CONNECTED{code} > Repro: > Start an etcd-cluster, with e.g. etcd-operator, with three members. Start > zetcd in front. Configure the sesssion cluster to go against zetcd. > Ensure the job can start successfully. > Now, kill the etcd pods one by one, letting the quorum re-establish in > between, so that the cluster is still OK. > Now restart the job/tm pods. You'll end up in this no-mans-land. > > — > Workaround: clean out the etcd cluster and remove all its data, however, this > resets all time windows and state, despite having that saved in GCS, so it's > a crappy workaround. 
> > – > > flink-conf.yaml > {code:java} > parallelism.default: 1 > rest.address: analytics-job > jobmanager.rpc.address: analytics-job # = resource manager's address too > jobmanager.heap.size: 1024m > jobmanager.rpc.port: 6123 > jobmanager.slot.request.timeout: 3 > resourcemanager.rpc.port: 6123 > high-availability.jobmanager.port: 6123 > blob.server.port: 6124 > queryable-state.server.ports: 6125 > taskmanager.heap.size: 1024m > taskmanager.numberOfTaskSlots: 1 > web.log.path: /var/lib/log/flink/jobmanager.log > rest.port: 8081 > rest.bind-address: 0.0.0.0 > web.submit.enable: false > high-availability: zookeeper > high-availability.storageDir: gs://example_analytics/flink/zetcd/ > high-availability.zookeeper.quorum: analytics-zetcd:2181 > high-availability.zookeeper.path.root: /flink > high-availability.zookeeper.client.acl: open > state.backend: rocksdb > state.checkpoints.num-retained: 3 > state.checkpoints.dir: gs://example_analytics/flink/checkpoints > state.savepoints.dir: gs://example_analytics/flink/savepoints > state.backend.incremental: true > state.backend.async: true > fs.hdfs.hadoopconf: /opt/flink/hadoop > log.file: /var/lib/log/flink/jobmanager.log{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks URL: https://github.com/apache/flink/pull/8442#discussion_r284669437 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Consumer-facing side of the {@link Mailbox} interface. This is used to dequeue letters. The mailbox returns letters + * in the order by which they were enqueued. A mailbox should only be consumed by one thread at a time. + */ +public interface MailboxReceiver { + + /** +* Returns true if the mailbox contains mail. +*/ + boolean hasMail(); + + /** +* Returns the oldest letter from the mailbox (head of queue) if the mailbox is not empty or +* null otherwise. +* +* @return the oldest letter from the mailbox (head of queue) if the mailbox is not empty or +* null otherwise. 
+*/ + @Nullable + Runnable tryTakeMail(); Review comment: I would prefer to keep this as returning `null` because it is a data structure and therefore in line with what JDK queues etc do. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks URL: https://github.com/apache/flink/pull/8442#discussion_r284668961 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Implementation of {@link Mailbox} inspired by {@link java.util.concurrent.ArrayBlockingQueue} and tailored towards + * our use case with multiple writers, single reader and volatile reads instead of lock & read on {@link #count}. + */ +public class MailboxImpl implements Mailbox { + + /** +* The enqueued letters. +*/ + @GuardedBy("lock") + private final Runnable[] ringBuffer; + + /** +* Lock for all concurrent ops. 
+*/ + private final ReentrantLock lock; + + /** +* Condition that is triggered when the buffer is no longer empty. +*/ + @GuardedBy("lock") + private final Condition notEmpty; + + /** +* Condition that is triggered when the buffer is no longer full. +*/ + @GuardedBy("lock") + private final Condition notFull; + + /** +* Index of the ring buffer head. +*/ + @GuardedBy("lock") + private int headIndex; + + /** +* Index of the ring buffer tail. +*/ + @GuardedBy("lock") + private int tailIndex; + + /** +* Number of letters in the mailbox. +*/ + @GuardedBy("lock") + private volatile int count; + + /** +* A mask to wrap around the indexes of the ring buffer. We use this to avoid ifs or modulo ops. +*/ + private final int moduloMask; + + public MailboxImpl() { + this(6); // 2^6 = 64 + } + + public MailboxImpl(int capacityPow2) { + final int capacity = 1 << capacityPow2; + Preconditions.checkState(capacity > 0); + this.moduloMask = capacity - 1; + this.ringBuffer = new Runnable[capacity]; + this.lock = new ReentrantLock(); + this.notEmpty = lock.newCondition(); + this.notFull = lock.newCondition(); + } + + @Override + public boolean hasMail() { Review comment: Is empty would always be used inverted, so I think the positive phrase is ok. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks URL: https://github.com/apache/flink/pull/8442#discussion_r284668779 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import javax.annotation.Nonnull; + +/** + * Producer-facing side of the {@link Mailbox} interface. This is used to enqueue letters. Multiple producers threads + * can put to the same mailbox. + */ +public interface MailboxSender { + + /** +* Enqueues the given letter to the mailbox, if capacity is available. On success, this returns true +* and false if the mailbox was already full. +* +* @param letter the letter to enqueue. +* @return true iff successful. +*/ + boolean tryPutMail(@Nonnull Runnable letter); + + /** +* Enqueues the given letter to the mailbox and blocks until there is capacity for a successful put. +* +* @param letter the letter to enqueue. +* @throws InterruptedException on interruption. 
+*/ + void putMail(@Nonnull Runnable letter) throws InterruptedException; + + /** +* This method blocks until the mailbox has again capacity to enqueue new letters. +* +* @throws InterruptedException on interruption. +*/ + void waitUntilHasCapacity() throws InterruptedException; Review comment: Same: For clients that want to be non-blocking w.r.t. the queue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks URL: https://github.com/apache/flink/pull/8442#discussion_r284668589 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import javax.annotation.Nonnull; + +/** + * Producer-facing side of the {@link Mailbox} interface. This is used to enqueue letters. Multiple producers threads + * can put to the same mailbox. + */ +public interface MailboxSender { + + /** +* Enqueues the given letter to the mailbox, if capacity is available. On success, this returns true +* and false if the mailbox was already full. +* +* @param letter the letter to enqueue. +* @return true iff successful. +*/ + boolean tryPutMail(@Nonnull Runnable letter); Review comment: I think it can be helpful if client code that wants to run code can be non-blocking if required. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks URL: https://github.com/apache/flink/pull/8442#discussion_r284668074 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -230,14 +242,29 @@ protected StreamTask( * This method implements the default action of the task (e.g. processing one event from the input). Implementations * should (in general) be non-blocking. * -* @return true if there is more work to perform as default action for this task and false -* if the task is ready to finish. +* @param context context object for collaborative interaction between the action and the stream task. * @throws Exception on any problems in the action. */ - protected abstract boolean performDefaultAction() throws Exception; + protected abstract void performDefaultAction(ActionContext context) throws Exception; - protected void run() throws Exception { - while (isRunning && performDefaultAction()) {} + /** +* Runs the stream-tasks main processing loop. +*/ + private void run() throws Exception { Review comment: Please see my comment here: https://github.com/apache/flink/pull/8409#discussion_r284663084 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12039) Remove ASSIGNED_SLOT_UPDATER in Execution.tryAssignResource
[ https://issues.apache.org/jira/browse/FLINK-12039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-12039: - Priority: Minor (was: Major) > Remove ASSIGNED_SLOT_UPDATER in Execution.tryAssignResource > --- > > Key: FLINK-12039 > URL: https://issues.apache.org/jira/browse/FLINK-12039 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Andrey Zagrebin >Assignee: vinoyang >Priority: Minor > > After making access to ExecutionGraph single-threaded in FLINK-11417, we can > simplify execution slot assignment in Execution.tryAssignResource and get rid > of ASSIGNED_SLOT_UPDATER, as the assignment now happens only in the single JM main thread. > However, it seems that we have to keep `assignedResource` volatile for the > moment, which could be investigated further. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12384) Rolling the etcd servers causes "Connected to an old server; r-o mode will be unavailable"
[ https://issues.apache.org/jira/browse/FLINK-12384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841209#comment-16841209 ] Gary Yao edited comment on FLINK-12384 at 5/16/19 11:49 AM: [~haf] I checked the ZK client code and the warning is not something to be concerned about. The client is talking to a ZK server that does not support the r-o mode. Also see: https://github.com/apache/zookeeper/blob/e45551fc7c691332ace7bff81926855e42ac2239/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java#L147-L149 http://zookeeper-user.578899.n2.nabble.com/Connected-to-an-old-server-r-o-mode-will-be-unavailable-td7578775.html Is your cluster not starting up correctly or not recovering the jobs? If that is the case, I would like to see the complete jobmanager logs if possible. was (Author: gjy): [~haf] I checked the ZK client code and the warning is not something to be concerned about. The client is talking to a ZK server that does not support the r-o mode. Also see: https://github.com/apache/zookeeper/blob/e45551fc7c691332ace7bff81926855e42ac2239/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java#L147-L149 http://zookeeper-user.578899.n2.nabble.com/Connected-to-an-old-server-r-o-mode-will-be-unavailable-td7578775.html Is your cluster not starting up correctly or recovering the jobs? If that is the case, I would like to see the complete jobmanager logs if possible. 
> Rolling the etcd servers causes "Connected to an old server; r-o mode will be > unavailable" > -- > > Key: FLINK-12384 > URL: https://issues.apache.org/jira/browse/FLINK-12384 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Henrik >Priority: Major > > {code:java} > [tm] 2019-05-01 13:30:53,316 INFO > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - > Initiating client connection, connectString=analytics-zetcd:2181 > sessionTimeout=6 > watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5c8eee0f > [tm] 2019-05-01 13:30:53,384 WARN > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL > configuration failed: javax.security.auth.login.LoginException: No JAAS > configuration section named 'Client' was found in specified JAAS > configuration file: '/tmp/jaas-3674237213070587877.conf'. Will continue > connection to Zookeeper server without SASL authentication, if Zookeeper > server allows it. > [tm] 2019-05-01 13:30:53,395 INFO > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening > socket connection to server > analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181 > [tm] 2019-05-01 13:30:53,395 INFO > org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Using > configured hostname/address for TaskManager: 10.1.2.173. 
> [tm] 2019-05-01 13:30:53,401 ERROR > org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - > Authentication failed > [tm] 2019-05-01 13:30:53,418 INFO > org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to > start actor system at 10.1.2.173:0 > [tm] 2019-05-01 13:30:53,420 INFO > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Socket > connection established to > analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, initiating > session > [tm] 2019-05-01 13:30:53,500 WARN > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocket - > Connected to an old server; r-o mode will be unavailable > [tm] 2019-05-01 13:30:53,500 INFO > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session > establishment complete on server > analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, sessionid = > 0xbf06a739001d446, negotiated timeout = 6 > [tm] 2019-05-01 13:30:53,525 INFO > org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager > - State change: CONNECTED{code} > Repro: > Start an etcd-cluster, with e.g. etcd-operator, with three members. Start > zetcd in front. Configure the sesssion cluster to go against zetcd. > Ensure the job can start successfully. > Now, kill the etcd pods one by one, letting the quorum re-establish in > between, so that the cluster is still OK. > Now restart the job/tm pods. You'll end up in this no-mans-land. > > — > Workaround: clean out the etcd cluster and remove all its data, however, this > resets all time windows and state, despite having that saved in GCS, so it's > a crappy workaround. > > – > > flink-conf.yaml > {code:java} > parallelism.default: 1 > rest.address: analytics-job > jobmanager.rpc.address: analytics-job # = resource manager's address too >
[jira] [Updated] (FLINK-12528) Remove progressLock in ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-12528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-12528: - Priority: Minor (was: Major) > Remove progressLock in ExecutionGraph > - > > Key: FLINK-12528 > URL: https://issues.apache.org/jira/browse/FLINK-12528 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > > Since {{ExecutionGraph}} can only be accessed from a single > thread(FLINK-11417), we can remove the progressLock from {{ExecutionGraph}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks
StefanRRichter commented on a change in pull request #8442: [FLINK-12483] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks URL: https://github.com/apache/flink/pull/8442#discussion_r284667461 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java ## @@ -98,12 +100,44 @@ protected void cleanup() { } @Override - protected void run() throws Exception { - headOperator.run(getCheckpointLock(), getStreamStatusMaintainer()); + protected void performDefaultAction(ActionContext context) throws Exception { + // Against the usual contract of this method, this implementation is not step-wise but blocking instead for Review comment: That suggestion would need an `if` in the default action that triggers for the first invocation. Starting in another method, the question is which one and we also probably require to join back in another place then. What I mean is, the alternative loop does two things different: - no invocation of default action. - running all letters inside a `synchronized` block. So the question that remains is, how you plan to add the synchronized block to the existing mailbox loop? Personally I feel like this approach might be the cleanest and most performant, but I would be open to suggestions for a complete alternative if you have something in mind. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12527) Remove GLOBAL_VERSION_UPDATER in ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-12527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-12527: - Priority: Minor (was: Major) > Remove GLOBAL_VERSION_UPDATER in ExecutionGraph > --- > > Key: FLINK-12527 > URL: https://issues.apache.org/jira/browse/FLINK-12527 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > > Since {{ExecutionGraph}} can only be accessed from a single thread, we can > remove {{AtomicLongFieldUpdater GLOBAL_VERSION_UPDATER}} from > {{ExecutionGraph}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10855) CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints
[ https://issues.apache.org/jira/browse/FLINK-10855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841244#comment-16841244 ] vinoyang commented on FLINK-10855: -- OK [~till.rohrmann] thanks. [~richtesn] do you still want to be my mentor to discuss the design of the cleanup task? > CheckpointCoordinator does not delete checkpoint directory of late/failed > checkpoints > - > > Key: FLINK-10855 > URL: https://issues.apache.org/jira/browse/FLINK-10855 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > > In case that an acknowledge checkpoint message is late or a checkpoint cannot > be acknowledged, we discard the subtask state in the > {{CheckpointCoordinator}}. What's not happening in this case is that we > delete the parent directory of the checkpoint. This only happens when we > dispose a {{PendingCheckpoint#dispose}}. > Due to this behaviour it can happen that a checkpoint fails (e.g. a task not > being ready) and we delete the checkpoint directory. Next another task writes > its checkpoint data to the checkpoint directory (thereby creating it again) > and sending an acknowledge message back to the {{CheckpointCoordinator}}. The > {{CheckpointCoordinator}} will realize that there is no longer a > {{PendingCheckpoint}} and will discard the sub task state. This will remove > the state files from the checkpoint directory but will leave the checkpoint > directory untouched. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12526) Remove STATE_UPDATER in ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-12526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841243#comment-16841243 ] vinoyang commented on FLINK-12526: -- [~till.rohrmann] OK, thanks for the notice. I have not started it yet and will wait a bit until [~gjy] finishes his work. > Remove STATE_UPDATER in ExecutionGraph > -- > > Key: FLINK-12526 > URL: https://issues.apache.org/jira/browse/FLINK-12526 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > > Since {{ExecutionGraph}} can only be accessed from a single > thread(FLINK-11417). We can remove the > {{AtomicReferenceFieldUpdater STATE_UPDATER}} from > {{ExecutionGraph}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] StefanRRichter commented on a change in pull request #8409: [FLINK-12478] Decompose monolithic run-loops in StreamTask implementa…
StefanRRichter commented on a change in pull request #8409: [FLINK-12478] Decompose monolithic run-loops in StreamTask implementa… URL: https://github.com/apache/flink/pull/8409#discussion_r284665683 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -222,12 +222,24 @@ protected StreamTask( protected abstract void init() throws Exception; - protected abstract void run() throws Exception; - protected abstract void cleanup() throws Exception; protected abstract void cancelTask() throws Exception; + /** +* This method implements the default action of the task (e.g. processing one event from the input). Implementations +* should (in general) be non-blocking. +* +* @return true if there is more work to perform as default action for this task and false +* if the task is ready to finish. +* @throws Exception on any problems in the action. +*/ + protected abstract boolean performDefaultAction() throws Exception; Review comment: It changes the line twice because it is incrementally changed. I see no problem with that, it is work for different PRs with different purposes. If we would really decide to merge them, then we should also merge the JIRA tasks, but I think they are valuable as separated steps. Even if i had a dummy context, the boolean is still required and the suggestion would not really help to solve the problem. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StefanRRichter commented on a change in pull request #8409: [FLINK-12478] Decompose monolithic run-loops in StreamTask implementa…
StefanRRichter commented on a change in pull request #8409: [FLINK-12478] Decompose monolithic run-loops in StreamTask implementa… URL: https://github.com/apache/flink/pull/8409#discussion_r284664043 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ## @@ -41,88 +42,73 @@ private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class); - private volatile boolean running = true; + private RecordWriterOutput[] streamOutputs; + + private final BlockingQueue> dataChannel; + private final String brokerID; + private final long iterationWaitTime; + private final boolean shouldWait; public StreamIterationHead(Environment env) { super(env); + final String iterationId = getConfiguration().getIterationId(); + if (iterationId == null || iterationId.length() == 0) { + throw new FlinkRuntimeException("Missing iteration ID in the task configuration"); + } + + this.dataChannel = new ArrayBlockingQueue<>(1); + this.brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId, + getEnvironment().getTaskInfo().getIndexOfThisSubtask()); + this.iterationWaitTime = getConfiguration().getIterationWaitTime(); + this.shouldWait = iterationWaitTime > 0; Review comment: In general not a bad idea, but how can I assign to `final` fields from an init method? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-5621) Flink should provide a mechanism to prevent scheduling tasks on TaskManagers with operational issues
[ https://issues.apache.org/jira/browse/FLINK-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841238#comment-16841238 ] Till Rohrmann commented on FLINK-5621: -- I think this issue is tightly related to a general blacklisting mechanism of TMs in a Flink cluster. There was a [design discussion|http://osdir.com/apache-flink-development/msg09858.html] on the dev ML and also a design document. Since this is a bigger feature we should first reach consensus how to do it. > Flink should provide a mechanism to prevent scheduling tasks on TaskManagers > with operational issues > > > Key: FLINK-5621 > URL: https://issues.apache.org/jira/browse/FLINK-5621 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Affects Versions: 1.1.4 >Reporter: Jamie Grier >Assignee: vinoyang >Priority: Major > > There are cases where jobs can get into a state where no progress can be made > if there is something pathologically wrong with one of the TaskManager nodes > in the cluster. > An example of this would be a TaskManager on a machine that runs out of disk > space. Flink never considers the TM to be "bad" and will keep using it to > attempt to run tasks -- which will continue to fail. > A suggestion for overcoming this would be to allow an option where a TM will > commit suicide if that TM was the source of an exception that caused a job to > fail/restart. > I'm sure there are plenty of other approaches to solving this.. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] StefanRRichter commented on a change in pull request #8409: [FLINK-12478] Decompose monolithic run-loops in StreamTask implementa…
StefanRRichter commented on a change in pull request #8409: [FLINK-12478] Decompose monolithic run-loops in StreamTask implementa… URL: https://github.com/apache/flink/pull/8409#discussion_r284663084 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ## @@ -98,13 +96,8 @@ public void init() throws Exception { } @Override - protected void run() throws Exception { - // cache processor reference on the stack, to make the code more JIT friendly - final StreamInputProcessor inputProcessor = this.inputProcessor; - - while (running && inputProcessor.processInput()) { - // all the work happens in the "processInput" method - } + protected boolean performDefaultAction() throws Exception { Review comment: I ran the following: ``` env.setParallelism(1); source .map(new MultiplyByTwo()) .addSink(new DiscardingSink<>()); master: Benchmark Mode Cnt Score Error Units SumLongsBenchmark.benchmarkCount thrpt 30 16240.866 ± 311.845 ops/ms master + #8442: Benchmark Mode Cnt Score Error Units SumLongsBenchmark.benchmarkCount thrpt 30 16540.537 ± 295.100 ops/ms ``` ``` env.setParallelism(1); source .map(new MultiplyByTwo()) .rebalance() .map((x) -> x) .addSink(new DiscardingSink<>()); master: Benchmark Mode Cnt Score Error Units SumLongsBenchmark.benchmarkCount thrpt 30 9715.334 ± 170.632 ops/ms master + #8442: Benchmark Mode Cnt Score Error Units SumLongsBenchmark.benchmarkCount thrpt 30 9932.752 ± 124.494 ops/ms ``` I would conclude the changes run as fast or a notch faster than master on my local machine. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10855) CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints
[ https://issues.apache.org/jira/browse/FLINK-10855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841233#comment-16841233 ] Till Rohrmann commented on FLINK-10855: --- I guess that a regular clean up task could do the trick [~yanghua]. > CheckpointCoordinator does not delete checkpoint directory of late/failed > checkpoints > - > > Key: FLINK-10855 > URL: https://issues.apache.org/jira/browse/FLINK-10855 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > > In case that an acknowledge checkpoint message is late or a checkpoint cannot > be acknowledged, we discard the subtask state in the > {{CheckpointCoordinator}}. What's not happening in this case is that we > delete the parent directory of the checkpoint. This only happens when we > dispose a {{PendingCheckpoint#dispose}}. > Due to this behaviour it can happen that a checkpoint fails (e.g. a task not > being ready) and we delete the checkpoint directory. Next another task writes > its checkpoint data to the checkpoint directory (thereby creating it again) > and sending an acknowledge message back to the {{CheckpointCoordinator}}. The > {{CheckpointCoordinator}} will realize that there is no longer a > {{PendingCheckpoint}} and will discard the sub task state. This will remove > the state files from the checkpoint directory but will leave the checkpoint > directory untouched. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12526) Remove STATE_UPDATER in ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-12526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841232#comment-16841232 ] Till Rohrmann commented on FLINK-12526: --- Let's do it step by step and only change the {{STATE_UPDATER}}. I would suggest to wait a bit with that because Gary is currently implementing the new {{Scheduler}} which also needs changes on the {{ExecutionGraph}}. There might be merge conflicts due to this work. > Remove STATE_UPDATER in ExecutionGraph > -- > > Key: FLINK-12526 > URL: https://issues.apache.org/jira/browse/FLINK-12526 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > > Since {{ExecutionGraph}} can only be accessed from a single > thread(FLINK-11417). We can remove the > {{AtomicReferenceFieldUpdater STATE_UPDATER}} from > {{ExecutionGraph}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12526) Remove STATE_UPDATER in ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-12526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-12526: -- Priority: Minor (was: Major) > Remove STATE_UPDATER in ExecutionGraph > -- > > Key: FLINK-12526 > URL: https://issues.apache.org/jira/browse/FLINK-12526 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > > Since {{ExecutionGraph}} can only be accessed from a single > thread(FLINK-11417). We can remove the > {{AtomicReferenceFieldUpdater STATE_UPDATER}} from > {{ExecutionGraph}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12476) [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig
[ https://issues.apache.org/jira/browse/FLINK-12476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Zagrebin updated FLINK-12476: Description: At the moment we have two efficient background cleanup strategies: incremental for heap and compaction filter for RocksDB. *StateTtlConfig* has 2 methods to activate them: *cleanupIncrementally* and *cleanupInRocksdbCompactFilter*. Each is activated only for certain backend type and inactive for other. They have different tuning parameters. The idea is to add method *cleanupInBackground* which would activate default background cleanup. User does not need to think then about details or used backend if not needed. Depending on actually used backend, the corresponding cleanup will kick in. If original strategy is not set with *cleanupIncrementally* and *cleanupInRocksdbCompactFilter* then backends should check whether default background cleanup is activated and if so, use it with default parameters. We can also deprecated the parameterless *cleanupInRocksdbCompactFilter()* in favour of this new method. was: At the moment we have two efficient background cleanup strategies: incremental for heap and compaction filter for RocksDB. *StateTtlConfig* has 2 methods to activate them: *cleanupIncrementally* and *cleanupInRocksdbCompactFilter*. Each is activated only for certain backend type and inactive for other. They have different tuning parameters. The idea is to add method *cleanupInBackground* which would activate default background cleanup. User does not need to think then about details or used backend if not needed. Depending on actually used backend, the corresponding cleanup will kick in. Backends should check whether default background cleanup is activated We can also deprecated the parameterless *cleanupInRocksdbCompactFilter()* in favour of this new method. 
> [State TTL] Consider setting a default background cleanup strategy in > StateTtlConfig > > > Key: FLINK-12476 > URL: https://issues.apache.org/jira/browse/FLINK-12476 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.8.0 >Reporter: Andrey Zagrebin >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0, 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > At the moment we have two efficient background cleanup strategies: > incremental for heap and compaction filter for RocksDB. *StateTtlConfig* has > 2 methods to activate them: *cleanupIncrementally* and > *cleanupInRocksdbCompactFilter*. Each is activated only for certain backend > type and inactive for other. They have different tuning parameters. > The idea is to add method *cleanupInBackground* which would activate default > background cleanup. User does not need to think then about details or used > backend if not needed. Depending on actually used backend, the corresponding > cleanup will kick in. If original strategy is not set with > *cleanupIncrementally* and *cleanupInRocksdbCompactFilter* then backends > should check whether default background cleanup is activated and if so, use > it with default parameters. > We can also deprecated the parameterless *cleanupInRocksdbCompactFilter()* in > favour of this new method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12476) [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig
[ https://issues.apache.org/jira/browse/FLINK-12476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Zagrebin updated FLINK-12476: Description: At the moment we have two efficient background cleanup strategies: incremental for heap and compaction filter for RocksDB. *StateTtlConfig* has 2 methods to activate them: *cleanupIncrementally* and *cleanupInRocksdbCompactFilter*. Each is activated only for certain backend type and inactive for other. They have different tuning parameters. The idea is to add method *cleanupInBackground* which would activate default background cleanup. User does not need to think then about details or used backend if not needed. Depending on actually used backend, the corresponding cleanup will kick in. Backends should check whether default background cleanup is activated We can also deprecated the parameterless *cleanupInRocksdbCompactFilter()* in favour of this new method. was: At the moment we have two efficient background cleanup strategies: incremental for heap and compaction filter for RocksDB. *StateTtlConfig* has 2 methods to activate them: *cleanupIncrementally* and *cleanupInRocksdbCompactFilter*. Each is activated only for certain backend type and inactive for other. They have different tuning parameters. The idea is to add method *cleanupInBackground* which would activate both of them with default parameters. User does not need to think then about details or used backend if not needed. Depending on actually used backend, the corresponding cleanup will kick in. Sample implementation: {code:java} public Builder cleanupInBackground() { return cleanupIncrementally(10, false).cleanupInRocksdbCompactFilter(1000L); } {code} We can also deprecated the parameterless *cleanupInRocksdbCompactFilter()* in favour of this new method. 
> [State TTL] Consider setting a default background cleanup strategy in > StateTtlConfig > > > Key: FLINK-12476 > URL: https://issues.apache.org/jira/browse/FLINK-12476 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.8.0 >Reporter: Andrey Zagrebin >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0, 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > At the moment we have two efficient background cleanup strategies: > incremental for heap and compaction filter for RocksDB. *StateTtlConfig* has > 2 methods to activate them: *cleanupIncrementally* and > *cleanupInRocksdbCompactFilter*. Each is activated only for certain backend > type and inactive for other. They have different tuning parameters. > The idea is to add method *cleanupInBackground* which would activate default > background cleanup. User does not need to think then about details or used > backend if not needed. Depending on actually used backend, the corresponding > cleanup will kick in. Backends should check whether default background > cleanup is activated > We can also deprecated the parameterless *cleanupInRocksdbCompactFilter()* in > favour of this new method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] aljoscha commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig
aljoscha commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig URL: https://github.com/apache/flink/pull/8459#discussion_r284659324 ## File path: flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java ## @@ -327,6 +329,14 @@ public Builder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) { return this; } + /** +* Cleanup expired state by activating both strategies. Depending on actually used backend, the corresponding cleanup will kick in. +*/ + @Nonnull + public Builder cleanupInBackground() { + return cleanupIncrementally(10, true).cleanupInRocksdbCompactFilter(1000L); Review comment: exactly! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig URL: https://github.com/apache/flink/pull/8459#discussion_r284658756 ## File path: flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java ## @@ -327,6 +329,14 @@ public Builder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) { return this; } + /** +* Cleanup expired state by activating both strategies. Depending on actually used backend, the corresponding cleanup will kick in. +*/ + @Nonnull + public Builder cleanupInBackground() { + return cleanupIncrementally(10, true).cleanupInRocksdbCompactFilter(1000L); Review comment: @aljoscha true, we can move concrete decision about default configuration to backends. @yanghua the method `cleanupInBackground` should be still added but rather as a boolean flag in StateTtlConfig. Backends should check this flag where they check configuration of their corresponding cleanup strategies and use default ones. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on issue #8428: [FLINK-12497][network] Refactor the start method of ConnectionManager
zhijiangW commented on issue #8428: [FLINK-12497][network] Refactor the start method of ConnectionManager URL: https://github.com/apache/flink/pull/8428#issuecomment-493024930 Thanks for the review @azagrebin ! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12384) Rolling the etcd servers causes "Connected to an old server; r-o mode will be unavailable"
[ https://issues.apache.org/jira/browse/FLINK-12384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841209#comment-16841209 ] Gary Yao commented on FLINK-12384: -- [~haf] I checked the ZK client code and the warning is not something to be concerned about. The client is talking to a ZK server that does not support the r-o mode. Also see: https://github.com/apache/zookeeper/blob/e45551fc7c691332ace7bff81926855e42ac2239/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java#L147-L149 http://zookeeper-user.578899.n2.nabble.com/Connected-to-an-old-server-r-o-mode-will-be-unavailable-td7578775.html Is your cluster not starting up correctly or recovering the jobs? If that is the case, I would like to see the complete jobmanager logs if possible. > Rolling the etcd servers causes "Connected to an old server; r-o mode will be > unavailable" > -- > > Key: FLINK-12384 > URL: https://issues.apache.org/jira/browse/FLINK-12384 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Henrik >Priority: Major > > {code:java} > [tm] 2019-05-01 13:30:53,316 INFO > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - > Initiating client connection, connectString=analytics-zetcd:2181 > sessionTimeout=6 > watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5c8eee0f > [tm] 2019-05-01 13:30:53,384 WARN > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL > configuration failed: javax.security.auth.login.LoginException: No JAAS > configuration section named 'Client' was found in specified JAAS > configuration file: '/tmp/jaas-3674237213070587877.conf'. Will continue > connection to Zookeeper server without SASL authentication, if Zookeeper > server allows it. 
> [tm] 2019-05-01 13:30:53,395 INFO > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening > socket connection to server > analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181 > [tm] 2019-05-01 13:30:53,395 INFO > org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Using > configured hostname/address for TaskManager: 10.1.2.173. > [tm] 2019-05-01 13:30:53,401 ERROR > org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - > Authentication failed > [tm] 2019-05-01 13:30:53,418 INFO > org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to > start actor system at 10.1.2.173:0 > [tm] 2019-05-01 13:30:53,420 INFO > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Socket > connection established to > analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, initiating > session > [tm] 2019-05-01 13:30:53,500 WARN > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocket - > Connected to an old server; r-o mode will be unavailable > [tm] 2019-05-01 13:30:53,500 INFO > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session > establishment complete on server > analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, sessionid = > 0xbf06a739001d446, negotiated timeout = 6 > [tm] 2019-05-01 13:30:53,525 INFO > org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager > - State change: CONNECTED{code} > Repro: > Start an etcd-cluster, with e.g. etcd-operator, with three members. Start > zetcd in front. Configure the sesssion cluster to go against zetcd. > Ensure the job can start successfully. > Now, kill the etcd pods one by one, letting the quorum re-establish in > between, so that the cluster is still OK. > Now restart the job/tm pods. You'll end up in this no-mans-land. 
> > — > Workaround: clean out the etcd cluster and remove all its data, however, this > resets all time windows and state, despite having that saved in GCS, so it's > a crappy workaround. > > – > > flink-conf.yaml > {code:java} > parallelism.default: 1 > rest.address: analytics-job > jobmanager.rpc.address: analytics-job # = resource manager's address too > jobmanager.heap.size: 1024m > jobmanager.rpc.port: 6123 > jobmanager.slot.request.timeout: 3 > resourcemanager.rpc.port: 6123 > high-availability.jobmanager.port: 6123 > blob.server.port: 6124 > queryable-state.server.ports: 6125 > taskmanager.heap.size: 1024m > taskmanager.numberOfTaskSlots: 1 > web.log.path: /var/lib/log/flink/jobmanager.log > rest.port: 8081 > rest.bind-address: 0.0.0.0 > web.submit.enable: false > high-availability: zookeeper > high-availability.storageDir: gs://example_analytics/flink/zetcd/ > high-availability.zookeeper.quorum: analytics-zetcd:2181 > high-availability.zookeeper.path.root: /flink > high-availability.zookeeper.client.acl: open > state.backend:
[jira] [Commented] (FLINK-12533) Introduce TABLE_AGGREGATE_FUNCTION FunctionDefinition.Type
[ https://issues.apache.org/jira/browse/FLINK-12533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841207#comment-16841207 ] sunjincheng commented on FLINK-12533: - +1 for this change! [~hequn8128] > Introduce TABLE_AGGREGATE_FUNCTION FunctionDefinition.Type > -- > > Key: FLINK-12533 > URL: https://issues.apache.org/jira/browse/FLINK-12533 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Currently, there are four kinds of {{FunctionDefinition.Type}}: > {code:java} > public enum Type { > AGGREGATE_FUNCTION, > SCALAR_FUNCTION, > TABLE_FUNCTION, > OTHER_FUNCTION > } > {code} > The Type AGGREGATE_FUNCTION is used to express both AggregateFunction and > TableAggregateFunction. However, these two kinds of functions have > different semantics, so it would be nice to separate them more clearly > by introducing another type, TABLE_AGGREGATE_FUNCTION. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12534) Reduce the test cost for Python API
[ https://issues.apache.org/jira/browse/FLINK-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Zhong reassigned FLINK-12534: - Assignee: Wei Zhong > Reduce the test cost for Python API > --- > > Key: FLINK-12534 > URL: https://issues.apache.org/jira/browse/FLINK-12534 > Project: Flink > Issue Type: Improvement > Components: API / Python, Travis >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: Wei Zhong >Priority: Major > > Currently, we run the Python API Travis tests for Scala 2.12 / Java 9 / Hadoop > 2.4.1. Because the Python API uses Py4J to communicate with the JVM, the Java 9 > test is sufficient, and we can remove the tests for Scala 2.12 and Hadoop 2.4.1. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12534) Reduce the test cost for Python API
sunjincheng created FLINK-12534: --- Summary: Reduce the test cost for Python API Key: FLINK-12534 URL: https://issues.apache.org/jira/browse/FLINK-12534 Project: Flink Issue Type: Improvement Components: API / Python, Travis Affects Versions: 1.9.0 Reporter: sunjincheng Currently, we run the Python API Travis tests for Scala 2.12 / Java 9 / Hadoop 2.4.1. Because the Python API uses Py4J to communicate with the JVM, the Java 9 test is sufficient, and we can remove the tests for Scala 2.12 and Hadoop 2.4.1. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12533) Introduce TABLE_AGGREGATE_FUNCTION FunctionDefinition.Type
Hequn Cheng created FLINK-12533: --- Summary: Introduce TABLE_AGGREGATE_FUNCTION FunctionDefinition.Type Key: FLINK-12533 URL: https://issues.apache.org/jira/browse/FLINK-12533 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng Currently, there are four kinds of {{FunctionDefinition.Type}}: {code:java} public enum Type { AGGREGATE_FUNCTION, SCALAR_FUNCTION, TABLE_FUNCTION, OTHER_FUNCTION } {code} The Type AGGREGATE_FUNCTION is used to express both AggregateFunction and TableAggregateFunction. However, these two kinds of functions have different semantics, so it would be nice to separate them more clearly by introducing another type, TABLE_AGGREGATE_FUNCTION. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12532) Upgrade Avro to version 1.9.0
Ismaël Mejía created FLINK-12532: Summary: Upgrade Avro to version 1.9.0 Key: FLINK-12532 URL: https://issues.apache.org/jira/browse/FLINK-12532 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: Ismaël Mejía Avro 1.9.0 was released with many nice features, including a reduced size (1MB less), removed dependencies (no paranamer, no shaded Guava), and security updates, so it is probably a worthwhile upgrade. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot edited a comment on issue #8428: [FLINK-12497][network] Refactor the start method of ConnectionManager
flinkbot edited a comment on issue #8428: [FLINK-12497][network] Refactor the start method of ConnectionManager URL: https://github.com/apache/flink/pull/8428#issuecomment-491750033 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @azagrebin * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @azagrebin * ❗ 3. Needs [attention] from. - Needs attention by @azagrebin * ✅ 4. The change fits into the overall [architecture]. - Approved by @azagrebin * ✅ 5. Overall code [quality] is good. - Approved by @azagrebin Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on issue #8428: [FLINK-12497][network] Refactor the start method of ConnectionManager
azagrebin commented on issue #8428: [FLINK-12497][network] Refactor the start method of ConnectionManager URL: https://github.com/apache/flink/pull/8428#issuecomment-493004991 @flinkbot approve all This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8428: [FLINK-12497][network] Refactor the start method of ConnectionManager
azagrebin commented on a change in pull request #8428: [FLINK-12497][network] Refactor the start method of ConnectionManager URL: https://github.com/apache/flink/pull/8428#discussion_r284632408 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java ## @@ -35,24 +37,27 @@ private final PartitionRequestClientFactory partitionRequestClientFactory; - private final boolean isCreditBased; + private final NettyProtocol nettyProtocol; + + public NettyConnectionManager( + ResultPartitionProvider partitionProvider, + TaskEventPublisher taskEventPublisher, + NettyConfig nettyConfig, + boolean isCreditBased) { - public NettyConnectionManager(NettyConfig nettyConfig, boolean isCreditBased) { this.server = new NettyServer(nettyConfig); this.client = new NettyClient(nettyConfig); this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas()); this.partitionRequestClientFactory = new PartitionRequestClientFactory(client); - this.isCreditBased = isCreditBased; + this.nettyProtocol = new NettyProtocol(checkNotNull(partitionProvider), checkNotNull(taskEventPublisher), isCreditBased); Review comment: could be new line after `new NettyProtocol(` to avoid long lines This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8464: [hotfix] fix typos.
flinkbot commented on issue #8464: [hotfix] fix typos. URL: https://github.com/apache/flink/pull/8464#issuecomment-493004237 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] mr-cloud opened a new pull request #8464: [hotfix] fix typos.
mr-cloud opened a new pull request #8464: [hotfix] fix typos. URL: https://github.com/apache/flink/pull/8464 1. [flink-core] below -> above in TypeInformation methods 2. [flink-streaming-java] CLusterUtil -> ClusterUtil in LocalStreamEnvironment methods ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, 
ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-10466) flink-yarn-tests should depend flink-dist
[ https://issues.apache.org/jira/browse/FLINK-10466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10466. Resolution: Fixed Fix Version/s: 1.9.0 master: 29fdef8d237350df1cf915a62390f49e408ba960 > flink-yarn-tests should depend flink-dist > - > > Key: FLINK-10466 > URL: https://issues.apache.org/jira/browse/FLINK-10466 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN, Tests >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > maybe adding > {code:java} > > org.apache.flink > flink-dist_${scala.binary.version} > ${project.version} > test > pom > {code} > I'm not really sure, but it causes failures in my automated testing process, and after > adding this dependency the error disappears. I even wonder how it currently works > on Travis. > flink-yarn-tests obviously depends on flink-dist since some tests try to find > the flink uberjar. > Please take a look at this. cc [~Zentol] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
eaglewatcherwb commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology URL: https://github.com/apache/flink/pull/8446#discussion_r284630476 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java ## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adapter; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionEdge; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.executiongraph.TestRestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.api.common.InputDependencyConstraint.ALL; +import static org.apache.flink.api.common.InputDependencyConstraint.ANY; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; +import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.EMPTY; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.RELEASED; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Unit tests for {@link DefaultResultPartition}. + */ +public class DefaultResultPartitionTest { + + private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + + private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered(); + + private ExecutionGraph executionGraph; + + private ExecutionGraphToSchedulingTopologyAdapter adapter; + + private List intermediateResultPartitions; + + private List schedulingResultPartitions; + + @Before + public void setUp() throws Exception { Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol merged pull request #8398: [FLINK-10466][build] Add flink-dist dependency to yarn-test
zentol merged pull request #8398: [FLINK-10466][build] Add flink-dist dependency to yarn-test URL: https://github.com/apache/flink/pull/8398 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284618858 ## File path: flink-python/pyflink/table/table_config.py ## @@ -38,24 +44,76 @@ def is_stream(self, is_stream): @property def parallelism(self): +""" +The parallelism for all operations. +""" return self._parallelism @parallelism.setter def parallelism(self, parallelism): self._parallelism = parallelism +@property +def timezone(self): +""" +The timezone_id for a timezone, either an abbreviation such as "PST", a full name such as +"America/Los_Angeles", or a custom timezone_id such as "GMT-8:00". +""" +return self._j_table_config.getTimeZone().getID() + +@timezone.setter +def timezone(self, timezone_id): +if timezone_id is not None and isinstance(timezone_id, str): Review comment: should also consider unicode This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284615878 ## File path: flink-python/pyflink/table/table.py ## @@ -96,16 +105,413 @@ def where(self, predicate): """ Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE clause. + Example: :: - >>> tab.where("name = 'Fred'") :param predicate: Predicate expression string. :return: Result table. """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns only distinct (different) values. + +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. 
+""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin(right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin(right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. 
note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.full_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +
[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284617719 ## File path: flink-python/pyflink/table/table_config.py ## @@ -38,24 +44,76 @@ def is_stream(self, is_stream): @property def parallelism(self): +""" +The parallelism for all operations. +""" return self._parallelism @parallelism.setter def parallelism(self, parallelism): self._parallelism = parallelism +@property +def timezone(self): +""" +The timezone_id for a timezone, either an abbreviation such as "PST", a full name such as Review comment: `The timezone_id for a timezone` -> `Returns the timezone id` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284612580 ## File path: flink-python/pyflink/table/query_config.py ## @@ -0,0 +1,112 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABCMeta +from datetime import timedelta +from py4j.compat import long + +from pyflink.java_gateway import get_gateway + + +class QueryConfig(object): +""" +The :class:`QueryConfig` holds parameters to configure the behavior of queries. +""" + +__metaclass__ = ABCMeta + +def __init__(self, j_query_config): +self._j_query_config = j_query_config + + +class StreamQueryConfig(QueryConfig): +""" +The :class:`StreamQueryConfig` holds parameters to configure the behavior of streaming queries. 
+""" + +def __init__(self, j_stream_query_config=None): +self._jvm = get_gateway().jvm +if j_stream_query_config is not None: +self._j_stream_query_config = j_stream_query_config +else: +self._j_stream_query_config = self._jvm.StreamQueryConfig() +super(StreamQueryConfig, self).__init__(self._j_stream_query_config) + +def with_idle_state_retention_time(self, min_time, max_time): +""" +Specifies a minimum and a maximum time interval for how long idle state, i.e., state which +was not updated, will be retained. + +State will never be cleared until it was idle for less than the minimum time and will never +be kept if it was idle for more than the maximum time. + +When new data arrives for previously cleaned-up state, the new data will be handled as if it +was the first data. This can result in previous results being overwritten. + +Set to ``datetime.timedelta()``(zero) to never clean-up the state. + +.. note:: +Cleaning up state requires additional bookkeeping which becomes less expensive for +larger differences of minTime and maxTime. The difference between minTime and maxTime +must be at least ``datetime.timedelta(minutes=5)``(5 minutes). + +:param min_time: The minimum time interval for which idle state is retained. Set to + ``datetime.timedelta()``(zero) to never clean-up the state. +:param max_time: The maximum time interval for which idle state is retained. Must be at + least 5 minutes greater than minTime. Set to + ``datetime.timedelta()``(zero) to never clean-up the state. 
+:return: :class:`StreamQueryConfig` +""" +# type: (timedelta, timedelta) -> StreamQueryConfig +j_time_class = self._jvm.org.apache.flink.api.common.time.Time +j_min_time = j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000))) +j_max_time = j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000))) +self._j_stream_query_config = \ +self._j_stream_query_config.withIdleStateRetentionTime(j_min_time, j_max_time) +return self + +def get_min_idle_state_retention_time(self): +""" +State might be cleared and removed if it was not updated for the defined period of time. + +:return: The minimum time until state which was not updated will be retained. +""" +# type: () -> int +return self._j_stream_query_config.getMinIdleStateRetentionTime() + +def get_max_idle_state_retention_time(self): +""" +State will be cleared and removed if it was not updated for the defined period of time. + +:return: The maximum time until state which was not updated will be retained. +""" +# type: () -> int +return self._j_stream_query_config.getMaxIdleStateRetentionTime() + + +class BatchQueryConfig(QueryConfig): +""" +The :class:`BatchQueryConfig` holds parameters to configure the behavior
[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284615491 ## File path: flink-python/pyflink/table/table.py ## @@ -96,16 +105,413 @@ def where(self, predicate): """ Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE clause. + Example: :: - >>> tab.where("name = 'Fred'") :param predicate: Predicate expression string. :return: Result table. """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns only distinct (different) values. + +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. 
+""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin(right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin(right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. 
note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.full_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +
[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284622626 ## File path: flink-python/pyflink/table/table_environment.py ## @@ -153,9 +240,51 @@ def __init__(self, j_tenv): self._j_tenv = j_tenv super(StreamTableEnvironment, self).__init__(j_tenv) +def get_config(self): +""" +Returns the table config to define the runtime behavior of the Table API. + +:return: Current :class:`TableConfig`. +""" +table_config = TableConfig() +table_config._j_table_config = self._j_tenv.getConfig() +table_config.is_stream = True Review comment: Currently there are two ways to set the table config properties: in TableConfig and TableConfig.Builder. What about removing all the set methods in TableConfig? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
dianfu commented on a change in pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#discussion_r284614157 ## File path: flink-python/pyflink/table/table.py ## @@ -96,16 +105,413 @@ def where(self, predicate): """ Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE clause. + Example: :: - >>> tab.where("name = 'Fred'") :param predicate: Predicate expression string. :return: Result table. """ return Table(self._j_table.where(predicate)) +def group_by(self, fields): +""" +Groups the elements on some grouping keys. Use this before a selection with aggregations +to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + +Example: +:: +>>> tab.group_by("key").select("key, value.avg") + +:param fields: Group keys. +:return: The grouped table. +""" +return GroupedTable(self._j_table.groupBy(fields)) + +def distinct(self): +""" +Removes duplicate values and returns only distinct (different) values. + +Example: +:: +>>> tab.select("key, value").distinct() + +:return: Result table. +""" +return Table(self._j_table.distinct()) + +def join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. You can use where and select clauses after a join to further specify the +behaviour of the join. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` . + +Example: +:: +>>> left.join(right).where("a = b && c > 3").select("a, b, d") +>>> left.join(right, "a = b") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. 
+""" +if join_predicate is not None: +return Table(self._j_table.join(right._j_table, join_predicate)) +else: +return Table(self._j_table.join(right._j_table)) + +def left_outer_join(self, right, join_predicate=None): +""" +Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.left_outer_join(right).select("a, b, d") +>>> left.left_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: Optional, the join predicate expression string. +:return: Result table. +""" +if join_predicate is None: +return Table(self._j_table.leftOuterJoin(right._j_table)) +else: +return Table(self._j_table.leftOuterJoin(right._j_table, join_predicate)) + +def right_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.right_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +""" +return Table(self._j_table.rightOuterJoin(right._j_table, join_predicate)) + +def full_outer_join(self, right, join_predicate): +""" +Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined +operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if +necessary. + +.. 
note:: +Both tables must be bound to the same :class:`TableEnvironment` and its +:class:`TableConfig` must have null check enabled (default). + +Example: +:: +>>> left.full_outer_join(right, "a = b").select("a, b, d") + +:param right: Right table. +:param join_predicate: The join predicate expression string. +:return: Result table. +
[GitHub] [flink] flinkbot edited a comment on issue #8398: [FLINK-10466][build] Add flink-dist dependency to yarn-test
flinkbot edited a comment on issue #8398: [FLINK-10466][build] Add flink-dist dependency to yarn-test URL: https://github.com/apache/flink/pull/8398#issuecomment-491180514 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @azagrebin * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @azagrebin * ❓ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @azagrebin * ✅ 5. Overall code [quality] is good. - Approved by @azagrebin Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on issue #8398: [FLINK-10466][build] Add flink-dist dependency to yarn-test
azagrebin commented on issue #8398: [FLINK-10466][build] Add flink-dist dependency to yarn-test URL: https://github.com/apache/flink/pull/8398#issuecomment-492999241 @flinkbot approve all This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12530) Move Task.inputGatesById to NetworkEnvironment
[ https://issues.apache.org/jira/browse/FLINK-12530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Zagrebin updated FLINK-12530: Summary: Move Task.inputGatesById to NetworkEnvironment (was: Move Task#inputGatesById to NetworkEnvironment) > Move Task.inputGatesById to NetworkEnvironment > -- > > Key: FLINK-12530 > URL: https://issues.apache.org/jira/browse/FLINK-12530 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Task.inputGatesById indexes SingleInputGates by id. The end user of this > indexing is NetworkEnvironment for two cases: > - SingleInputGate triggers producer partition readiness check and then the > successful result of the check is dispatched back to this SingleInputGate by id. > We can just add an additional argument to > TaskActions.triggerPartitionProducerStateCheck. The argument is an immediate > callback to that SingleInputGate. Then inputGatesById is not needed for > dispatching. > - TaskExecutor.updatePartitions uses inputGatesById to dispatch PartitionInfo > update to the right SingleInputGate. If inputGatesById is moved to > NetworkEnvironment, which should be a better place for gate management, and > add NetworkEnvironment.updatePartitionInfo then TaskExecutor.updatePartitions > could directly call NetworkEnvironment.updatePartitionInfo. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] yanghua commented on issue #8461: [FLINK-12422] Remove IN_TESTS for make test code and production code consistent
yanghua commented on issue #8461: [FLINK-12422] Remove IN_TESTS for make test code and production code consistent URL: https://github.com/apache/flink/pull/8461#issuecomment-492994671 cc @tillrohrmann This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
flinkbot edited a comment on issue #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#issuecomment-492991609 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❗ 3. Needs [attention] from. - Needs attention by @zhijiangw * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on issue #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
azagrebin commented on issue #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#issuecomment-492992119 @flinkbot attention @zhijiangW This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
flinkbot commented on issue #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment URL: https://github.com/apache/flink/pull/8463#issuecomment-492991609 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services