[jira] [Commented] (FLINK-4367) Offer separate API for watermark generation and timestamp extraction
[ https://issues.apache.org/jira/browse/FLINK-4367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436334#comment-15436334 ]

ramkrishna.s.vasudevan commented on FLINK-4367:
---

So is it better to remove the TimestampAssigner interface from the two interfaces AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks? Then for 2.0 we would do this, and for the lower branch versions we would mark it as @Deprecated and state that, from 2.0 on, they will no longer implement TimestampAssigner. [~rmetzger]?

> Offer separate API for watermark generation and timestamp extraction
> --
>
> Key: FLINK-4367
> URL: https://issues.apache.org/jira/browse/FLINK-4367
> Project: Flink
> Issue Type: Sub-task
> Components: DataStream API
> Affects Versions: 1.1.0
> Reporter: Robert Metzger
> Fix For: 2.0.0
>
>
> Right now, the {{AssignerWithPunctuatedWatermarks}} and
> {{AssignerWithPeriodicWatermarks}} interfaces also require implementing a
> {{TimestampAssigner}}.
> For cases where the source emits records with timestamps, it's not necessary
> to extract timestamps again from the records; we just want to generate
> watermarks.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
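A minimal sketch of the proposed split follows. It uses hypothetical interfaces (`TimestampExtractor`, `PunctuatedWatermarkGenerator`, `CombinedPunctuatedAssigner`) that only mirror the idea in this thread and are not Flink's actual API. The watermark generator receives the timestamp the source already attached, so it needs no `extractTimestamp` implementation. The deprecated combined interface stands in for the transition path on older branches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WatermarkSplitSketch {

    /** Hypothetical stand-in for a timestamp extractor (NOT Flink's real interface). */
    interface TimestampExtractor<T> {
        long extractTimestamp(T element, long previousElementTimestamp);
    }

    /** Hypothetical watermark-only contract: it is handed the record's existing timestamp. */
    interface PunctuatedWatermarkGenerator<T> {
        /** Returns the next watermark, or null if this record does not trigger one. */
        Long checkAndGetNextWatermark(T lastElement, long recordTimestamp);
    }

    /** Hypothetical combined contract kept only for older branches. */
    @Deprecated
    interface CombinedPunctuatedAssigner<T>
            extends TimestampExtractor<T>, PunctuatedWatermarkGenerator<T> {}

    /** A record whose timestamp was already attached by the source. */
    static final class Event {
        final long timestamp;
        final boolean endOfBatch; // hypothetical marker that triggers a watermark

        Event(long timestamp, boolean endOfBatch) {
            this.timestamp = timestamp;
            this.endOfBatch = endOfBatch;
        }
    }

    /** Collects watermarks using only the timestamps the source already provided. */
    static List<Long> generateWatermarks(List<Event> events,
                                         PunctuatedWatermarkGenerator<Event> generator) {
        List<Long> watermarks = new ArrayList<>();
        for (Event e : events) {
            Long wm = generator.checkAndGetNextWatermark(e, e.timestamp);
            if (wm != null) {
                watermarks.add(wm);
            }
        }
        return watermarks;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1000, false), new Event(2000, true),
                new Event(3000, false), new Event(5000, true));
        // No timestamp extraction is implemented: the generator reuses recordTimestamp.
        PunctuatedWatermarkGenerator<Event> generator =
                (event, ts) -> event.endOfBatch ? Long.valueOf(ts) : null;
        System.out.println(generateWatermarks(events, generator)); // prints [2000, 5000]
    }
}
```

Keeping the two contracts separate lets a user who already has timestamps implement only the generator. The combined interface can still exist, deprecated, until the next major version.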
[jira] [Commented] (FLINK-4449) Heartbeat Manager between ResourceManager and TaskExecutor
[ https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436242#comment-15436242 ]

ASF GitHub Bot commented on FLINK-4449:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2410#discussion_r76176826

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/heartbeat/HeartbeatScheduler.java ---
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.heartbeat;
+
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.slf4j.Logger;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.Serializable;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This utility class implements the basis of trigger heartbeat from one component to another component periodically,
+ * for example trigger heartbeat from the ResourceManager to TaskExecutor.
+ *
+ * @param The type of the gateway to connect to.
+ * @param The type of the successful heartbeat responses with payload.
+ */
+public abstract class HeartbeatScheduler {
+	/** default heartbeat interval time in millisecond */
+	private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
+
+	/** default heartbeat timeout in millisecond */
+	private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
+
+	/** default max heartbeat interval time in millisecond (which is used in retry heartbeat case) */
+	private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 3;
+
+	/** default heartbeat attempt delay after an exception has occurred */
+	private static final long ERROR_HEARTBEAT_DELAY_MILLIS = 2000;
+
+	/** default max heartbeat retry time for one heartbeat */
+	private static final long MAX_HEARTBEAT_ATTEMPT_MILLIS = 6;
+
+	private final long heartbeatInterval;
+
+	private final long heartbeatTimeout;
+
+	private final long maxHeartbeatTimeout;
+
+	private final long delayOnError;
+
+	private final long maxAttemptTime;
+
+	/** target gateway to receive the heartbeat and give heartbeatResponse */
+	protected final Gateway targetGateway;
+
+	/** the target address */
+	private final String targetAddress;
+
+	/** the target gateway name */
+	private final String targetName;
+
+	private final RpcService rpcService;
+
+	private final UUID leaderID;
+
+	private final Logger log;
+
+	private volatile boolean closed;
+
+	/**
+	 * @param rpcService rpcService
+	 * @param leaderID leader session id of current source end which send heartbeat
+	 * @param targetGateway target gateway which receive heartbeat and response
+	 * @param targetAddress target gateway address
+	 * @param targetName target name
+	 * @param log log
+	 */
+	public HeartbeatScheduler(RpcService rpcService, UUID leaderID, Gateway targetGateway,
+		String targetAddress, String targetName, Logger log) {
+		this(rpcService, leaderID, targetGateway, targetAddress, targetName, log, INITIAL_HEARTBEAT_INTERVAL_MILLIS,
+			INITIAL_HEARTBEAT_TIMEOUT_MILLIS, MAX_HEARTBEAT_TIMEOUT_MILLIS, ERROR_HEARTBEAT_DELAY_MILLIS, MAX_HEARTBEAT_ATTEMPT_MILLIS);
+	}
+
+	/**
+	 * @param rpcService rpcService
+	 * @param leaderID
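The javadoc and constants above describe a heartbeat triggered periodically, with a separate maximum timeout "used in retry heartbeat case" and a maximum retry time per heartbeat. The sketch below computes one plausible retry schedule under those limits. It assumes each timed-out attempt doubles the timeout, capped at the maximum, and that no new attempt starts if it could not finish within the attempt budget. The doubling rule and `HeartbeatBackoffSketch` are assumptions for illustration, not necessarily what this pull request implements. The numbers are the test constants from `HeartbeatSchedulerTest` (`INITIAL_TIMEOUT = 20`, `MAX_TIMEOUT = 150`, `MAX_ATTEMPT_TIME = 300`).

```java
import java.util.ArrayList;
import java.util.List;

public class HeartbeatBackoffSketch {

    /**
     * Returns the timeout used for each attempt of a single heartbeat, assuming each
     * timed-out attempt doubles the timeout (capped at maxTimeout) and no attempt is
     * started if it could not complete within maxAttemptTime.
     */
    static List<Long> attemptTimeouts(long initialTimeout, long maxTimeout, long maxAttemptTime) {
        if (initialTimeout <= 0 || maxTimeout < initialTimeout || maxAttemptTime <= 0) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        List<Long> timeouts = new ArrayList<>();
        long elapsed = 0;
        long timeout = initialTimeout;
        while (elapsed + timeout <= maxAttemptTime) {
            timeouts.add(timeout);
            elapsed += timeout;
            timeout = Math.min(timeout * 2, maxTimeout); // assumed exponential backoff
        }
        return timeouts;
    }

    public static void main(String[] args) {
        // INITIAL_TIMEOUT, MAX_TIMEOUT and MAX_ATTEMPT_TIME from HeartbeatSchedulerTest.
        System.out.println(attemptTimeouts(20, 150, 300)); // prints [20, 40, 80, 150]
    }
}
```

With these values the fifth attempt would end at 440 ms, past the 300 ms budget. Under this assumed rule the heartbeat would be treated as failed after four attempts.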
[jira] [Commented] (FLINK-4449) Heartbeat Manager between ResourceManager and TaskExecutor
[ https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436237#comment-15436237 ]

ASF GitHub Bot commented on FLINK-4449:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2410#discussion_r76176673

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * resourceManager HA test, including grant leadership and revoke leadership
+ */
+public class ResourceManagerHATest {
+	private static ActorSystem actorSystem;
+	private static AkkaRpcService akkaRpcService;
+
+	private static final Timeout timeout = new Timeout(1, TimeUnit.MILLISECONDS);
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		actorSystem = AkkaUtils.createDefaultActorSystem();
+
+		akkaRpcService = new AkkaRpcService(actorSystem, timeout);
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		akkaRpcService.stopService();
+
+		actorSystem.shutdown();
+		actorSystem.awaitTermination();
+	}
+
+	@Test
+	public void testGrantAndRevokeLeadership() throws Exception {
+		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+		HighAvailabilityServices highAvailabilityServices = mock(HighAvailabilityServices.class);
+		when(highAvailabilityServices.getResourceManagerLeaderElectionService()).thenReturn(leaderElectionService);
+		final ResourceManager resourceManager = new ResourceManager(akkaRpcService, highAvailabilityServices);
+		resourceManager.start();
+		// before grant leadership, resourceManager's leaderId is null
+		Assert.assertNull(resourceManager.getLeaderSessionID());
+		final UUID leaderId = UUID.randomUUID();
+		leaderElectionService.isLeader(leaderId);
+		// after grant leadership, resourceManager's leaderId has value
+		Assert.assertEquals(getLatestLeaderId(resourceManager), leaderId);
--- End diff --

I don't think so. ResourceManager.getLeaderSessionID has to run in the main thread of the ResourceManager, so that it sees the new leader session ID after the ResourceManager is granted or revoked leadership.

> Heartbeat Manager between ResourceManager and TaskExecutor
> --
>
> Key: FLINK-4449
> URL: https://issues.apache.org/jira/browse/FLINK-4449
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: zhangjing
> Assignee: zhangjing
>
>
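The point about reading the leader session ID on the main thread can be illustrated with a small sketch. It assumes a component whose state is only touched by a single "main thread" executor. Leadership changes are queued on that executor, and a read queued on the same executor completes only after every earlier update has run. `MainThreadLeaderSketch` and its methods are hypothetical; this is not the `ResourceManager` or `RpcEndpoint` API.

```java
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class MainThreadLeaderSketch implements AutoCloseable {

    // A single thread standing in for the component's "main thread".
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Only read or written from mainThread, so no extra synchronization is needed.
    private UUID leaderSessionId;

    /** Called by leader election; the update is queued on the main thread. */
    void grantLeadership(UUID newLeaderId) {
        mainThread.execute(() -> leaderSessionId = newLeaderId);
    }

    /** Clears the leader session ID, again on the main thread. */
    void revokeLeadership() {
        mainThread.execute(() -> leaderSessionId = null);
    }

    /** Runs the read on the main thread, so it observes all previously queued updates. */
    Future<UUID> getLeaderSessionIdAsync() {
        return mainThread.submit(() -> leaderSessionId);
    }

    @Override
    public void close() throws InterruptedException {
        mainThread.shutdown();
        mainThread.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        try (MainThreadLeaderSketch component = new MainThreadLeaderSketch()) {
            UUID leaderId = UUID.randomUUID();
            component.grantLeadership(leaderId);
            // The read is queued behind the grant, so it sees the new ID.
            System.out.println(leaderId.equals(component.getLeaderSessionIdAsync().get(1, TimeUnit.SECONDS))); // prints true
            component.revokeLeadership();
            System.out.println(component.getLeaderSessionIdAsync().get(1, TimeUnit.SECONDS)); // prints null
        }
    }
}
```

A test that reads the field directly from another thread has no such ordering guarantee. Going through the main thread with a future, presumably what a helper like `getLatestLeaderId` does, avoids a racy assertion.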
[jira] [Commented] (FLINK-4449) Heartbeat Manager between ResourceManager and TaskExecutor
[ https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436225#comment-15436225 ]

ASF GitHub Bot commented on FLINK-4449:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2410#discussion_r76176200

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * resourceManager HA test, including grant leadership and revoke leadership
+ */
+public class ResourceManagerHATest {
+	private static ActorSystem actorSystem;
+	private static AkkaRpcService akkaRpcService;
+
+	private static final Timeout timeout = new Timeout(1, TimeUnit.MILLISECONDS);
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		actorSystem = AkkaUtils.createDefaultActorSystem();
+
+		akkaRpcService = new AkkaRpcService(actorSystem, timeout);
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		akkaRpcService.stopService();
+
+		actorSystem.shutdown();
+		actorSystem.awaitTermination();
+	}
+
+	@Test
+	public void testGrantAndRevokeLeadership() throws Exception {
+		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+		HighAvailabilityServices highAvailabilityServices = mock(HighAvailabilityServices.class);
--- End diff --

good

> Heartbeat Manager between ResourceManager and TaskExecutor
> --
>
> Key: FLINK-4449
> URL: https://issues.apache.org/jira/browse/FLINK-4449
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: zhangjing
> Assignee: zhangjing
>
>
> HeartbeatManager is responsible for the heartbeat between the ResourceManager and the
> TaskExecutor:
> 1. Register TaskExecutors
> Register heartbeat targets. If the heartbeat response for these targets is
> not reported in time, mark the target as failed and notify the ResourceManager.
> 2. Trigger heartbeats
> Trigger heartbeats from the ResourceManager to the TaskExecutor periodically.
> The TaskExecutor reports its slot allocation in the heartbeat response.
> The ResourceManager syncs its own slot allocation with the heartbeat response.
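The two responsibilities in the description can be sketched as a hypothetical, single-threaded model with explicit clock values in milliseconds. The first is registering targets and marking them failed when no response arrives in time. The second is syncing the slot allocation reported in each response. `HeartbeatMonitorSketch`, its methods, and the representation of slot reports as sets of slot ID strings are all assumptions for illustration. The periodic triggering itself is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class HeartbeatMonitorSketch {

    private final long heartbeatTimeoutMillis;

    // Last time (ms) a heartbeat response was received from each registered target.
    private final Map<String, Long> lastResponse = new HashMap<>();

    // Slot allocation most recently reported by each target (hypothetical slot IDs).
    private final Map<String, Set<String>> reportedSlots = new HashMap<>();

    HeartbeatMonitorSketch(long heartbeatTimeoutMillis) {
        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
    }

    /** 1. Registers a TaskExecutor as a heartbeat target. */
    void registerTarget(String targetId, long nowMillis) {
        lastResponse.put(targetId, nowMillis);
        reportedSlots.put(targetId, Collections.<String>emptySet());
    }

    /** 2. Handles a heartbeat response carrying the target's current slot allocation. */
    void onHeartbeatResponse(String targetId, long nowMillis, Set<String> allocatedSlots) {
        if (lastResponse.containsKey(targetId)) { // ignore unregistered targets
            lastResponse.put(targetId, nowMillis);
            // The ResourceManager syncs its own view with what the target reports.
            reportedSlots.put(targetId, new HashSet<>(allocatedSlots));
        }
    }

    /** Unregisters and returns (sorted) all targets whose last response is older than the timeout. */
    List<String> removeTimedOutTargets(long nowMillis) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastResponse.entrySet()) {
            if (nowMillis - e.getValue() > heartbeatTimeoutMillis) {
                failed.add(e.getKey());
            }
        }
        for (String id : failed) {
            lastResponse.remove(id);
            reportedSlots.remove(id);
        }
        Collections.sort(failed);
        return failed; // the caller would notify the ResourceManager about these
    }

    Set<String> slotsOf(String targetId) {
        return reportedSlots.getOrDefault(targetId, Collections.<String>emptySet());
    }

    public static void main(String[] args) {
        HeartbeatMonitorSketch monitor = new HeartbeatMonitorSketch(1000);
        monitor.registerTarget("tm-1", 0);
        monitor.registerTarget("tm-2", 0);
        monitor.onHeartbeatResponse("tm-1", 800, Collections.singleton("slot-0"));
        System.out.println(monitor.removeTimedOutTargets(1500)); // prints [tm-2]
        System.out.println(monitor.slotsOf("tm-1"));             // prints [slot-0]
    }
}
```

Passing the clock in explicitly keeps the timeout logic deterministic and easy to test. A real implementation would drive `removeTimedOutTargets` from a scheduled task.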
[jira] [Commented] (FLINK-4449) Heartbeat Manager between ResourceManager and TaskExecutor
[ https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436226#comment-15436226 ] ASF GitHub Bot commented on FLINK-4449: --- Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2410#discussion_r76176210 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/heartbeat/HeartbeatSchedulerTest.java --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.heartbeat; + +import akka.actor.ActorSystem; +import akka.dispatch.Futures; +import akka.util.Timeout; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * test for HeartbeatScheduler, including successful case, timeout case(retry on backoff timeout), failure case(retry) + */ +public class HeartbeatSchedulerTest extends TestLogger { + private static final long INITIAL_INTERVAL = 200; + private static final long INITIAL_TIMEOUT = 20; + private static final long MAX_TIMEOUT = 150; + private static final long DELAY_ON_ERROR = 100; + private static final long MAX_ATTEMPT_TIME = 300; + + + private static ActorSystem actorSystem; + private static AkkaRpcService akkaRpcService; + + private static final Timeout timeout = new Timeout(1, TimeUnit.MILLISECONDS); + + @BeforeClass + public static void setup() throws Exception { + actorSystem = AkkaUtils.createDefaultActorSystem(); + + akkaRpcService = new AkkaRpcService(actorSystem, timeout); + } + + @AfterClass + public static void teardown() throws Exception { + 
akkaRpcService.stopService(); --- End diff -- Cool > Heartbeat Manager between ResourceManager and TaskExecutor > -- > > Key: FLINK-4449 > URL: https://issues.apache.org/jira/browse/FLINK-4449 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: zhangjing >Assignee: zhangjing > > The HeartbeatManager is responsible for the heartbeat between the ResourceManager and the > TaskExecutors. > 1. Register TaskExecutors > Register heartbeat targets. If the heartbeat response for a target is > not reported in time, mark the target as failed and notify the ResourceManager. > 2. Trigger heartbeats > The ResourceManager periodically triggers a heartbeat to each TaskExecutor. > The TaskExecutor reports its slot allocation in the heartbeat response, > and the ResourceManager synchronizes its own slot allocation with that response. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436212#comment-15436212 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- I can work on this and come up with a design doc to make iterative jobs smart enough not to request new memory segments every time. [~StephanEwen]? > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool; instead, the GC is expected to reclaim > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (On the first run, it spends a few minutes generating lookup tables in /tmp.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
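The pooling alternative the issue contrasts with GC-based release can be sketched as follows, assuming plain fixed-size byte[] segments: released segments go back into a pool, so later supersteps reuse them instead of allocating fresh memory for the GC to collect. SegmentPool and its methods are hypothetical names for illustration, not Flink's MemoryManager API.

```java
import java.util.ArrayDeque;

/**
 * Hypothetical sketch: recycle fixed-size byte[] "segments" across supersteps
 * instead of dropping them for the GC. Not Flink's MemoryManager API.
 */
final class SegmentPool {
    private final int segmentSize;
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    private int freshAllocations; // number of segments actually created with new

    SegmentPool(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    /** Returns a pooled segment if one is available, otherwise allocates a new one. */
    byte[] allocate() {
        byte[] seg = free.pollFirst();
        if (seg == null) {
            freshAllocations++;
            seg = new byte[segmentSize];
        }
        // Note: a recycled segment still holds its previous contents; it is not zeroed.
        return seg;
    }

    /** Hands a segment back to the pool so a later superstep can reuse it. */
    void release(byte[] seg) {
        if (seg.length != segmentSize) {
            throw new IllegalArgumentException("foreign segment size: " + seg.length);
        }
        free.addFirst(seg);
    }

    int freshAllocations() {
        return freshAllocations;
    }

    int pooledSegments() {
        return free.size();
    }
}
```

With this pattern, an iterative job that allocates and releases the same number of segments in every superstep creates new segments only in the first superstep; the trade-off is that pooled memory stays reserved instead of being returned to the heap.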
[jira] [Commented] (FLINK-4480) Incorrect link to elastic.co in documentation
[ https://issues.apache.org/jira/browse/FLINK-4480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436175#comment-15436175 ] ASF GitHub Bot commented on FLINK-4480: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2416 Hi @smarthi, Thank you for picking this up. The changes LGTM :) Also tested the 2.3.5 ES version bump. Good catches on the Guava and broken example also! > Incorrect link to elastic.co in documentation > - > > Key: FLINK-4480 > URL: https://issues.apache.org/jira/browse/FLINK-4480 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.2.0, 1.1.1 >Reporter: Fabian Hueske >Assignee: Suneel Marthi >Priority: Trivial > > The link URL of the entry "Elasticsearch 2x (sink)" on the connector's > documentation page > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html > is pointing to http://elastic.com but should point to http://elastic.co -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4339) Implement Slot Pool Core
[ https://issues.apache.org/jira/browse/FLINK-4339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young reassigned FLINK-4339: - Assignee: Kurt Young > Implement Slot Pool Core > > > Key: FLINK-4339 > URL: https://issues.apache.org/jira/browse/FLINK-4339 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Affects Versions: 1.1.0 >Reporter: Stephan Ewen >Assignee: Kurt Young > Fix For: 1.2.0 > > > Implements the core slot structures and behavior of the {{SlotPool}}: > - pool of available slots > - request slots and respond if a slot is available in the pool > - return / deallocate slots > The detailed design is here: > https://docs.google.com/document/d/1y4D-0KGiMNDFYOLRkJy-C04nl8fwJNdm9hoUfxce6zY/ -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4339) Implement Slot Pool Core
[ https://issues.apache.org/jira/browse/FLINK-4339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-4339: -- Description: Implements the core slot structures and behavior of the {{SlotPool}}: - pool of available slots - request slots and respond if a slot is available in the pool - return / deallocate slots The detailed design is here: https://docs.google.com/document/d/1y4D-0KGiMNDFYOLRkJy-C04nl8fwJNdm9hoUfxce6zY/ was: Implements the core slot structures and behavior of the {{SlotPool}}: - pool of available slots - request slots and respond if a slot is available in the pool - return / deallocate slots > Implement Slot Pool Core > > > Key: FLINK-4339 > URL: https://issues.apache.org/jira/browse/FLINK-4339 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Affects Versions: 1.1.0 >Reporter: Stephan Ewen > Fix For: 1.2.0 > > > Implements the core slot structures and behavior of the {{SlotPool}}: > - pool of available slots > - request slots and respond if a slot is available in the pool > - return / deallocate slots > The detailed design is here: > https://docs.google.com/document/d/1y4D-0KGiMNDFYOLRkJy-C04nl8fwJNdm9hoUfxce6zY/ -- This message was sent by Atlassian JIRA (v6.3.4#6332)
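The three SlotPool behaviors listed in the description can be illustrated with a toy, single-threaded pool. SimpleSlotPool, the string slot IDs and the allocation IDs are all hypothetical simplifications for illustration; the actual structures are specified in the linked design document.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of the three SlotPool behaviors listed in the issue. */
final class SimpleSlotPool {
    private final ArrayDeque<String> available = new ArrayDeque<>();
    private final Map<String, String> allocated = new HashMap<>(); // allocationId -> slotId

    /** Adds a slot (e.g. one offered by a TaskExecutor) to the pool of available slots. */
    void addSlot(String slotId) {
        available.addLast(slotId);
    }

    /** Responds with a slot if one is available in the pool, otherwise empty. */
    Optional<String> requestSlot(String allocationId) {
        if (allocated.containsKey(allocationId)) {
            // A repeated request for the same allocation gets the same slot back.
            return Optional.of(allocated.get(allocationId));
        }
        String slot = available.pollFirst();
        if (slot == null) {
            return Optional.empty();
        }
        allocated.put(allocationId, slot);
        return Optional.of(slot);
    }

    /** Returns the slot held by this allocation to the pool; false if unknown. */
    boolean returnSlot(String allocationId) {
        String slot = allocated.remove(allocationId);
        if (slot == null) {
            return false;
        }
        available.addLast(slot);
        return true;
    }

    int availableCount() {
        return available.size();
    }
}
```

A real pool would also need to queue requests that cannot be served immediately and to handle slots disappearing when a TaskExecutor fails; the sketch only covers the immediate request/return path.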
[jira] [Commented] (FLINK-4478) Implement heartbeat logic
[ https://issues.apache.org/jira/browse/FLINK-4478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436167#comment-15436167 ] zhangjing commented on FLINK-4478: -- [~till.rohrmann]. I agree we should first define what it should look like. Here are my thoughts on your questions; what's your advice? 1. Exponential backoff strategy. In fact, it is not a complete exponential backoff: the timeout grows as 'Math.min(2 * timeoutMillis, maxHeartbeatTimeout)'. Maybe we could use maxHeartbeatTimeout to reduce the risk of waiting twice as long as defined before being notified about a heartbeat failure. We could also use a constant retry period instead of a backoff strategy. 2. Should every heartbeat connection be responsible for triggering itself, or should the heartbeat manager be responsible for that? A heartbeat scheduler does not trigger itself; it depends on the outside world (here I mean the HeartbeatManager) calling its start method to trigger it. 3. Is the heartbeat receiving end an independent RpcEndpoint? How does the payload delivery work? Does the sender side ask for the result (future), or does the receiving side answer via a tell message to the heartbeat manager? On the sender side, the receiving end is a gateway that can be obtained from its address, and the sender asks the receiver for the heartbeat payload. 4. How does the receiving end monitor the sender, so that if the heartbeat request is not delivered, the receiving end can mark the sending end as dead? I think this could be independent of the heartbeat manager on the sending side: it should run on the receiving end, while the heartbeat scheduler runs on the sending side. 
> Implement heartbeat logic > - > > Key: FLINK-4478 > URL: https://issues.apache.org/jira/browse/FLINK-4478 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.1.0 >Reporter: Till Rohrmann > Fix For: 1.2.0 > > > Parent issue to track the development of the heartbeat logic (sender and > receiver) for the new Flip-6 refactoring. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
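Point 1 in the comment describes a capped doubling rather than a pure exponential backoff. The progression can be sketched as below, assuming the timeout resets to its initial value after a successful response (an assumption; the thread does not say). HeartbeatBackoff is a hypothetical name, not a class from PR #2410. With the INITIAL_TIMEOUT = 20 and MAX_TIMEOUT = 150 values used in the quoted HeartbeatSchedulerTest, successive timeouts would be 20, 40, 80, 150, 150 ms.

```java
/**
 * Hypothetical sketch of the capped heartbeat timeout progression:
 * each timed-out attempt doubles the timeout, but never beyond maxHeartbeatTimeout.
 */
final class HeartbeatBackoff {
    private final long initialTimeoutMillis;
    private final long maxHeartbeatTimeout;
    private long timeoutMillis;

    HeartbeatBackoff(long initialTimeoutMillis, long maxHeartbeatTimeout) {
        if (initialTimeoutMillis <= 0 || maxHeartbeatTimeout < initialTimeoutMillis) {
            throw new IllegalArgumentException("need 0 < initial <= max");
        }
        this.initialTimeoutMillis = initialTimeoutMillis;
        this.maxHeartbeatTimeout = maxHeartbeatTimeout;
        this.timeoutMillis = initialTimeoutMillis;
    }

    /** Timeout to use for the current heartbeat attempt. */
    long currentTimeout() {
        return timeoutMillis;
    }

    /** After a timed-out attempt: double the timeout, capped at maxHeartbeatTimeout. */
    void onTimeout() {
        timeoutMillis = Math.min(2 * timeoutMillis, maxHeartbeatTimeout);
    }

    /** After a successful response: start again from the initial timeout (assumed policy). */
    void onSuccess() {
        timeoutMillis = initialTimeoutMillis;
    }
}
```

The cap bounds how long a failure can go unnoticed; the constant retry period mentioned as an alternative corresponds to passing the same value for both constructor arguments.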
[jira] [Commented] (FLINK-4449) Heartbeat Manager between ResourceManager and TaskExecutor
[ https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436164#comment-15436164 ] ASF GitHub Bot commented on FLINK-4449: --- Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2410 Hi Till, thanks so much for the review and the good advice. I agree we should first define what it should look like. Here are my thoughts on your questions. 1. Exponential backoff strategy. In fact, it is not a complete exponential backoff: the timeout grows as 'Math.min(2 * timeoutMillis, maxHeartbeatTimeout)'. Maybe we could use maxHeartbeatTimeout to reduce the risk of waiting twice as long as defined before being notified about a heartbeat failure. We could also use a constant retry period instead of a backoff strategy. 2. Should every heartbeat connection be responsible for triggering itself, or should the heartbeat manager be responsible for that? A heartbeat scheduler does not trigger itself; it depends on the outside world (here I mean the HeartbeatManager) calling its start method to trigger it. 3. Is the heartbeat receiving end an independent RpcEndpoint? How does the payload delivery work? Does the sender side ask for the result (future), or does the receiving side answer via a tell message to the heartbeat manager? On the sender side, the receiving end is a gateway that can be obtained from its address, and the sender asks the receiver for the heartbeat payload. 4. How does the receiving end monitor the sender, so that if the heartbeat request is not delivered, the receiving end can mark the sending end as dead? I think this could be independent of the heartbeat manager on the sending side: it should run on the receiving end, while the heartbeat scheduler runs on the sending side. What's your advice? 
> Heartbeat Manager between ResourceManager and TaskExecutor > -- > > Key: FLINK-4449 > URL: https://issues.apache.org/jira/browse/FLINK-4449 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: zhangjing >Assignee: zhangjing > > The HeartbeatManager is responsible for the heartbeat between the ResourceManager and the > TaskExecutors. > 1. Register TaskExecutors > Register heartbeat targets. If the heartbeat response for a target is > not reported in time, mark the target as failed and notify the ResourceManager. > 2. Trigger heartbeats > The ResourceManager periodically triggers a heartbeat to each TaskExecutor. > The TaskExecutor reports its slot allocation in the heartbeat response, > and the ResourceManager synchronizes its own slot allocation with that response. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
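The first duty in the description (register targets, then mark a target as failed and notify the ResourceManager when its response is late) can be sketched without any RPC machinery. This is a minimal single-threaded sketch under stated assumptions: time is passed in explicitly as milliseconds, targets are identified by strings, and HeartbeatMonitorSketch with its methods is hypothetical rather than the HeartbeatManager from PR #2410.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of "register targets, mark failed if no response in time".
 * Time is passed in explicitly so the logic is deterministic; no RPC is involved.
 */
final class HeartbeatMonitorSketch {
    private final long heartbeatTimeoutMillis;
    private final Consumer<String> onTargetFailed; // e.g. notifies the ResourceManager
    private final Map<String, Long> lastResponse = new HashMap<>();

    HeartbeatMonitorSketch(long heartbeatTimeoutMillis, Consumer<String> onTargetFailed) {
        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
        this.onTargetFailed = onTargetFailed;
    }

    /** Registers a heartbeat target (e.g. a TaskExecutor id). */
    void register(String targetId, long nowMillis) {
        lastResponse.put(targetId, nowMillis);
    }

    /** Records a heartbeat response (which, per the issue, would carry the slot report). */
    void onResponse(String targetId, long nowMillis) {
        if (lastResponse.containsKey(targetId)) {
            lastResponse.put(targetId, nowMillis);
        }
    }

    /** Marks every target whose last response is too old as failed and unregisters it. */
    List<String> checkTimeouts(long nowMillis) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastResponse.entrySet()) {
            if (nowMillis - e.getValue() > heartbeatTimeoutMillis) {
                failed.add(e.getKey());
            }
        }
        // Remove after iterating to avoid modifying the map during iteration.
        for (String id : failed) {
            lastResponse.remove(id);
            onTargetFailed.accept(id);
        }
        return failed;
    }
}
```

In a real deployment checkTimeouts would be driven periodically by a scheduler, which is where the question from the comment (whether each connection triggers itself or the manager drives it) comes in.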
[jira] [Commented] (FLINK-4449) Heartbeat Manager between ResourceManager and TaskExecutor
[ https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436094#comment-15436094 ] ASF GitHub Bot commented on FLINK-4449: --- Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2410#discussion_r76168206 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/heartbeat/HeartbeatSchedulerTest.java --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.heartbeat; + +import akka.actor.ActorSystem; +import akka.dispatch.Futures; +import akka.util.Timeout; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * test for HeartbeatScheduler, including successful case, timeout case(retry on backoff timeout), failure case(retry) + */ +public class HeartbeatSchedulerTest extends TestLogger { + private static final long INITIAL_INTERVAL = 200; + private static final long INITIAL_TIMEOUT = 20; + private static final long MAX_TIMEOUT = 150; + private static final long DELAY_ON_ERROR = 100; + private static final long MAX_ATTEMPT_TIME = 300; + + + private static ActorSystem actorSystem; + private static AkkaRpcService akkaRpcService; + + private static final Timeout timeout = new Timeout(1, TimeUnit.MILLISECONDS); + + @BeforeClass + public static void setup() throws Exception { + actorSystem = AkkaUtils.createDefaultActorSystem(); + + akkaRpcService = new AkkaRpcService(actorSystem, timeout); + } + + @AfterClass + public static void teardown() throws Exception { + 
akkaRpcService.stopService(); + + actorSystem.shutdown(); + actorSystem.awaitTermination(); + } + + + /** +* test for receiving a regular heartbeat response in time, and checking the heartbeatResponse is properly delivered +* @throws Exception +*/ + @Test + public void testHeartbeatSuccessful() throws Exception { + HeartbeatSender sender = mock(HeartbeatSender.class); + UUID leaderSessionID = UUID.randomUUID(); + + HeartbeatReceiverGateway targetGateway = mock(HeartbeatReceiverGateway.class); + + String response = "ok"; + when(targetGateway.triggerHeartbeat(any(UUID.class), any(FiniteDuration.class))).thenReturn( + Futures.successful(response) + ); + + TestingHeartbeatScheduler heartbeatScheduler = new TestingHeartbeatScheduler(sender, akkaRpcService, leaderSessionID, + targetGateway, "taskExecutor-test-address", "testTargetGateway", log, INITIAL_INTERVAL, INITIAL_TIMEOUT, MAX_TIMEOUT, DELAY_ON_ERROR, MAX_ATTEMPT_TIME); + heartbeatScheduler.start(); + + // verify heartbeat successful and syncHeartbeatResponse is invoked
[GitHub] flink pull request #2410: [FLINK-4449] [cluster management] Heartbeat Manage...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2410#discussion_r76168206 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/heartbeat/HeartbeatSchedulerTest.java --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.heartbeat; + +import akka.actor.ActorSystem; +import akka.dispatch.Futures; +import akka.util.Timeout; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * test for HeartbeatScheduler, including successful case, timeout case(retry on backoff timeout), failure case(retry) + */ +public class HeartbeatSchedulerTest extends TestLogger { + private static final long INITIAL_INTERVAL = 200; + private static final long INITIAL_TIMEOUT = 20; + private static final long MAX_TIMEOUT = 150; + private static final long DELAY_ON_ERROR = 100; + private static final long MAX_ATTEMPT_TIME = 300; + + + private static ActorSystem actorSystem; + private static AkkaRpcService akkaRpcService; + + private static final Timeout timeout = new Timeout(1, TimeUnit.MILLISECONDS); + + @BeforeClass + public static void setup() throws Exception { + actorSystem = AkkaUtils.createDefaultActorSystem(); + + akkaRpcService = new AkkaRpcService(actorSystem, timeout); + } + + @AfterClass + public static void teardown() throws Exception { + 
akkaRpcService.stopService(); + + actorSystem.shutdown(); + actorSystem.awaitTermination(); + } + + + /** +* test for receiving a regular heartbeat response in time, and checking the heartbeatResponse is properly delivered +* @throws Exception +*/ + @Test + public void testHeartbeatSuccessful() throws Exception { + HeartbeatSender sender = mock(HeartbeatSender.class); + UUID leaderSessionID = UUID.randomUUID(); + + HeartbeatReceiverGateway targetGateway = mock(HeartbeatReceiverGateway.class); + + String response = "ok"; + when(targetGateway.triggerHeartbeat(any(UUID.class), any(FiniteDuration.class))).thenReturn( + Futures.successful(response) + ); + + TestingHeartbeatScheduler heartbeatScheduler = new TestingHeartbeatScheduler(sender, akkaRpcService, leaderSessionID, + targetGateway, "taskExecutor-test-address", "testTargetGateway", log, INITIAL_INTERVAL, INITIAL_TIMEOUT, MAX_TIMEOUT, DELAY_ON_ERROR, MAX_ATTEMPT_TIME); + heartbeatScheduler.start(); + + // verify heartbeat successful and syncHeartbeatResponse is invoked for sync heartbeat response + verify(sender, timeout(5000)).syncHeartbeatResponse(eq(response)); + // verify heartbeat trigger is still on + Assert.assertFalse(heartbeatScheduler.isClosed()); +
[jira] [Commented] (FLINK-4483) Complete test coverage integration
[ https://issues.apache.org/jira/browse/FLINK-4483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435930#comment-15435930 ] Pavel Fadeev commented on FLINK-4483: - Team, could you please assign this ticket to me? > Complete test coverage integration > -- > > Key: FLINK-4483 > URL: https://issues.apache.org/jira/browse/FLINK-4483 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Pavel Fadeev > > With INFRA-12458 implemented, we now need to move on and: > - restore code coverage reporting for the Maven build > - report it continuously with coveralls.io. Obsolete reports can be found > [here|https://coveralls.io/github/apache/flink] > It would be good to: > - avoid extra test runs for reporting > - minimize the build-time impact of the agent/instrumentation/reporting > - provide Java/Scala code coverage metrics there -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4483) Complete test coverage integration
Pavel Fadeev created FLINK-4483: --- Summary: Complete test coverage integration Key: FLINK-4483 URL: https://issues.apache.org/jira/browse/FLINK-4483 Project: Flink Issue Type: Improvement Components: Build System Reporter: Pavel Fadeev With INFRA-12458 implemented, we now need to move on and: - restore code coverage reporting for the Maven build - report it continuously with coveralls.io. Obsolete reports can be found [here|https://coveralls.io/github/apache/flink] It would be good to: - avoid extra test runs for reporting - minimize the build-time impact of the agent/instrumentation/reporting - provide Java/Scala code coverage metrics there -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4482) numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock
Ted Yu created FLINK-4482: - Summary: numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock Key: FLINK-4482 URL: https://issues.apache.org/jira/browse/FLINK-4482 Project: Flink Issue Type: Bug Reporter: Ted Yu Priority: Minor In CheckpointCoordinator#stopCheckpointScheduler(): {code} synchronized (lock) { ... numUnsuccessfulCheckpointsTriggers = 0; {code} triggerLock is not held during the above operation. See the comment on triggerLock earlier, in triggerCheckpoint(): {code} // we lock with a special lock to make sure that trigger requests do not overtake each other. // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter' // may issue blocking operations. Using a different lock than the coordinator-wide lock, // we avoid blocking the processing of 'acknowledge/decline' messages during that time. synchronized (triggerLock) { {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
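One way to address the report is to hold triggerLock around the reset as well. The sketch below is a hypothetical, heavily simplified stand-in for CheckpointCoordinator, not its actual code. It assumes that trigger paths acquire triggerLock before lock, and therefore takes the locks in that same order in stopCheckpointScheduler() so the fix does not introduce a lock-order inversion; whether the real coordinator uses that order should be checked against its source.

```java
/**
 * Hypothetical, simplified sketch (not Flink's CheckpointCoordinator): every access
 * to numUnsuccessfulCheckpointsTriggers happens while holding triggerLock.
 * Assumed lock order everywhere: triggerLock first, then lock.
 */
final class CoordinatorLockingSketch {
    private final Object lock = new Object();        // coordinator-wide lock
    private final Object triggerLock = new Object(); // serializes trigger requests

    private int numUnsuccessfulCheckpointsTriggers;  // guarded by triggerLock
    private boolean periodicScheduling = true;       // guarded by lock

    /** Stands in for a trigger attempt that failed (e.g. the ID counter threw). */
    void recordFailedTrigger() {
        synchronized (triggerLock) {
            numUnsuccessfulCheckpointsTriggers++;
        }
    }

    void stopCheckpointScheduler() {
        // Acquire triggerLock outside lock, matching the assumed order, so this cannot
        // deadlock against a concurrent trigger that holds triggerLock and wants lock.
        synchronized (triggerLock) {
            synchronized (lock) {
                periodicScheduling = false;
                numUnsuccessfulCheckpointsTriggers = 0;
            }
        }
    }

    int unsuccessfulTriggers() {
        synchronized (triggerLock) {
            return numUnsuccessfulCheckpointsTriggers;
        }
    }

    boolean isPeriodicScheduling() {
        synchronized (lock) {
            return periodicScheduling;
        }
    }
}
```

An alternative with the same effect is to reset the counter in a separate synchronized (triggerLock) block before entering synchronized (lock), which avoids nesting the locks at all.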
[jira] [Commented] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception
[ https://issues.apache.org/jira/browse/FLINK-4418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435824#comment-15435824 ] ASF GitHub Bot commented on FLINK-4418: --- Github user rehevkor5 commented on the issue: https://github.com/apache/flink/pull/2383 @StephanEwen do you want me to take a look & see what straightforward refactoring I can make, and squeeze that into this PR? I can't tell if that was a hint or not :) > ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if > InetAddress.getLocalHost throws exception > -- > > Key: FLINK-4418 > URL: https://issues.apache.org/jira/browse/FLINK-4418 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.1.0 >Reporter: Shannon Carey >Assignee: Shannon Carey > > When attempting to connect to a cluster with a ClusterClient, if the > machine's hostname is not resolvable to an IP, an exception is thrown > preventing success. > This is the case if, for example, the hostname is not present & mapped to a > local IP in /etc/hosts. > The exception is below. I suggest that findAddressUsingStrategy() should > catch java.net.UnknownHostException thrown by InetAddress.getLocalHost() and > return null, allowing alternative strategies to be attempted by > findConnectingAddress(). I will open a PR to this effect. Ideally this could > be included in both 1.2 and 1.1.2. > In the stack trace below, "ip-10-2-64-47" is the internal host name of an AWS > EC2 instance. > {code} > 21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed > to retrieve the JobManager gateway. 
> 21:11:35 at > org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430) > 21:11:35 at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) > 21:11:35 at > org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334) > 21:11:35 at > com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81) > 21:11:35 at > com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105) > 21:11:35 at > com.expedia.www.flink.job.scheduler.JobScheduler.runStreamingApp(JobScheduler.java:69) > 21:11:35 at > com.expedia.www.flink.job.scheduler.JobScheduler.main(JobScheduler.java:34) > 21:11:35 Caused by: java.lang.RuntimeException: Failed to resolve JobManager > address at /10.2.89.80:43126 > 21:11:35 at > org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:189) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:649) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:428) > 21:11:35 ... 8 more > 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: > ip-10-2-64-47: unknown error > 21:11:35 at java.net.InetAddress.getLocalHost(InetAddress.java:1505) > 21:11:35 at > org.apache.flink.runtime.net.ConnectionUtils.findAddressUsingStrategy(ConnectionUtils.java:232) > 21:11:35 at > org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress(ConnectionUtils.java:123) > 21:11:35 at > org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:187) > 21:11:35 ... 
10 more > 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: unknown > error > 21:11:35 at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) > 21:11:35 at > java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928) > 21:11:35 at > java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323) > 21:11:35 at java.net.InetAddress.getLocalHost(InetAddress.java:1500) > 21:11:35 ... 13 more > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
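The reporter's suggestion (catch java.net.UnknownHostException thrown by InetAddress.getLocalHost() and return null, so that findConnectingAddress() can fall through to other strategies) can be sketched generically as below. AddressStrategies, Lookup, orNull and firstResolved are hypothetical names for illustration; they are not Flink's ConnectionUtils API, and the real strategies would also test connectivity to the JobManager address.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.function.Supplier;

/**
 * Hypothetical sketch: a strategy that cannot resolve an address yields null
 * instead of throwing, so the caller moves on to the next strategy.
 */
final class AddressStrategies {

    /** A lookup that may fail with UnknownHostException, like InetAddress.getLocalHost(). */
    interface Lookup {
        InetAddress get() throws UnknownHostException;
    }

    /** Wraps a lookup so that a resolution failure yields null instead of aborting the search. */
    static Supplier<InetAddress> orNull(Lookup lookup) {
        return () -> {
            try {
                return lookup.get();
            } catch (UnknownHostException e) {
                return null; // e.g. hostname missing from /etc/hosts
            }
        };
    }

    /** Tries each strategy in order and returns the first non-null address, or null. */
    static InetAddress firstResolved(List<Supplier<InetAddress>> strategies) {
        for (Supplier<InetAddress> strategy : strategies) {
            InetAddress address = strategy.get();
            if (address != null) {
                return address;
            }
        }
        return null;
    }
}
```

In the reported setup, a strategy wrapped as orNull(InetAddress::getLocalHost) would yield null on the EC2 host instead of aborting, and the next strategy in the list would be tried.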
[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns
[ https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435814#comment-15435814 ] ASF GitHub Bot commented on FLINK-3677: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2109 @mxm Thank you for the review. I've updated the PR accordingly. > FileInputFormat: Allow to specify include/exclude file name patterns > > > Key: FLINK-3677 > URL: https://issues.apache.org/jira/browse/FLINK-3677 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Maximilian Michels >Assignee: Ivan Mushketyk >Priority: Minor > Labels: starter > > It would be nice to be able to specify a regular expression to filter files. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns
[ https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435813#comment-15435813 ] ASF GitHub Bot commented on FLINK-3677: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2109#discussion_r76148927 --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.io; + +import org.apache.flink.core.fs.Path; +import org.junit.Test; + +import java.util.Collections; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class GlobFilePathFilterTest { + @Test + public void defaultConstructorCreateMatchAllFilter() { + GlobFilePathFilter matcher = new GlobFilePathFilter(); + assertFalse(matcher.filterPath(new Path("dir/file.txt"))); + } + + @Test + public void matchAllFilesByDefault() { + GlobFilePathFilter matcher = new GlobFilePathFilter( + Collections.emptyList(), + Collections.emptyList()); + + assertFalse(matcher.filterPath(new Path("dir/file.txt"))); + } + + @Test + public void excludeFilesNotInIncludePatterns() { + GlobFilePathFilter matcher = new GlobFilePathFilter( + Collections.singletonList("dir/*"), + Collections.emptyList()); + + assertFalse(matcher.filterPath(new Path("dir/file.txt"))); + assertTrue(matcher.filterPath(new Path("dir1/file.txt"))); + } + + @Test + public void excludeFilesIfMatchesExclude() { + GlobFilePathFilter matcher = new GlobFilePathFilter( + Collections.singletonList("dir/*"), --- End diff -- Yes, this is supported. Added a test for this. > FileInputFormat: Allow to specify include/exclude file name patterns > > > Key: FLINK-3677 > URL: https://issues.apache.org/jira/browse/FLINK-3677 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Maximilian Michels >Assignee: Ivan Mushketyk >Priority: Minor > Labels: starter > > It would be nice to be able to specify a regular expression to filter files. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
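The quoted tests imply three rules: filterPath(...) returns true for paths that should be skipped, an empty include list matches everything, and a path matching an exclude pattern is skipped even if it matches an include pattern (the last rule is assumed from the test name excludeFilesIfMatchesExclude, whose body is truncated here). A minimal sketch of those rules using the JDK's glob PathMatcher follows. GlobFilterSketch is hypothetical, takes string paths instead of Flink's Path, and is not the implementation in PR #2109.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of include/exclude glob filtering with the semantics the
 * quoted tests check: filterPath(...) returns true when a path should be skipped.
 */
final class GlobFilterSketch {
    private final List<PathMatcher> includes = new ArrayList<>();
    private final List<PathMatcher> excludes = new ArrayList<>();

    GlobFilterSketch(List<String> includePatterns, List<String> excludePatterns) {
        for (String p : includePatterns) {
            includes.add(FileSystems.getDefault().getPathMatcher("glob:" + p));
        }
        for (String p : excludePatterns) {
            excludes.add(FileSystems.getDefault().getPathMatcher("glob:" + p));
        }
    }

    /** Returns true if the path must be filtered out. */
    boolean filterPath(String path) {
        java.nio.file.Path p = Paths.get(path);
        // An empty include list means "include everything".
        boolean included = includes.isEmpty();
        for (PathMatcher m : includes) {
            if (m.matches(p)) {
                included = true;
                break;
            }
        }
        if (!included) {
            return true;
        }
        // Exclude patterns take precedence over include patterns.
        for (PathMatcher m : excludes) {
            if (m.matches(p)) {
                return true;
            }
        }
        return false;
    }
}
```

Note that in JDK glob syntax a single * does not cross directory boundaries, which is why "dir/*" accepts "dir/file.txt" but not "dir1/file.txt"; ** would be needed to match nested directories.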
[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2109 @mxm Thank you for the review. I've updated the PR accordingly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns
[ https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435810#comment-15435810 ] ASF GitHub Bot commented on FLINK-3677: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2109#discussion_r76148802 --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java --- @@ -314,43 +299,95 @@ public void testIgnoredUnderscoreFiles() { } @Test + public void testExcludeFiles() { + try { + final String contents = "CONTENTS"; + + // create some accepted, some ignored files + + File child1 = temporaryFolder.newFile("dataFile1.txt"); + File child2 = temporaryFolder.newFile("another_file.bin"); + + File[] files = { child1, child2 }; + + createTempFiles(contents.getBytes(), files); + + // test that only the valid files are accepted + + Configuration configuration = new Configuration(); + + final DummyFileInputFormat format = new DummyFileInputFormat(); + format.setFilePath(temporaryFolder.getRoot().toURI().toString()); + format.configure(configuration); + format.setFilesFilter(new GlobFilePathFilter( + Collections.singletonList("**"), --- End diff -- `*` - means any file; `**` - means any file in any subdirectory. Added a comment and test for that. > FileInputFormat: Allow to specify include/exclude file name patterns > > > Key: FLINK-3677 > URL: https://issues.apache.org/jira/browse/FLINK-3677 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Maximilian Michels >Assignee: Ivan Mushketyk >Priority: Minor > Labels: starter > > It would be nice to be able to specify a regular expression to filter files. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
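You can check the difference between `*` and `**` directly against the JDK's glob implementation. The sketch below assumes the filter follows `java.nio.file.PathMatcher` glob syntax on a Unix-like file system. Flink's actual matching rules may differ in detail, and `GlobDepthDemo` is a hypothetical name.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

// Hypothetical demo of JDK glob depth semantics; assumes a Unix-like default file system.
final class GlobDepthDemo {
    static boolean matches(String glob, String path) {
        PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return m.matches(Paths.get(path));
    }

    public static void main(String[] args) {
        // "*" matches within a single path component and does not cross "/"
        System.out.println(matches("*", "file.txt"));          // true
        System.out.println(matches("*", "dir/file.txt"));      // false
        // "**" crosses directory boundaries
        System.out.println(matches("**", "dir/sub/file.txt")); // true
    }
}
```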
[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns
[ https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435807#comment-15435807 ] ASF GitHub Bot commented on FLINK-3677: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2109#discussion_r76148725 --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.io; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.flink.core.fs.Path; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class DefaultFilterTest { + @Parameters + public static Collection
[jira] [Commented] (FLINK-4480) Incorrect link to elastic.co in documentation
[ https://issues.apache.org/jira/browse/FLINK-4480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435808#comment-15435808 ] ASF GitHub Bot commented on FLINK-4480: --- Github user smarthi commented on the issue: https://github.com/apache/flink/pull/2416 1. Changed the ES2 version to 2.3.5 2. Fixed broken ElasticsearchExample.java 3. Removed use of Guava API. > Incorrect link to elastic.co in documentation > - > > Key: FLINK-4480 > URL: https://issues.apache.org/jira/browse/FLINK-4480 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.2.0, 1.1.1 >Reporter: Fabian Hueske >Assignee: Suneel Marthi >Priority: Trivial > > The link URL of the entry "Elasticsearch 2x (sink)" on the connector's > documentation page > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html > is pointing to http://elastic.com but should point to http://elastic.co -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4480) Incorrect link to elastic.co in documentation
[ https://issues.apache.org/jira/browse/FLINK-4480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435804#comment-15435804 ] ASF GitHub Bot commented on FLINK-4480: --- GitHub user smarthi opened a pull request: https://github.com/apache/flink/pull/2416 FLINK-4480: Incorrect link to elastic.co in documentation Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [X] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/smarthi/flink FLINK-4480 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2416.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2416 commit ea53aa1e715a8d862282cfbaff2243431455c0ae Author: smarthi Date: 2016-08-24T22:07:23Z FLINK-4480: Incorrect link to elastic.co in documentation > Incorrect link to elastic.co in documentation > - > > Key: FLINK-4480 > URL: https://issues.apache.org/jira/browse/FLINK-4480 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.2.0, 1.1.1 >Reporter: Fabian Hueske >Assignee: Suneel Marthi >Priority: Trivial > > The link URL of the entry "Elasticsearch 2x (sink)" on the connector's > documentation page > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html > is pointing to http://elastic.com but should point to http://elastic.co -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4453) Scala code example in Window documentation shows Java
[ https://issues.apache.org/jira/browse/FLINK-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-4453. --- > Scala code example in Window documentation shows Java > - > > Key: FLINK-4453 > URL: https://issues.apache.org/jira/browse/FLINK-4453 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Jark Wu >Priority: Trivial > Fix For: 1.2.0 > > > The first code example in the section "WindowFunction - The Generic Case" of > the window documentation of the 1.2 SNAPSHOT > (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#windowfunction---the-generic-case) > shows Java code in the Scala tab. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4453) Scala code example in Window documentation shows Java
[ https://issues.apache.org/jira/browse/FLINK-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-4453. - Resolution: Fixed Fix Version/s: 1.2.0 Fixed via 42f65e4b93ef7f71b6252bc9c664bee727fd4278 > Scala code example in Window documentation shows Java > - > > Key: FLINK-4453 > URL: https://issues.apache.org/jira/browse/FLINK-4453 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Jark Wu >Priority: Trivial > Fix For: 1.2.0 > > > The first code example in the section "WindowFunction - The Generic Case" of > the window documentation of the 1.2 SNAPSHOT > (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#windowfunction---the-generic-case) > shows Java code in the Scala tab. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields
[ https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-4437. --- > Lock evasion around lastTriggeredCheckpoint may lead to lost updates to > related fields > -- > > Key: FLINK-4437 > URL: https://issues.apache.org/jira/browse/FLINK-4437 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > Fix For: 1.2.0 > > > In CheckpointCoordinator#triggerCheckpoint(): > {code} > // make sure the minimum interval between checkpoints has passed > if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) > { > {code} > If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > > timestamp' in close proximity before lastTriggeredCheckpoint is updated, > the two threads may have an inconsistent view of "lastTriggeredCheckpoint" > and updates to fields correlated with "lastTriggeredCheckpoint" may be lost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
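The race described above is a classic check-then-act problem. The comparison against `lastTriggeredCheckpoint` and the update of that field (and of any correlated fields) must happen atomically with respect to each other. The minimal sketch below assumes a single coordinator-wide lock object. `MinPauseGate` and `triggerCount` are hypothetical stand-ins and are not `CheckpointCoordinator` members.

```java
// Hypothetical sketch: guard the minimum-pause check and the related updates with one lock.
final class MinPauseGate {
    private final Object lock = new Object();
    private final long minPauseBetweenCheckpoints;

    // Guarded by lock. Starts far in the past; halved so that adding the pause cannot overflow.
    private long lastTriggeredCheckpoint = Long.MIN_VALUE / 2;
    // Guarded by lock. Stands in for "fields correlated with lastTriggeredCheckpoint".
    private long triggerCount;

    MinPauseGate(long minPauseBetweenCheckpoints) {
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
    }

    /** Returns true if a checkpoint may be triggered at the given timestamp, and records it. */
    boolean tryTrigger(long timestamp) {
        synchronized (lock) {
            // make sure the minimum interval between checkpoints has passed
            if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
                return false;
            }
            // Both writes happen under the same lock as the check, so no update is lost.
            lastTriggeredCheckpoint = timestamp;
            triggerCount++;
            return true;
        }
    }

    long triggerCount() {
        synchronized (lock) {
            return triggerCount;
        }
    }
}
```

The check and both writes occur inside one `synchronized` block. As a result, two threads that call `tryTrigger` with the same timestamp cannot both pass the minimum-pause check.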
[jira] [Resolved] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields
[ https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-4437. - Resolution: Fixed Fix Version/s: 1.2.0 Fixed via 4da40bcb9ea01cb0c5e6fd0d7472dc09397f648e > Lock evasion around lastTriggeredCheckpoint may lead to lost updates to > related fields > -- > > Key: FLINK-4437 > URL: https://issues.apache.org/jira/browse/FLINK-4437 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > Fix For: 1.2.0 > > > In CheckpointCoordinator#triggerCheckpoint(): > {code} > // make sure the minimum interval between checkpoints has passed > if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) > { > {code} > If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > > timestamp' in close proximity before lastTriggeredCheckpoint is updated, > the two threads may have an inconsistent view of "lastTriggeredCheckpoint" > and updates to fields correlated with "lastTriggeredCheckpoint" may be lost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields
[ https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435730#comment-15435730 ] ASF GitHub Bot commented on FLINK-4437: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2409 @tedyu No problem, happens to everyone. I actually took your code and extended it a bit. Merged in 4da40bcb9ea01cb0c5e6fd0d7472dc09397f648e > Lock evasion around lastTriggeredCheckpoint may lead to lost updates to > related fields > -- > > Key: FLINK-4437 > URL: https://issues.apache.org/jira/browse/FLINK-4437 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > > In CheckpointCoordinator#triggerCheckpoint(): > {code} > // make sure the minimum interval between checkpoints has passed > if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) > { > {code} > If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > > timestamp' in close proximity before lastTriggeredCheckpoint is updated, > the two threads may have an inconsistent view of "lastTriggeredCheckpoint" > and updates to fields correlated with "lastTriggeredCheckpoint" may be lost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4417) Checkpoints should be subsumed by CheckpointID not, by timestamp
[ https://issues.apache.org/jira/browse/FLINK-4417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-4417. --- > Checkpoints should be subsumed by CheckpointID not, by timestamp > > > Key: FLINK-4417 > URL: https://issues.apache.org/jira/browse/FLINK-4417 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.1.2 >Reporter: Stephan Ewen >Assignee: ramkrishna.s.vasudevan > Fix For: 1.2.0 > > > Since the system clocks cannot be expected to be stable/monotonic, the > subsumption logic in the checkpoint coordinator should not decide which > checkpoint is "newer" based on the system time. > The checkpoint ID is guaranteed to be strictly monotonically increasing. It is > a better measure to decide which checkpoint is newer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
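The sketch below illustrates why the ID is the safer ordering key. It keeps pending checkpoints in a `TreeMap` keyed by checkpoint ID. When one checkpoint completes, it subsumes every pending checkpoint with a smaller ID. The class and method names are hypothetical. The sketch assumes IDs are assigned in increasing order at trigger time, as the description states, and it keeps timestamps only for information.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: order checkpoints by ID, which only increases, instead of by
// timestamp, which can go backwards if the system clock is adjusted.
final class SubsumeByIdSketch {
    // Pending checkpoints keyed by ID; the value is the trigger timestamp (never used for ordering).
    private final TreeMap<Long, Long> pending = new TreeMap<>();

    void trigger(long checkpointId, long timestamp) {
        pending.put(checkpointId, timestamp);
    }

    /** Completes a checkpoint and returns the IDs of the pending checkpoints it subsumes. */
    List<Long> complete(long checkpointId) {
        pending.remove(checkpointId);
        // headMap(id) is a live view of all pending checkpoints with a strictly smaller ID.
        Map<Long, Long> older = pending.headMap(checkpointId);
        List<Long> subsumed = new ArrayList<>(older.keySet());
        // Clearing the view removes those entries from the backing map as well.
        older.clear();
        return subsumed;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Suppose checkpoint 2 is triggered after a backwards clock adjustment (timestamp 4000, versus 5000 for checkpoint 1). A timestamp comparison would wrongly treat checkpoint 1 as newer. Comparing IDs still subsumes checkpoint 1 when checkpoint 2 completes.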
[jira] [Resolved] (FLINK-4417) Checkpoints should be subsumed by CheckpointID not, by timestamp
[ https://issues.apache.org/jira/browse/FLINK-4417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-4417. - Resolution: Fixed Fixed via 4e9d1775b5514c87981c78d55323cc2b17361867 Thank you for the contribution! > Checkpoints should be subsumed by CheckpointID not, by timestamp > > > Key: FLINK-4417 > URL: https://issues.apache.org/jira/browse/FLINK-4417 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.1.2 >Reporter: Stephan Ewen >Assignee: ramkrishna.s.vasudevan > Fix For: 1.2.0 > > > Since the system clocks cannot be expected to be stable/monotonic, the > subsumption logic in the checkpoint coordinator should not decide which > checkpoint is "newer" based on the system time. > The checkpoint ID is guaranteed to be strictly monotonically increasing. It is > a better measure to decide which checkpoint is newer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4480) Incorrect link to elastic.co in documentation
[ https://issues.apache.org/jira/browse/FLINK-4480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Suneel Marthi reassigned FLINK-4480: Assignee: Suneel Marthi > Incorrect link to elastic.co in documentation > - > > Key: FLINK-4480 > URL: https://issues.apache.org/jira/browse/FLINK-4480 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.2.0, 1.1.1 >Reporter: Fabian Hueske >Assignee: Suneel Marthi >Priority: Trivial > > The link URL of the entry "Elasticsearch 2x (sink)" on the connector's > documentation page > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html > is pointing to http://elastic.com but should point to http://elastic.co -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4457) Make the ExecutionGraph independent of Akka
[ https://issues.apache.org/jira/browse/FLINK-4457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-4457. --- > Make the ExecutionGraph independent of Akka > --- > > Key: FLINK-4457 > URL: https://issues.apache.org/jira/browse/FLINK-4457 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Stephan Ewen > Fix For: 1.2.0 > > > Currently, the {{ExecutionGraph}} strongly depends on Akka as it requires an > {{ActorSystem}} to create the {{CheckpointCoordinatorDeActivator}}. > Furthermore, it allows {{ActorGateways}} to register for job status and > execution updates. > To improve modularization and abstraction, I propose introducing > proper listener interfaces. This would also allow us to get rid of the > {{CheckpointCoordinatorDeActivator}} by simply implementing this interface. > Furthermore, it will pave the way for the upcoming FLIP-6 refactoring as it > offers a better abstraction. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
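The proposed decoupling amounts to the observer pattern. The graph notifies plain Java listeners, and a component such as the checkpoint (de)activator implements the listener interface instead of receiving actor messages. The names `JobState`, `JobStateListener`, `JobStateNotifier`, and `CheckpointActivator` in the minimal sketch below are hypothetical, chosen for illustration. They are not the interfaces that Flink eventually added.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener abstraction; names are illustrative, not Flink's actual interfaces.
enum JobState { CREATED, RUNNING, FINISHED, FAILED }

interface JobStateListener {
    void jobStateChanged(String jobId, JobState newState, long timestamp);
}

// Stands in for the graph: it only knows about listeners, not about an actor system.
final class JobStateNotifier {
    private final List<JobStateListener> listeners = new CopyOnWriteArrayList<>();

    void register(JobStateListener listener) {
        listeners.add(listener);
    }

    void transitionTo(String jobId, JobState newState) {
        long now = System.currentTimeMillis();
        for (JobStateListener listener : listeners) {
            listener.jobStateChanged(jobId, newState, now);
        }
    }
}

// A checkpoint (de)activator becomes just another listener.
final class CheckpointActivator implements JobStateListener {
    volatile boolean periodicCheckpointsActive;

    @Override
    public void jobStateChanged(String jobId, JobState newState, long timestamp) {
        // Run periodic checkpoints only while the job is running.
        periodicCheckpointsActive = (newState == JobState.RUNNING);
    }
}
```

The sketch uses a `CopyOnWriteArrayList` so that listeners can register while notifications are in flight. This suits a list that is read far more often than it is modified.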
[jira] [Commented] (FLINK-4417) Checkpoints should be subsumed by CheckpointID not, by timestamp
[ https://issues.apache.org/jira/browse/FLINK-4417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435701#comment-15435701 ] ASF GitHub Bot commented on FLINK-4417: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2407 > Checkpoints should be subsumed by CheckpointID not, by timestamp > > > Key: FLINK-4417 > URL: https://issues.apache.org/jira/browse/FLINK-4417 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.1.2 >Reporter: Stephan Ewen >Assignee: ramkrishna.s.vasudevan > Fix For: 1.2.0 > > > Since the system clocks cannot be expected to be stable/monotonic, the > subsumption logic in the checkpoint coordinator should not decide which > checkpoint is "newer" based on the system time. > The checkpoint ID is guaranteed to be strictly monotonically increasing. It is > a better measure to decide which checkpoint is newer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4457) Make the ExecutionGraph independent of Akka
[ https://issues.apache.org/jira/browse/FLINK-4457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-4457. - Resolution: Fixed Fix Version/s: 1.2.0 Fixed via 635c869326cc77e4199e4d8ee597aed69ed16cd2 > Make the ExecutionGraph independent of Akka > --- > > Key: FLINK-4457 > URL: https://issues.apache.org/jira/browse/FLINK-4457 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Stephan Ewen > Fix For: 1.2.0 > > > Currently, the {{ExecutionGraph}} strongly depends on Akka as it requires an > {{ActorSystem}} to create the {{CheckpointCoordinatorDeActivator}}. > Furthermore, it allows {{ActorGateways}} to register for job status and > execution updates. > To improve modularization and abstraction, I propose introducing > proper listener interfaces. This would also allow us to get rid of the > {{CheckpointCoordinatorDeActivator}} by simply implementing this interface. > Furthermore, it will pave the way for the upcoming FLIP-6 refactoring as it > offers a better abstraction. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4453) Scala code example in Window documentation shows Java
[ https://issues.apache.org/jira/browse/FLINK-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435702#comment-15435702 ] ASF GitHub Bot commented on FLINK-4453: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2411 > Scala code example in Window documentation shows Java > - > > Key: FLINK-4453 > URL: https://issues.apache.org/jira/browse/FLINK-4453 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Jark Wu >Priority: Trivial > > The first code example in the section "WindowFunction - The Generic Case" of > the window documentation of the 1.2 SNAPSHOT > (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#windowfunction---the-generic-case) > shows Java code in the Scala tab. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API
[ https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435668#comment-15435668 ] Fabian Hueske commented on FLINK-3850: -- Sorry for the very long delay [~ram_krish]. I was gone for a while. You can certainly work on this. I would recommend first reading the documentation about semantic annotations. They can significantly improve the performance if used correctly, but if they are incorrectly applied, the result might be invalid. So one has to be careful ;-) > Add forward field annotations to DataSet operators generated by the Table API > - > > Key: FLINK-3850 > URL: https://issues.apache.org/jira/browse/FLINK-3850 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Fabian Hueske > > The DataSet API features semantic annotations [1] to tell the optimizer which > input fields an operator copies. This information is valuable for the > optimizer because it can infer that certain physical properties such as > partitioning or sorting are not destroyed by user functions and thus generate > more efficient execution plans. > The Table API is built on top of the DataSet API and generates DataSet > programs and code for user-defined functions. Hence, it knows exactly which > fields are modified and which are not. We should use this information to > automatically generate forward field annotations and attach them to the > operators. This can help to significantly improve the performance of certain > jobs. > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations -- This message was sent by Atlassian JIRA (v6.3.4#6332)
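The Table API knows at code-generation time which output fields are plain copies of input fields, so deriving the annotation string is mechanical. The sketch below assumes the forwarded-fields notation from the DataSet semantic-annotation documentation: `f0` for a field kept at the same position, `f1->f2` for a field moved to another position, and expressions separated by semicolons. `ForwardedFieldsDeriver` and its input encoding are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: derive a forwarded-fields expression from a projection
// whose structure is known when the code is generated.
final class ForwardedFieldsDeriver {
    /** Marker for an output field that is computed rather than copied from the input. */
    static final int COMPUTED = -1;

    /**
     * sourceIndex[i] is the input field copied unchanged into output field i,
     * or COMPUTED if output field i is produced by an expression.
     */
    static String derive(int[] sourceIndex) {
        List<String> parts = new ArrayList<>();
        for (int out = 0; out < sourceIndex.length; out++) {
            int in = sourceIndex[out];
            if (in == COMPUTED) {
                // Never declare a modified field as forwarded; that could yield wrong results.
                continue;
            }
            parts.add(in == out ? "f" + in : "f" + in + "->f" + out);
        }
        return String.join("; ", parts);
    }
}
```

For example, a projection that keeps field 0, computes field 1, and moves input field 1 to output position 2 yields `f0; f1->f2`. The sketch skips computed fields deliberately because, as the comment above warns, incorrectly applied annotations can produce invalid results.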
[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization
[ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435630#comment-15435630 ] ASF GitHub Bot commented on FLINK-3874: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2244 Thank you for your review @fhueske. I've updated it. Could you please take a look at it? > Add a Kafka TableSink with JSON serialization > - > > Key: FLINK-3874 > URL: https://issues.apache.org/jira/browse/FLINK-3874 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk >Priority: Minor > > Add a TableSink that writes JSON serialized data to Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4481) Maximum results for pairwise algorithms
Greg Hogan created FLINK-4481: - Summary: Maximum results for pairwise algorithms Key: FLINK-4481 URL: https://issues.apache.org/jira/browse/FLINK-4481 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.2.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Return the per-vertex maximum scores for algorithms ({{AdamicAdar}}, {{JaccardIndex}}) which return pairwise results. The number of pairwise scores can be >> O(edges) but the number of maximum scores is O(vertices). It can also be most useful to know what vertices a vertex is most similar to. This implementation is very efficient through use of the hash-combine. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
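A minimal single-process sketch of the idea, assuming pairwise results arrive as (source, target, score) triples: every pair updates the running maximum of both endpoints, so the state stays at one entry per vertex no matter how many pairs there are. `PerVertexMax` and `Score` are hypothetical names. The hash-combine mentioned in the issue would apply the same keep-the-larger step within each partition before data is shuffled, which this sketch does not model.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the Gelly implementation: reduce pairwise scores to the
// best-scoring partner per vertex. The map holds at most one entry per vertex.
final class PerVertexMax {

    static final class Score {
        final long source;
        final long target;
        final double score;

        Score(long source, long target, double score) {
            this.source = source;
            this.target = target;
            this.score = score;
        }
    }

    static Map<Long, Score> maxPerVertex(List<Score> pairs) {
        Map<Long, Score> best = new HashMap<>();
        for (Score s : pairs) {
            // combine step: keep whichever of the stored and the new score is larger,
            // for both endpoints of the pair
            best.merge(s.source, s, (a, b) -> b.score > a.score ? b : a);
            best.merge(s.target, s, (a, b) -> b.score > a.score ? b : a);
        }
        return best;
    }
}
```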
[jira] [Commented] (FLINK-3899) Document window processing with Reduce/FoldFunction + WindowFunction
[ https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435522#comment-15435522 ] ASF GitHub Bot commented on FLINK-3899: --- Github user danielblazevski commented on the issue: https://github.com/apache/flink/pull/2368 Pushed the Scala example > Document window processing with Reduce/FoldFunction + WindowFunction > > > Key: FLINK-3899 > URL: https://issues.apache.org/jira/browse/FLINK-3899 > Project: Flink > Issue Type: Improvement > Components: Documentation, Streaming >Affects Versions: 1.1.0 >Reporter: Fabian Hueske > > The streaming documentation does not describe how windows can be processed > with FoldFunction or ReduceFunction and a subsequent WindowFunction. This > combination allows for eager window aggregation (only a single element is > kept in the window) and access of the Window object, e.g., to have access to > the window's start and end time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3899) Document window processing with Reduce/FoldFunction + WindowFunction
[ https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435489#comment-15435489 ] ASF GitHub Bot commented on FLINK-3899: --- Github user danielblazevski commented on a diff in the pull request: https://github.com/apache/flink/pull/2368#discussion_r76120063 --- Diff: docs/apis/streaming/windows.md --- @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu the additional meta information that writing a `WindowFunction` provides. This is an example that shows how incremental aggregation functions can be combined with -a `WindowFunction`. +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the +ending event-time of a window of sensor readings that contain a timestamp, +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window +aggregation (only a single element is kept in the window). {% highlight java %} -DataStream> input = ...; +DataStream input = ...; // for folding incremental computation input .keyBy() .window() -.apply(, new MyFoldFunction(), new MyWindowFunction()); +.apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction()); + +/* ... */ + +private static class myFoldFunction implements FoldFunction { + +public Long fold(Long acc, SensorReading s) { +return Math.max(acc, s.timestamp()); +} +} + +private static class MyWindowFunction implements WindowFunction { + +public void apply(String key, TimeWindow window, Iterable timestamps, Collector out) { +out.collect(timestamps.iterator().next()); --- End diff -- Ah, lol, `Int` vs `Integer`... 
> Document window processing with Reduce/FoldFunction + WindowFunction > > > Key: FLINK-3899 > URL: https://issues.apache.org/jira/browse/FLINK-3899 > Project: Flink > Issue Type: Improvement > Components: Documentation, Streaming >Affects Versions: 1.1.0 >Reporter: Fabian Hueske > > The streaming documentation does not describe how windows can be processed > with FoldFunction or ReduceFunction and a subsequent WindowFunction. This > combination allows for eager window aggregation (only a single element is > kept in the window) and access of the Window object, e.g., to have access to > the window's start and end time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
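The FoldFunction + WindowFunction combination described in the issue can be simulated without Flink, assuming tumbling event-time windows and non-negative timestamps: the fold keeps one `Long` per key and window (starting from `Long.MIN_VALUE`, as in the diff), and the final step attaches window metadata (the exclusive window end) to that single value. `EagerWindowSketch` and `Reading` are hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Hypothetical simulation, not the Flink runtime: tumbling windows where a fold keeps
// a single accumulator per (key, window) instead of buffering elements, and a final
// "window function" combines that value with window metadata (the window end).
final class EagerWindowSketch {

    static final class Reading {
        final String sensorId;
        final long timestamp; // assumed non-negative

        Reading(String sensorId, long timestamp) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
        }
    }

    // Returns "key,windowEnd,maxTimestamp" for every non-empty window.
    static List<String> maxTimestampPerWindow(List<Reading> readings, long windowSize) {
        BiFunction<Long, Reading, Long> fold = (acc, r) -> Math.max(acc, r.timestamp);
        // key -> window start -> accumulator (one Long per window, never the elements)
        Map<String, Map<Long, Long>> state = new TreeMap<>();
        for (Reading r : readings) {
            long windowStart = r.timestamp - (r.timestamp % windowSize);
            Map<Long, Long> windows = state.computeIfAbsent(r.sensorId, k -> new TreeMap<>());
            long acc = windows.getOrDefault(windowStart, Long.MIN_VALUE); // initial value
            windows.put(windowStart, fold.apply(acc, r));
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Map<Long, Long>> perKey : state.entrySet()) {
            for (Map.Entry<Long, Long> window : perKey.getValue().entrySet()) {
                long windowEnd = window.getKey() + windowSize; // exclusive end: [start, end)
                // "window function": sees only the folded value plus window metadata
                out.add(perKey.getKey() + "," + windowEnd + "," + window.getValue());
            }
        }
        return out;
    }
}
```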
[jira] [Commented] (FLINK-3899) Document window processing with Reduce/FoldFunction + WindowFunction
[ https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435452#comment-15435452 ] ASF GitHub Bot commented on FLINK-3899: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2368#discussion_r76115561 --- Diff: docs/apis/streaming/windows.md --- @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu the additional meta information that writing a `WindowFunction` provides. This is an example that shows how incremental aggregation functions can be combined with -a `WindowFunction`. +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the +ending event-time of a window of sensor readings that contain a timestamp, +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window +aggregation (only a single element is kept in the window). {% highlight java %} -DataStream> input = ...; +DataStream input = ...; // for folding incremental computation input .keyBy() .window() -.apply(, new MyFoldFunction(), new MyWindowFunction()); +.apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction()); + +/* ... */ + +private static class myFoldFunction implements FoldFunction { + +public Long fold(Long acc, SensorReading s) { +return Math.max(acc, s.timestamp()); +} +} + +private static class MyWindowFunction implements WindowFunction { + +public void apply(String key, TimeWindow window, Iterable timestamps, Collector out) { +out.collect(timestamps.iterator().next()); --- End diff -- Thanks for the update. 
The following Scala code does not show an error in my IntelliJ:
```
val readings: DataStream[SensorReading] = ???
val result: DataStream[(String, Long, Int)] = readings
  .keyBy(_.sensorId)
  .timeWindow(Time.minutes(1), Time.seconds(10))
  .apply(
    ("", 0L, 0),
    (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
    (k: String, w: TimeWindow, cnts: Iterable[(String, Long, Int)], out: Collector[(String, Long, Int)]) => {
      val cnt = cnts.iterator.next()
      out.collect((k, w.getEnd, cnt._3))
    }
  )
```
Thanks, Fabian > Document window processing with Reduce/FoldFunction + WindowFunction > > > Key: FLINK-3899 > URL: https://issues.apache.org/jira/browse/FLINK-3899 > Project: Flink > Issue Type: Improvement > Components: Documentation, Streaming >Affects Versions: 1.1.0 >Reporter: Fabian Hueske > > The streaming documentation does not describe how windows can be processed > with FoldFunction or ReduceFunction and a subsequent WindowFunction. This > combination allows for eager window aggregation (only a single element is > kept in the window) and access of the Window object, e.g., to have access to > the window's start and end time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`
[ https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-3999: --- Assignee: Neelesh Srinivas Salian > Rename the `running` flag in the drivers to `canceled` > -- > > Key: FLINK-3999 > URL: https://issues.apache.org/jira/browse/FLINK-3999 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Reporter: Gabor Gevay >Assignee: Neelesh Srinivas Salian >Priority: Trivial > > The name of the {{running}} flag in the drivers doesn't reflect its usage: > when the operator just stops normally, then it is not running anymore, but > the {{running}} flag will still be true, since the {{running}} flag is only > set when cancelling. > It should be renamed, and the value inverted. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`
[ https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435446#comment-15435446 ] Gabor Gevay commented on FLINK-3999: Yes, thank you for working on it! > Rename the `running` flag in the drivers to `canceled` > -- > > Key: FLINK-3999 > URL: https://issues.apache.org/jira/browse/FLINK-3999 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Reporter: Gabor Gevay >Priority: Trivial > > The name of the {{running}} flag in the drivers doesn't reflect its usage: > when the operator just stops normally, then it is not running anymore, but > the {{running}} flag will still be true, since the {{running}} flag is only > set when cancelling. > It should be renamed, and the value inverted. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
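A minimal sketch of the proposed rename, using a hypothetical `SketchDriver` rather than an actual Flink driver: the flag is written only by `cancel()`, so after a normal finish it correctly reads `false`, and the loop condition becomes the inverted `!canceled`.

```java
import java.util.Iterator;

// Hypothetical driver, not Flink's: `canceled` is set only by cancel(), so after a
// normal finish it is still false, which is exactly what its name says.
final class SketchDriver {
    private volatile boolean canceled = false; // replaces a `running` flag with inverted value

    // Consumes the input until it is exhausted or the driver is canceled.
    int run(Iterator<Integer> input) {
        int processed = 0;
        while (!canceled && input.hasNext()) {
            input.next();
            processed++;
        }
        return processed;
    }

    void cancel() {
        canceled = true;
    }

    boolean isCanceled() {
        return canceled;
    }
}
```

The field stays `volatile` because `cancel()` is typically called from a different thread than the one executing `run()`.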
[jira] [Commented] (FLINK-4407) Implement the trigger DSL
[ https://issues.apache.org/jira/browse/FLINK-4407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435357#comment-15435357 ] ASF GitHub Bot commented on FLINK-4407: --- GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/2415 [FLINK-4407] Implement the trigger DSL This PR implements the Trigger DSL as presented in [FLIP-9](https://cwiki.apache.org/confluence/display/FLINK/FLIP-9%3A+Trigger+DSL) with the addition of the Repeat.Once and Repeat.Forever. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink window_dsl Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2415.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2415 > Implement the trigger DSL > - > > Key: FLINK-4407 > URL: https://issues.apache.org/jira/browse/FLINK-4407 > Project: Flink > Issue Type: Sub-task >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > > This issue refers to the implementation of the trigger DSL. > The specification of the DSL has an open FLIP here: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-9%3A+Trigger+DSL > And is currently under discussion in the dev@ mailing list here: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-9-Trigger-DSL-td13065.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435356#comment-15435356 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2275 > It seems like the privileged port issues can be circumvented by setting conf.getBoolean("dfs.datanode.require.secure.ports", false)? It is not supported yet (see https://github.com/apache/hadoop/blob/branch-2.3.0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java#L106). The trunk code (https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java#L115) also has similar logic. > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3899) Document window processing with Reduce/FoldFunction + WindowFunction
[ https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435353#comment-15435353 ] ASF GitHub Bot commented on FLINK-3899: --- Github user danielblazevski commented on a diff in the pull request: https://github.com/apache/flink/pull/2368#discussion_r76102678 --- Diff: docs/apis/streaming/windows.md --- @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu the additional meta information that writing a `WindowFunction` provides. This is an example that shows how incremental aggregation functions can be combined with -a `WindowFunction`. +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the +ending event-time of a window of sensor readings that contain a timestamp, +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window +aggregation (only a single element is kept in the window). {% highlight java %} -DataStream> input = ...; +DataStream input = ...; // for folding incremental computation input .keyBy() .window() -.apply(, new MyFoldFunction(), new MyWindowFunction()); +.apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction()); + +/* ... */ + +private static class myFoldFunction implements FoldFunction { + +public Long fold(Long acc, SensorReading s) { +return Math.max(acc, s.timestamp()); +} +} + +private static class MyWindowFunction implements WindowFunction { + +public void apply(String key, TimeWindow window, Iterable timestamps, Collector out) { +out.collect(timestamps.iterator().next()); --- End diff -- Made the changes in the Java version and added the comments. Had some issues with the Scala version (see the screenshots below): the only real change is the type of the `Iterable` in the `WindowFunction`, which IntelliJ insisted had to be `SensorReading`, which is not ideal. I removed the Scala version for now.
Screenshots: https://cloud.githubusercontent.com/assets/10012612/17940967/4a025738-69ff-11e6-9354-31c2ead563d4.png https://cloud.githubusercontent.com/assets/10012612/17940972/4dd5db28-69ff-11e6-8c6a-11b1900796ad.png > Document window processing with Reduce/FoldFunction + WindowFunction > > > Key: FLINK-3899 > URL: https://issues.apache.org/jira/browse/FLINK-3899 > Project: Flink > Issue Type: Improvement > Components: Documentation, Streaming >Affects Versions: 1.1.0 >Reporter: Fabian Hueske > > The streaming documentation does not describe how windows can be processed > with FoldFunction or ReduceFunction and a subsequent WindowFunction. This > combination allows for eager window aggregation (only a single element is > kept in the window) and access of the Window object, e.g., to have access to > the window's start and end time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2414: [FLINK-4341] Let idle consumer subtasks emit max v...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/2414 [FLINK-4341] Let idle consumer subtasks emit max value watermarks and fail on resharding This is a short-term fix until the min-watermark service for the JobManager described in the JIRA discussion is available. The fix works by letting idle subtasks, i.e., those that initially don't get assigned any shards, emit a `Long.MAX_VALUE` watermark. Also, we _only fail hard if an idle subtask_ is assigned new shards when resharding happens, to avoid messing up the watermarks. So, if no subtask is idle on startup (i.e., when the total number of shards > consumer parallelism), the Kinesis consumer can still handle resharding transparently, as before, without failing. I've tested exactly-once with our manual tests (with and w/o resharding) and the fix works nicely, still retaining the exactly-once guarantee despite the non-transparency. However, I'm a bit unsure how to test whether the unbounded state with window operators is also fixed by this change, so this still needs to be clarified. R: @rmetzger and @aljoscha for review. Thanks in advance! You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-4341 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2414.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2414 commit bc8e50d99be745300f7418c58e9d30abc5469ba3 Author: Gordon Tai Date: 2016-08-24T08:38:06Z [FLINK-4341] Let idle consumer subtasks emit max value watermarks and fail on resharding This no longer allows the Kinesis consumer to transparently handle resharding. This is a short-term workaround until we have a min-watermark notification service available in the JobManager.
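The effect of emitting `Long.MAX_VALUE` can be illustrated with a small sketch, assuming that an operator forwards the minimum of the latest watermarks received from its input channels. An idle channel that never emits keeps that minimum at `Long.MIN_VALUE`; a single `Long.MAX_VALUE` watermark takes it out of the computation. Because watermarks only move forward, that channel can never emit a meaningful watermark again, which is consistent with failing hard when an idle subtask later receives shards. `MinWatermarkSketch` is a hypothetical class, not Flink's input processor.

```java
import java.util.Arrays;

// Hypothetical sketch: a downstream operator tracks the latest watermark per input
// channel and forwards the minimum across all channels.
final class MinWatermarkSketch {
    private final long[] channelWatermarks;

    MinWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        // a channel that has not emitted anything yet holds back the minimum
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel and returns the combined (minimum) watermark.
    long onWatermark(int channel, long watermark) {
        // watermarks are monotonic: never move a channel backwards
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

Usage: with two channels, `onWatermark(0, 100L)` still returns `Long.MIN_VALUE` because channel 1 is silent; once channel 1 sends `Long.MAX_VALUE`, the combined watermark becomes 100 and then follows channel 0 alone.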
[jira] [Commented] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields
[ https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435319#comment-15435319 ] ASF GitHub Bot commented on FLINK-4437: --- Github user tedyu commented on the issue: https://github.com/apache/flink/pull/2409 I ran the test suite with the patch; it failed here:
```
Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 201.106 sec <<< FAILURE! - in org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase
The JobManager should handle gracefully failing task manager with slot sharing(org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase)  Time elapsed: 200.43 sec  <<< ERROR!
java.util.concurrent.TimeoutException: Futures timed out after [20 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
    at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
    at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.ready(package.scala:86)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.waitForTaskManagersToBeRegistered(FlinkMiniCluster.scala:455)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.waitForTaskManagersToBeRegistered(FlinkMiniCluster.scala:439)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:330)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:269)
    at org.apache.flink.runtime.testingUtils.TestingUtils$.startTestingCluster(TestingUtils.scala:86)
    at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(TaskManagerFailsWithSlotSharingITCase.scala:73)
    at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(TaskManagerFailsWithSlotSharingITCase.scala:53)
    at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(TaskManagerFailsWithSlotSharingITCase.scala:53)
    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953)
    at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
    at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase.withFixture(TaskManagerFailsWithSlotSharingITCase.scala:38)
```
It doesn't seem to be related to the patch. > Lock evasion around lastTriggeredCheckpoint may lead to lost updates to > related fields > -- > > Key: FLINK-4437 > URL: https://issues.apache.org/jira/browse/FLINK-4437 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > > In CheckpointCoordinator#triggerCheckpoint(): > {code} > // make sure the minimum interval between checkpoints has passed > if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) > { > {code} > If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > > timestamp' in close proximity before lastTriggeredCheckpoint is updated, > the two threads may have an inconsistent view of "lastTriggeredCheckpoint" > and updates to fields correlated with "lastTriggeredCheckpoint" may be lost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
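One way to close the race described in the issue, sketched under assumptions: the minimum-pause check and the update of `lastTriggeredCheckpoint` (and of any correlated field) happen inside the same `synchronized` block, so two threads cannot both pass the check before either records its trigger time. `TriggerGuard`, `tryTrigger` and `triggeredCount` are hypothetical; only the field names and the condition come from the quoted `CheckpointCoordinator#triggerCheckpoint()` snippet, and this is not the patch from the pull request.

```java
// Hypothetical sketch, not CheckpointCoordinator: check-then-act under a single lock.
final class TriggerGuard {
    private final Object lock = new Object();
    private final long minPauseBetweenCheckpoints;
    private boolean triggeredBefore = false;  // guarded by lock
    private long lastTriggeredCheckpoint;     // guarded by lock
    private int triggeredCount;               // a field correlated with lastTriggeredCheckpoint

    TriggerGuard(long minPauseBetweenCheckpoints) {
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
    }

    // Returns true if a checkpoint was triggered at the given timestamp.
    boolean tryTrigger(long timestamp) {
        synchronized (lock) {
            // the check and the updates below form one atomic step
            if (triggeredBefore && lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
                return false; // minimum pause has not passed yet
            }
            triggeredBefore = true;
            lastTriggeredCheckpoint = timestamp;
            triggeredCount++;
            return true;
        }
    }

    int triggeredCount() {
        synchronized (lock) {
            return triggeredCount;
        }
    }
}
```

Holding the lock only around this short check-and-update keeps the critical section small; any slow work (such as actually starting the checkpoint) would happen after the lock is released.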
[jira] [Commented] (FLINK-4341) Kinesis connector does not emit maximum watermark properly
[ https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435309#comment-15435309 ] ASF GitHub Bot commented on FLINK-4341: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/2414 [FLINK-4341] Let idle consumer subtasks emit max value watermarks and fail on resharding This is a short-term fix until the min-watermark service for the JobManager described in the JIRA discussion is available. The fix works by letting idle subtasks, which are not assigned any shards initially, emit a `Long.MAX_VALUE` watermark. Also, we _only fail hard if an idle subtask_ is assigned new shards when resharding happens, to avoid messing up the watermarks. So, if no subtask is idle on startup (i.e., when the total number of shards > consumer parallelism), the Kinesis consumer can still transparently handle resharding as before, without failing. I've tested exactly-once with our manual tests (with and w/o resharding) and the fix works nicely, still retaining the exactly-once guarantee even though resharding is no longer handled transparently. However, I'm a bit unsure how to test whether the unbounded state growth with window operators is also fixed by this change, so that still needs to be clarified. R: @rmetzger and @aljoscha for review. Thanks in advance! You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-4341 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2414.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2414 commit bc8e50d99be745300f7418c58e9d30abc5469ba3 Author: Gordon Tai Date: 2016-08-24T08:38:06Z [FLINK-4341] Let idle consumer subtasks emit max value watermarks and fail on resharding This no longer allows the Kinesis consumer to transparently handle resharding.
This is a short-term workaround until we have a min-watermark notification service available in the JobManager. > Kinesis connector does not emit maximum watermark properly > -- > > Key: FLINK-4341 > URL: https://issues.apache.org/jira/browse/FLINK-4341 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.0, 1.1.1 >Reporter: Scott Kidder >Assignee: Robert Metzger >Priority: Blocker > Fix For: 1.2.0, 1.1.2 > > > **Previously reported as "Checkpoint state size grows unbounded when task > parallelism not uniform"** > This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I > was previously using a 1.1.0 snapshot (commit 18995c8) which performed as > expected. This issue was introduced somewhere between those commits. > I've got a Flink application that uses the Kinesis Stream Consumer to read > from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots > each, providing a total of 4 slots. When running the application with a > parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) > and 4 slots for subsequent tasks that process the Kinesis stream data. I use > an in-memory store for checkpoint data.
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint > states were growing unbounded when running with a parallelism of 4, > checkpoint interval of 10 seconds: > {code} > ID State Size > 1 11.3 MB > 2 20.9 MB > 3 30.6 MB > 4 41.4 MB > 5 52.6 MB > 6 62.5 MB > 7 71.5 MB > 8 83.3 MB > 9 93.5 MB > {code} > The first 4 checkpoints generally succeed, but then fail with an exception > like the following: > {code} > java.lang.RuntimeException: Error triggering a checkpoint as the result of > receiving checkpoint barrier at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.IOException: Size of the state is larger than the maximum >
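Why an idle subtask matters: a downstream operator's event-time clock is the minimum of the watermarks on its input channels, so a consumer subtask that never emits a watermark holds every downstream window back. The simplified model below (a hypothetical `MinWatermarkTracker`, not Flink's internal class) shows how emitting `Long.MAX_VALUE` on the idle channel releases that minimum. It assumes each channel's watermark only ever moves forward.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model of watermark combination: the combined
 * watermark is the minimum over all input channels.
 */
class MinWatermarkTracker {
    private final long[] channelWatermarks;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // A channel that has not emitted anything yet holds the combined clock back.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel (never moving it backwards) and returns the combined watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return current();
    }

    /** The combined watermark: the minimum across all channels. */
    long current() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

The last test case also shows why the PR fails hard when an idle subtask is later assigned shards: once its channel has reported `Long.MAX_VALUE`, it cannot move back, so records read from the new shards would already be behind the watermark.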
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435284#comment-15435284 ] ASF GitHub Bot commented on FLINK-4035: --- Github user eliaslevy commented on the issue: https://github.com/apache/flink/pull/2369 @rmetzger that's the one. NP. I realize breaking it up makes things easier. I just thought I'd mention it. > Bump Kafka producer in Kafka sink to Kafka 0.10.0.0 > --- > > Key: FLINK-4035 > URL: https://issues.apache.org/jira/browse/FLINK-4035 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Minor > > Kafka 0.10.0.0 introduced protocol changes related to the producer. > Published messages now include timestamps and compressed messages now include > relative offsets. As it is now, brokers must decompress publisher-compressed > messages, assign offsets to them, and recompress them, which is wasteful and > makes it less likely that compression will be used at all.
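To make the relative-offset change concrete, here is a simplified arithmetic sketch. It assumes the message format v1 layout described in Kafka's KIP-31: the inner messages of a compressed set carry relative offsets 0..n-1, and only the outer wrapper message carries an absolute offset, namely that of the last inner message. Under that assumption, the broker sets one offset per compressed set instead of decompressing and recompressing it. `RelativeOffsets` and its methods are hypothetical helpers, and the sketch ignores the validation a real broker performs.

```java
/**
 * Hypothetical helpers modelling offset assignment for a compressed message
 * set whose inner messages use relative offsets 0..n-1 (KIP-31 style).
 */
class RelativeOffsets {
    /** Broker side: absolute offset stored on the wrapper when n inner messages are appended at nextOffset. */
    static long wrapperOffset(long nextOffset, int innerCount) {
        if (innerCount <= 0) {
            throw new IllegalArgumentException("innerCount must be > 0");
        }
        // The wrapper carries the absolute offset of the last inner message.
        return nextOffset + innerCount - 1;
    }

    /** Consumer side: recovers an inner message's absolute offset from the wrapper offset. */
    static long absoluteOffset(long wrapperOffset, int lastRelativeOffset, int relativeOffset) {
        return wrapperOffset - lastRelativeOffset + relativeOffset;
    }
}
```

For example, appending a set of three inner messages when the log's next offset is 100 gives the wrapper offset 102, and the inner messages resolve to 100, 101, and 102 without the broker touching the compressed payload.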
[GitHub] flink issue #2369: [FLINK-4035] Add a streaming connector for Apache Kafka 0...
Github user eliaslevy commented on the issue: https://github.com/apache/flink/pull/2369 @rmetzger that's the one. NP. I realize breaking it up makes things easier. I just thought I'd mention it.
[jira] [Commented] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields
[ https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435172#comment-15435172 ] ASF GitHub Bot commented on FLINK-4437: --- Github user tedyu commented on the issue: https://github.com/apache/flink/pull/2409
```
Executing Maven: -B -f /home/jenkins/jenkins-slave/workspace/flink-github-ci/pom.xml -Dmaven.repo.local=/home/jenkins/jenkins-slave/maven-repositories/1 clean install -Dflink.forkCount=1C
```
I don't see flink-github-ci in the source tree. > Lock evasion around lastTriggeredCheckpoint may lead to lost updates to > related fields > -- > > Key: FLINK-4437 > URL: https://issues.apache.org/jira/browse/FLINK-4437 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > > In CheckpointCoordinator#triggerCheckpoint(): > {code} > // make sure the minimum interval between checkpoints has passed > if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) > { > {code} > If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > > timestamp' in close proximity before lastTriggeredCheckpoint is updated, > the two threads may have an inconsistent view of "lastTriggeredCheckpoint" > and updates to fields correlated with "lastTriggeredCheckpoint" may be lost.
[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java
[ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435171#comment-15435171 ] ASF GitHub Bot commented on FLINK-4363: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2400 Thank you @wangzhijiang999 for the changes. I've made a couple more requests, after which we can merge the PR. > Implement TaskManager basic startup of all components in java > - > > Key: FLINK-4363 > URL: https://issues.apache.org/jira/browse/FLINK-4363 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > Similar to the current {{TaskManager}}, but implement the initialization and startup > of all components in Java instead of Scala.
[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java
[ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435164#comment-15435164 ] ASF GitHub Bot commented on FLINK-4363: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r76085144 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java --- @@ -94,12 +729,11 @@ public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLe if (newLeaderAddress != null) { // the resource manager switched to a new leader log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", - resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress); --- End diff -- Same goes for the start method. > Implement TaskManager basic startup of all components in java > - > > Key: FLINK-4363 > URL: https://issues.apache.org/jira/browse/FLINK-4363 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > Similar to the current {{TaskManager}}, but implement the initialization and startup > of all components in Java instead of Scala.
[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java
[ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435163#comment-15435163 ] ASF GitHub Bot commented on FLINK-4363: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r76085101 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java --- @@ -94,12 +729,11 @@ public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLe if (newLeaderAddress != null) { // the resource manager switched to a new leader log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", - resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress); --- End diff -- Why are these essential RPC methods at the bottom? They should be moved to the top of the class. > Implement TaskManager basic startup of all components in java > - > > Key: FLINK-4363 > URL: https://issues.apache.org/jira/browse/FLINK-4363 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > Similar to the current {{TaskManager}}, but implement the initialization and startup > of all components in Java instead of Scala.
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r76084947 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java --- @@ -36,27 +82,617 @@ */ public class TaskExecutor extends RpcEndpoint { - /** The unique resource ID of this TaskExecutor */ + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class); + + /** The unique resource ID of this TaskManager */ private final ResourceID resourceID; /** The access to the leader election and metadata storage services */ private final HighAvailabilityServices haServices; - // - resource manager + /** The task manager configuration */ + private final TaskExecutorConfiguration taskExecutorConfig; + + /** The connection information of the task manager */ + private final InstanceConnectionInfo connectionInfo; + + /** The I/O manager component in the task manager */ + private final IOManager ioManager; + + /** The memory manager component in the task manager */ + private final MemoryManager memoryManager; + + /** The network component in the task manager */ + private final NetworkEnvironment networkEnvironment; + + /** The number of slots in the task manager, should be 1 for YARN */ + private final int numberOfSlots; + // - resource manager private TaskExecutorToResourceManagerConnection resourceManagerConnection; // public TaskExecutor( + TaskExecutorConfiguration taskExecutorConfig, + ResourceID resourceID, + InstanceConnectionInfo connectionInfo, + MemoryManager memoryManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment, + int numberOfSlots, RpcService rpcService, - HighAvailabilityServices haServices, - ResourceID resourceID) { + HighAvailabilityServices haServices) { super(rpcService); - this.haServices = checkNotNull(haServices); + this.taskExecutorConfig = checkNotNull(taskExecutorConfig); this.resourceID = checkNotNull(resourceID); + this.connectionInfo = 
checkNotNull(connectionInfo); + this.memoryManager = checkNotNull(memoryManager); + this.ioManager = checkNotNull(ioManager); + this.networkEnvironment = checkNotNull(networkEnvironment); + this.numberOfSlots = checkNotNull(numberOfSlots); + this.haServices = checkNotNull(haServices); + } + + /** +* Starts and runs the TaskManager. +* +* This method first tries to select the network interface to use for the TaskManager +* communication. The network interface is used both for the actor communication +* (coordination) as well as for the data exchange between task managers. Unless +* the hostname/interface is explicitly configured in the configuration, this +* method will try out various interfaces and methods to connect to the JobManager +* and select the one where the connection attempt is successful. +* +* After selecting the network interface, this method brings up an actor system +* for the TaskManager and its actors, starts the TaskManager's services +* (library cache, shuffle network stack, ...), and starts the TaskManager itself. +* +* @param configurationThe configuration for the TaskManager. +* @param resourceID The id of the resource which the task manager will run on. +*/ + public static void selectNetworkInterfaceAndRunTaskManager( + Configuration configuration, + ResourceID resourceID) throws Exception { + + InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration); + + runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration); + } + + private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration) + throws Exception { + String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null); + if (taskManagerHostname != null) { + LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname); + } else { +
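The Javadoc in this diff describes a selection order: an explicitly configured hostname wins; otherwise candidate interfaces are tried until one can connect to the JobManager. The sketch below is a hypothetical, simplified version of that order only. The connection attempt is abstracted as a predicate so the logic can be tested without a network, and `HostnameSelector` is an illustrative name; the real method's probing strategy is cut off in the diff above.

```java
import java.util.List;
import java.util.function.Predicate;

/**
 * Hypothetical sketch of the selection order described in the Javadoc:
 * a configured hostname short-circuits probing; otherwise the first
 * candidate that can reach the JobManager is chosen.
 */
class HostnameSelector {
    static String selectHostname(
            String configuredHostname,
            List<String> candidateAddresses,
            Predicate<String> canReachJobManager) {
        if (configuredHostname != null) {
            // Explicit configuration wins; no probing needed.
            return configuredHostname;
        }
        for (String candidate : candidateAddresses) {
            if (canReachJobManager.test(candidate)) {
                return candidate;
            }
        }
        throw new IllegalStateException("No candidate address could reach the JobManager");
    }
}
```

Injecting the reachability check as a `Predicate` keeps the ordering rule separate from the socket code, which is where most of the environment-specific behaviour would live.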
[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java
[ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435159#comment-15435159 ] ASF GitHub Bot commented on FLINK-4363: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r76084797 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java --- @@ -36,27 +82,617 @@ */ public class TaskExecutor extends RpcEndpoint { - /** The unique resource ID of this TaskExecutor */ + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class); + + /** The unique resource ID of this TaskManager */ private final ResourceID resourceID; /** The access to the leader election and metadata storage services */ private final HighAvailabilityServices haServices; - // - resource manager + /** The task manager configuration */ + private final TaskExecutorConfiguration taskExecutorConfig; + + /** The connection information of the task manager */ + private final InstanceConnectionInfo connectionInfo; + + /** The I/O manager component in the task manager */ + private final IOManager ioManager; + + /** The memory manager component in the task manager */ + private final MemoryManager memoryManager; + + /** The network component in the task manager */ + private final NetworkEnvironment networkEnvironment; + + /** The number of slots in the task manager, should be 1 for YARN */ + private final int numberOfSlots; + // - resource manager private TaskExecutorToResourceManagerConnection resourceManagerConnection; // public TaskExecutor( + TaskExecutorConfiguration taskExecutorConfig, + ResourceID resourceID, + InstanceConnectionInfo connectionInfo, + MemoryManager memoryManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment, + int numberOfSlots, RpcService rpcService, - HighAvailabilityServices haServices, - ResourceID resourceID) { + HighAvailabilityServices haServices) { super(rpcService); - 
this.haServices = checkNotNull(haServices); + this.taskExecutorConfig = checkNotNull(taskExecutorConfig); this.resourceID = checkNotNull(resourceID); + this.connectionInfo = checkNotNull(connectionInfo); + this.memoryManager = checkNotNull(memoryManager); + this.ioManager = checkNotNull(ioManager); + this.networkEnvironment = checkNotNull(networkEnvironment); + this.numberOfSlots = checkNotNull(numberOfSlots); + this.haServices = checkNotNull(haServices); + } + + /** +* Starts and runs the TaskManager. +* +* This method first tries to select the network interface to use for the TaskManager +* communication. The network interface is used both for the actor communication +* (coordination) as well as for the data exchange between task managers. Unless +* the hostname/interface is explicitly configured in the configuration, this +* method will try out various interfaces and methods to connect to the JobManager +* and select the one where the connection attempt is successful. +* +* After selecting the network interface, this method brings up an actor system +* for the TaskManager and its actors, starts the TaskManager's services +* (library cache, shuffle network stack, ...), and starts the TaskManager itself. +* +* @param configurationThe configuration for the TaskManager. +* @param resourceID The id of the resource which the task manager will run on. +*/ + public static void selectNetworkInterfaceAndRunTaskManager( + Configuration configuration, + ResourceID resourceID) throws Exception { + + InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration); + + runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration); + } + + private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration) + throws Exception { + String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY,
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r76084797 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java --- @@ -36,27 +82,617 @@ */ public class TaskExecutor extends RpcEndpoint { - /** The unique resource ID of this TaskExecutor */ + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class); + + /** The unique resource ID of this TaskManager */ private final ResourceID resourceID; /** The access to the leader election and metadata storage services */ private final HighAvailabilityServices haServices; - // - resource manager + /** The task manager configuration */ + private final TaskExecutorConfiguration taskExecutorConfig; + + /** The connection information of the task manager */ + private final InstanceConnectionInfo connectionInfo; + + /** The I/O manager component in the task manager */ + private final IOManager ioManager; + + /** The memory manager component in the task manager */ + private final MemoryManager memoryManager; + + /** The network component in the task manager */ + private final NetworkEnvironment networkEnvironment; + + /** The number of slots in the task manager, should be 1 for YARN */ + private final int numberOfSlots; + // - resource manager private TaskExecutorToResourceManagerConnection resourceManagerConnection; // public TaskExecutor( + TaskExecutorConfiguration taskExecutorConfig, + ResourceID resourceID, + InstanceConnectionInfo connectionInfo, + MemoryManager memoryManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment, + int numberOfSlots, RpcService rpcService, - HighAvailabilityServices haServices, - ResourceID resourceID) { + HighAvailabilityServices haServices) { super(rpcService); - this.haServices = checkNotNull(haServices); + this.taskExecutorConfig = checkNotNull(taskExecutorConfig); this.resourceID = checkNotNull(resourceID); + this.connectionInfo = checkNotNull(connectionInfo); + this.memoryManager = checkNotNull(memoryManager); + this.ioManager = checkNotNull(ioManager); + this.networkEnvironment = checkNotNull(networkEnvironment); + this.numberOfSlots = checkNotNull(numberOfSlots); + this.haServices = checkNotNull(haServices); + } + + /** +* Starts and runs the TaskManager. +* +* This method first tries to select the network interface to use for the TaskManager +* communication. The network interface is used both for the actor communication +* (coordination) as well as for the data exchange between task managers. Unless +* the hostname/interface is explicitly configured in the configuration, this +* method will try out various interfaces and methods to connect to the JobManager +* and select the one where the connection attempt is successful. +* +* After selecting the network interface, this method brings up an actor system +* for the TaskManager and its actors, starts the TaskManager's services +* (library cache, shuffle network stack, ...), and starts the TaskManager itself. +* +* @param configuration The configuration for the TaskManager. +* @param resourceID The id of the resource which the task manager will run on. +*/ + public static void selectNetworkInterfaceAndRunTaskManager( + Configuration configuration, + ResourceID resourceID) throws Exception { + + InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration); + + runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration); + } + + private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration) + throws Exception { + String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null); + if (taskManagerHostname != null) { + LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname); + } else { +
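One detail in the quoted constructor: `numberOfSlots` is a primitive `int`, so `checkNotNull(numberOfSlots)` autoboxes the value and can never fail. A minimal sketch of validating the value range instead is below. It uses a hypothetical, simplified `SlotConfig` class and plain JDK checks rather than Flink's `org.apache.flink.util.Preconditions` (which offers `checkArgument` for this kind of check), so that the snippet stays self-contained.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for the TaskExecutor constructor quoted above:
// only one reference-typed component and the slot count are modeled.
final class SlotConfig {
    private final String resourceId;   // stands in for ResourceID (assumption)
    private final int numberOfSlots;

    SlotConfig(String resourceId, int numberOfSlots) {
        // Reference-typed argument: a null check is meaningful here.
        this.resourceId = Objects.requireNonNull(resourceId, "resourceId");
        // Primitive int: a null check would autobox and always pass,
        // so validate the range of the value instead.
        if (numberOfSlots <= 0) {
            throw new IllegalArgumentException(
                "numberOfSlots must be positive, was " + numberOfSlots);
        }
        this.numberOfSlots = numberOfSlots;
    }

    String resourceId() {
        return resourceId;
    }

    int numberOfSlots() {
        return numberOfSlots;
    }
}
```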
[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java
[ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435158#comment-15435158 ] ASF GitHub Bot commented on FLINK-4363: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r76084721 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java --- @@ -36,27 +82,617 @@ */ public class TaskExecutor extends RpcEndpoint { - /** The unique resource ID of this TaskExecutor */ + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class); + + /** The unique resource ID of this TaskManager */ private final ResourceID resourceID; /** The access to the leader election and metadata storage services */ private final HighAvailabilityServices haServices; - // - resource manager + /** The task manager configuration */ + private final TaskExecutorConfiguration taskExecutorConfig; + + /** The connection information of the task manager */ + private final InstanceConnectionInfo connectionInfo; + + /** The I/O manager component in the task manager */ + private final IOManager ioManager; + + /** The memory manager component in the task manager */ + private final MemoryManager memoryManager; + + /** The network component in the task manager */ + private final NetworkEnvironment networkEnvironment; + + /** The number of slots in the task manager, should be 1 for YARN */ + private final int numberOfSlots; + // - resource manager private TaskExecutorToResourceManagerConnection resourceManagerConnection; // public TaskExecutor( + TaskExecutorConfiguration taskExecutorConfig, + ResourceID resourceID, + InstanceConnectionInfo connectionInfo, + MemoryManager memoryManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment, + int numberOfSlots, RpcService rpcService, - HighAvailabilityServices haServices, - ResourceID resourceID) { + HighAvailabilityServices haServices) { super(rpcService); - this.haServices = checkNotNull(haServices); + this.taskExecutorConfig = checkNotNull(taskExecutorConfig); this.resourceID = checkNotNull(resourceID); + this.connectionInfo = checkNotNull(connectionInfo); + this.memoryManager = checkNotNull(memoryManager); + this.ioManager = checkNotNull(ioManager); + this.networkEnvironment = checkNotNull(networkEnvironment); + this.numberOfSlots = checkNotNull(numberOfSlots); + this.haServices = checkNotNull(haServices); + } + + /** +* Starts and runs the TaskManager. +* +* This method first tries to select the network interface to use for the TaskManager +* communication. The network interface is used both for the actor communication +* (coordination) as well as for the data exchange between task managers. Unless +* the hostname/interface is explicitly configured in the configuration, this +* method will try out various interfaces and methods to connect to the JobManager +* and select the one where the connection attempt is successful. +* +* After selecting the network interface, this method brings up an actor system +* for the TaskManager and its actors, starts the TaskManager's services +* (library cache, shuffle network stack, ...), and starts the TaskManager itself. +* +* @param configuration The configuration for the TaskManager. +* @param resourceID The id of the resource which the task manager will run on. +*/ + public static void selectNetworkInterfaceAndRunTaskManager( + Configuration configuration, + ResourceID resourceID) throws Exception { + + InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration); + + runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration); + } + + private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration) + throws Exception { + String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY,
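The Javadoc quoted above describes the selection order: use the configured hostname if there is one, otherwise try candidate interfaces until a connection attempt to the JobManager succeeds. The following is a minimal sketch of that idea using only `java.net`. The class `InterfaceSelector`, the method `selectHostname`, the explicit candidate list and the timeout parameter are hypothetical names chosen for illustration; this is not the code from the pull request (which is truncated in this message), and it only selects a hostname, not a port.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;

// Hypothetical helper illustrating the selection order described in the Javadoc.
final class InterfaceSelector {

    /**
     * Returns the configured hostname if present; otherwise the address of the first
     * candidate from which a TCP connection to the JobManager can be opened.
     */
    static String selectHostname(
            String configuredHostname,
            List<InetAddress> candidates,
            InetSocketAddress jobManagerAddress,
            int timeoutMillis) {

        if (configuredHostname != null) {
            // An explicitly configured hostname/address always wins; nothing is probed.
            return configuredHostname;
        }

        for (InetAddress candidate : candidates) {
            // try-with-resources closes the probe socket on success and on failure.
            try (Socket socket = new Socket()) {
                // Bind to the candidate interface on an ephemeral port, then try the JobManager.
                socket.bind(new InetSocketAddress(candidate, 0));
                socket.connect(jobManagerAddress, timeoutMillis);
                return candidate.getHostAddress();
            } catch (IOException e) {
                // This interface cannot reach the JobManager (refused, timed out, bind failed).
            }
        }
        throw new IllegalStateException(
            "Could not connect to " + jobManagerAddress + " from any candidate address");
    }
}
```

Probing with a real connection, rather than just listing interfaces, filters out addresses that exist locally but cannot route to the JobManager.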
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r76084721 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java --- @@ -36,27 +82,617 @@ */ public class TaskExecutor extends RpcEndpoint { - /** The unique resource ID of this TaskExecutor */ + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class); + + /** The unique resource ID of this TaskManager */ private final ResourceID resourceID; /** The access to the leader election and metadata storage services */ private final HighAvailabilityServices haServices; - // - resource manager + /** The task manager configuration */ + private final TaskExecutorConfiguration taskExecutorConfig; + + /** The connection information of the task manager */ + private final InstanceConnectionInfo connectionInfo; + + /** The I/O manager component in the task manager */ + private final IOManager ioManager; + + /** The memory manager component in the task manager */ + private final MemoryManager memoryManager; + + /** The network component in the task manager */ + private final NetworkEnvironment networkEnvironment; + + /** The number of slots in the task manager, should be 1 for YARN */ + private final int numberOfSlots; + // - resource manager private TaskExecutorToResourceManagerConnection resourceManagerConnection; // public TaskExecutor( + TaskExecutorConfiguration taskExecutorConfig, + ResourceID resourceID, + InstanceConnectionInfo connectionInfo, + MemoryManager memoryManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment, + int numberOfSlots, RpcService rpcService, - HighAvailabilityServices haServices, - ResourceID resourceID) { + HighAvailabilityServices haServices) { super(rpcService); - this.haServices = checkNotNull(haServices); + this.taskExecutorConfig = checkNotNull(taskExecutorConfig); this.resourceID = checkNotNull(resourceID); + this.connectionInfo = checkNotNull(connectionInfo); + this.memoryManager = checkNotNull(memoryManager); + this.ioManager = checkNotNull(ioManager); + this.networkEnvironment = checkNotNull(networkEnvironment); + this.numberOfSlots = checkNotNull(numberOfSlots); + this.haServices = checkNotNull(haServices); + } + + /** +* Starts and runs the TaskManager. +* +* This method first tries to select the network interface to use for the TaskManager +* communication. The network interface is used both for the actor communication +* (coordination) as well as for the data exchange between task managers. Unless +* the hostname/interface is explicitly configured in the configuration, this +* method will try out various interfaces and methods to connect to the JobManager +* and select the one where the connection attempt is successful. +* +* After selecting the network interface, this method brings up an actor system +* for the TaskManager and its actors, starts the TaskManager's services +* (library cache, shuffle network stack, ...), and starts the TaskManager itself. +* +* @param configuration The configuration for the TaskManager. +* @param resourceID The id of the resource which the task manager will run on. +*/ + public static void selectNetworkInterfaceAndRunTaskManager( + Configuration configuration, + ResourceID resourceID) throws Exception { + + InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration); + + runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration); + } + + private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration) + throws Exception { + String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null); + if (taskManagerHostname != null) { + LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname); + } else { +
[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java
[ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435153#comment-15435153 ] ASF GitHub Bot commented on FLINK-4363: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r76084398 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java --- @@ -36,27 +82,617 @@ */ public class TaskExecutor extends RpcEndpoint { - /** The unique resource ID of this TaskExecutor */ + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class); + + /** The unique resource ID of this TaskManager */ private final ResourceID resourceID; /** The access to the leader election and metadata storage services */ private final HighAvailabilityServices haServices; - // - resource manager + /** The task manager configuration */ + private final TaskExecutorConfiguration taskExecutorConfig; + + /** The connection information of the task manager */ + private final InstanceConnectionInfo connectionInfo; + + /** The I/O manager component in the task manager */ + private final IOManager ioManager; + + /** The memory manager component in the task manager */ + private final MemoryManager memoryManager; + + /** The network component in the task manager */ + private final NetworkEnvironment networkEnvironment; + + /** The number of slots in the task manager, should be 1 for YARN */ + private final int numberOfSlots; + // - resource manager private TaskExecutorToResourceManagerConnection resourceManagerConnection; // public TaskExecutor( + TaskExecutorConfiguration taskExecutorConfig, + ResourceID resourceID, + InstanceConnectionInfo connectionInfo, + MemoryManager memoryManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment, + int numberOfSlots, RpcService rpcService, - HighAvailabilityServices haServices, - ResourceID resourceID) { + HighAvailabilityServices haServices) { super(rpcService); - this.haServices = checkNotNull(haServices); + this.taskExecutorConfig = checkNotNull(taskExecutorConfig); this.resourceID = checkNotNull(resourceID); + this.connectionInfo = checkNotNull(connectionInfo); + this.memoryManager = checkNotNull(memoryManager); + this.ioManager = checkNotNull(ioManager); + this.networkEnvironment = checkNotNull(networkEnvironment); + this.numberOfSlots = checkNotNull(numberOfSlots); + this.haServices = checkNotNull(haServices); + } + + /** +* Starts and runs the TaskManager. +* +* This method first tries to select the network interface to use for the TaskManager +* communication. The network interface is used both for the actor communication +* (coordination) as well as for the data exchange between task managers. Unless +* the hostname/interface is explicitly configured in the configuration, this +* method will try out various interfaces and methods to connect to the JobManager +* and select the one where the connection attempt is successful. +* +* After selecting the network interface, this method brings up an actor system +* for the TaskManager and its actors, starts the TaskManager's services +* (library cache, shuffle network stack, ...), and starts the TaskManager itself. +* +* @param configuration The configuration for the TaskManager. +* @param resourceID The id of the resource which the task manager will run on. +*/ + public static void selectNetworkInterfaceAndRunTaskManager( + Configuration configuration, + ResourceID resourceID) throws Exception { + + InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration); + + runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration); + } + + private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration) + throws Exception { + String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY,
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r76084398 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java --- @@ -36,27 +82,617 @@ */ public class TaskExecutor extends RpcEndpoint { - /** The unique resource ID of this TaskExecutor */ + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class); + + /** The unique resource ID of this TaskManager */ private final ResourceID resourceID; /** The access to the leader election and metadata storage services */ private final HighAvailabilityServices haServices; - // - resource manager + /** The task manager configuration */ + private final TaskExecutorConfiguration taskExecutorConfig; + + /** The connection information of the task manager */ + private final InstanceConnectionInfo connectionInfo; + + /** The I/O manager component in the task manager */ + private final IOManager ioManager; + + /** The memory manager component in the task manager */ + private final MemoryManager memoryManager; + + /** The network component in the task manager */ + private final NetworkEnvironment networkEnvironment; + + /** The number of slots in the task manager, should be 1 for YARN */ + private final int numberOfSlots; + // - resource manager private TaskExecutorToResourceManagerConnection resourceManagerConnection; // public TaskExecutor( + TaskExecutorConfiguration taskExecutorConfig, + ResourceID resourceID, + InstanceConnectionInfo connectionInfo, + MemoryManager memoryManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment, + int numberOfSlots, RpcService rpcService, - HighAvailabilityServices haServices, - ResourceID resourceID) { + HighAvailabilityServices haServices) { super(rpcService); - this.haServices = checkNotNull(haServices); + this.taskExecutorConfig = checkNotNull(taskExecutorConfig); this.resourceID = checkNotNull(resourceID); + this.connectionInfo = checkNotNull(connectionInfo); + this.memoryManager = checkNotNull(memoryManager); + this.ioManager = checkNotNull(ioManager); + this.networkEnvironment = checkNotNull(networkEnvironment); + this.numberOfSlots = checkNotNull(numberOfSlots); + this.haServices = checkNotNull(haServices); + } + + /** +* Starts and runs the TaskManager. +* +* This method first tries to select the network interface to use for the TaskManager +* communication. The network interface is used both for the actor communication +* (coordination) as well as for the data exchange between task managers. Unless +* the hostname/interface is explicitly configured in the configuration, this +* method will try out various interfaces and methods to connect to the JobManager +* and select the one where the connection attempt is successful. +* +* After selecting the network interface, this method brings up an actor system +* for the TaskManager and its actors, starts the TaskManager's services +* (library cache, shuffle network stack, ...), and starts the TaskManager itself. +* +* @param configuration The configuration for the TaskManager. +* @param resourceID The id of the resource which the task manager will run on. +*/ + public static void selectNetworkInterfaceAndRunTaskManager( + Configuration configuration, + ResourceID resourceID) throws Exception { + + InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration); + + runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration); + } + + private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration) + throws Exception { + String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null); + if (taskManagerHostname != null) { + LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname); + } else { +
[jira] [Created] (FLINK-4480) Incorrect link to elastic.co in documentation
Fabian Hueske created FLINK-4480: Summary: Incorrect link to elastic.co in documentation Key: FLINK-4480 URL: https://issues.apache.org/jira/browse/FLINK-4480 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.2.0, 1.1.1 Reporter: Fabian Hueske Priority: Trivial The link URL of the entry "Elasticsearch 2x (sink)" on the connectors documentation page https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html points to http://elastic.com but should point to http://elastic.co
[jira] [Commented] (FLINK-4451) Throw exception when remote connection cannot be resolved
[ https://issues.apache.org/jira/browse/FLINK-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435082#comment-15435082 ] ASF GitHub Bot commented on FLINK-4451: --- Github user tillrohrmann closed the pull request at: https://github.com/apache/flink/pull/2405 > Throw exception when remote connection cannot be resolved > - > > Key: FLINK-4451 > URL: https://issues.apache.org/jira/browse/FLINK-4451 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{RpcService}} implementation should throw an exception (returned in the > future) if {{RpcService.connect(address, type)}} cannot connect to the remote > {{RpcEndpoint}}. > At the moment the {{AkkaRpcService}} does not check that the > {{IdentifyActor}} message contains a valid {{ActorRef}} and therefore throws a > {{NullPointerException}}.
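The issue asks that a failed connection attempt be reported through the future returned by `RpcService.connect(address, type)`, rather than surfacing later as a `NullPointerException`. A minimal sketch of that pattern with `java.util.concurrent.CompletableFuture` is below. `SketchRpcService`, its synchronous `lookup` function and this local `RpcConnectionException` class are hypothetical stand-ins (the exception name follows the pull request title); this is not Flink's actual `AkkaRpcService` code or future type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical exception type; the name mirrors the one in the PR title.
class RpcConnectionException extends Exception {
    RpcConnectionException(String message) {
        super(message);
    }
}

// Hypothetical RPC service: `lookup` returns the remote reference, or null if none was found.
final class SketchRpcService<R> {
    private final Function<String, R> lookup;

    SketchRpcService(Function<String, R> lookup) {
        this.lookup = lookup;
    }

    CompletableFuture<R> connect(String address) {
        CompletableFuture<R> result = new CompletableFuture<>();
        R ref = lookup.apply(address);
        if (ref == null) {
            // Report the failure through the future instead of letting a later
            // dereference of the null reference throw a NullPointerException.
            result.completeExceptionally(new RpcConnectionException(
                "Could not connect to rpc endpoint under address " + address + "."));
        } else {
            result.complete(ref);
        }
        return result;
    }
}
```

Callers then see the failure in the same place they see success, for example via `join()`, which wraps the cause in a `CompletionException`.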
[jira] [Closed] (FLINK-4451) Throw exception when remote connection cannot be resolved
[ https://issues.apache.org/jira/browse/FLINK-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-4451. Resolution: Fixed Fixed via affa548cada5d9c25c32eb1f39986bbbd3e55f21 > Throw exception when remote connection cannot be resolved > - > > Key: FLINK-4451 > URL: https://issues.apache.org/jira/browse/FLINK-4451 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{RpcService}} implementation should throw an exception (returned in the > future) if {{RpcService.connect(address, type)}} cannot connect to the remote > {{RpcEndpoint}}. > At the moment the {{AkkaRpcService}} does not check that the > {{IdentifyActor}} message contains a valid {{ActorRef}} and therefore throws a > {{NullPointerException}}.
[GitHub] flink pull request #2405: [FLINK-4451] [rpc] Throw RpcConnectionException wh...
Github user tillrohrmann closed the pull request at: https://github.com/apache/flink/pull/2405 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-4479) Replace trademark (tm) with registered trademark (R) sign on Flink website
Robert Metzger created FLINK-4479: Summary: Replace trademark (tm) with registered trademark (R) sign on Flink website Key: FLINK-4479 URL: https://issues.apache.org/jira/browse/FLINK-4479 Project: Flink Issue Type: Bug Components: Project Website Reporter: Robert Metzger Assignee: Robert Metzger Flink is now a registered trademark, so we should reflect that on our website.
[jira] [Commented] (FLINK-4451) Throw exception when remote connection cannot be resolved
[ https://issues.apache.org/jira/browse/FLINK-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435075#comment-15435075 ] ASF GitHub Bot commented on FLINK-4451: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2405 Failing test cases are unrelated. Will be merging this PR. > Throw exception when remote connection cannot be resolved > - > > Key: FLINK-4451 > URL: https://issues.apache.org/jira/browse/FLINK-4451 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{RpcService}} implementation should throw an exception (returned in the > future) if {{RpcService.connect(address, type)}} cannot connect to the remote > {{RpcEndpoint}}. > At the moment the {{AkkaRpcService}} does not check that the > {{IdentifyActor}} message contains a valid {{ActorRef}} and therefore throws a > {{NullPointerException}}.
[GitHub] flink issue #2405: [FLINK-4451] [rpc] Throw RpcConnectionException when rpc ...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2405 Failing test cases are unrelated. Will be merging this PR.
[jira] [Commented] (FLINK-1460) Typo fixes
[ https://issues.apache.org/jira/browse/FLINK-1460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435069#comment-15435069 ] ASF GitHub Bot commented on FLINK-1460: --- Github user coveralls commented on the issue: https://github.com/apache/flink/pull/346 [![Coverage Status](https://coveralls.io/builds/7588381/badge)](https://coveralls.io/builds/7588381) Changes Unknown when pulling **acf52748f7ed9fe64d9773acbab5774b308e47eb on coderxiang:typo** into ** on apache:master**. > Typo fixes > -- > > Key: FLINK-1460 > URL: https://issues.apache.org/jira/browse/FLINK-1460 > Project: Flink > Issue Type: Improvement >Reporter: Shuo Xiang >Priority: Minor > Fix For: 0.9 > > > Fix some typos. Also fix some inconsistent uses of *partition operator* and > *partitioning operator* in the codebase.
[GitHub] flink issue #346: [FLINK-1460] fix typos
Github user coveralls commented on the issue: https://github.com/apache/flink/pull/346 [![Coverage Status](https://coveralls.io/builds/7588381/badge)](https://coveralls.io/builds/7588381) Changes Unknown when pulling **acf52748f7ed9fe64d9773acbab5774b308e47eb on coderxiang:typo** into ** on apache:master**.